Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance Testing of Fluent-bit with several filters shows log processing falling below 5 MB/s #9399

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions examples/k8s_perf_test/run-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash
# Deploys fluent-bit via helm using the perf-test values.yaml, then tails the
# logwriter sidecar so the test progress/results are visible in the terminal.
#
# Configurable via environment variables:
#   TEST_NAMESPACE               - namespace to install into (default: default)
#   FLUENTBIT_IMAGE_REPOSITORY   - fluent-bit image repo
#   FLUENTBIT_IMAGE_TAG          - fluent-bit image tag
#   HELM_FB_TIMEOUT              - helm --timeout value (default: 5m0s)
set -euo pipefail

export TEST_NAMESPACE="${TEST_NAMESPACE:-default}"
export FLUENTBIT_IMAGE_REPOSITORY="${FLUENTBIT_IMAGE_REPOSITORY:-ghcr.io/fluent/fluent-bit}"
export FLUENTBIT_IMAGE_TAG="${FLUENTBIT_IMAGE_TAG:-latest}"

# update helm
helm repo add fluent https://fluent.github.io/helm-charts/
helm repo update --fail-on-repo-update-fail

echo "Installing fluent-bit via helm in namespace $TEST_NAMESPACE"
helm upgrade --install --debug --create-namespace --namespace "$TEST_NAMESPACE" fluent-bit fluent/fluent-bit \
  --values values.yaml \
  --set "image.repository=${FLUENTBIT_IMAGE_REPOSITORY},image.tag=${FLUENTBIT_IMAGE_TAG}" \
  --timeout "${HELM_FB_TIMEOUT:-5m0s}" \
  --wait

# Pick the most recently listed Running fluent-bit pod ([-1] in the jsonpath).
POD_NAME=$(kubectl get pods --namespace "$TEST_NAMESPACE" -l "app.kubernetes.io/name=fluent-bit,app.kubernetes.io/instance=fluent-bit" --field-selector status.phase=Running -o jsonpath="{.items[-1].metadata.name}")
export POD_NAME
echo "$POD_NAME deployed, tailing logs"
kubectl logs -n "$TEST_NAMESPACE" "$POD_NAME" -c logwriter -f

# NOTE: You can also follow -c fluent-bit for the fluent-bit logs if you'd like

# To reset the test, just `helm uninstall fluent-bit` in your $TEST_NAMESPACE and re-run ./run-test.sh
288 changes: 288 additions & 0 deletions examples/k8s_perf_test/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
# Helm values for the fluent-bit chart used by the k8s performance test.
# Runs fluent-bit as a single-replica Deployment plus a "logwriter" sidecar
# that writes synthetic container logs into a shared emptyDir volume, which
# the tail input below reads back.
kind: Deployment
image:
  pullPolicy: Always
replicaCount: 1
rbac:
  create: true
# Shared scratch volume: the logwriter sidecar writes fake container logs
# here and the fluent-bit tail input consumes them.
extraVolumeMounts:
  - mountPath: /app/perftest/containers
    name: perftest-volume
extraVolumes:
  - name: perftest-volume
    emptyDir: {}

env:
  # Only needed if the commented-out Use_Kubelet path in the kubernetes
  # filter is enabled.
  - name: NODE_IP
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP
extraContainers:
  # Sidecar that generates the synthetic log stream and polls the
  # fluent-bit metrics API to measure processing throughput.
  - name: logwriter
    image: python:3.12.6-bookworm
    command: ["/bin/bash", "/fluent-bit/etc/conf/run-log-writer-test.sh"]
    volumeMounts:
      - name: config
        mountPath: /fluent-bit/etc/conf
      - mountPath: /app/perftest/containers
        name: perftest-volume
    env:
      # Pod identity for the containerd-style log file name built by
      # run-log-writer-test.sh (NAMESPACE_NAME is read there; POD_NAME is
      # exported for convenience/debugging).
      - name: POD_NAME
        valueFrom:
          fieldRef:
            apiVersion: v1
            fieldPath: metadata.name
      - name: NAMESPACE_NAME
        valueFrom:
          fieldRef:
            apiVersion: v1
            fieldPath: metadata.namespace

# Extra pod labels to exercise the kubernetes filter's label handling.
podLabels:
  label1: "test1"
  label2: "test2"
  label3: "test3"
  label4: "test4"

config:
  # Service: fast flush plus an HTTP server so the test runner can poll
  # /api/v1/metrics for processing progress.
  service: |
    [SERVICE]
        Flush 0.25
        Grace 2
        Daemon Off
        Log_Level info
        HTTP_Server On
        HTTP_Listen 0.0.0.0
        HTTP_Port 2020
        Parsers_File /fluent-bit/etc/conf/custom_parsers.conf

  # Tail the synthetic logs written by the logwriter sidecar into the
  # shared emptyDir volume.
  inputs: |
    [INPUT]
        name tail
        read_from_head false
        skip_long_lines on
        path /app/perftest/containers/*.log
        multiline.parser cri
        Tag kube.*
        #buffer_chunk_size 5M
        #buffer_max_size 5M
        Rotate_Wait 60
        Refresh_Interval 1
        mem_buf_limit 50MB
        threaded on
        #Inotify_Watcher false

  # If you want to bypass filters, you must pass an empty string, otherwise helm
  # will give you some defaults
  filters: ""

  # Alternative filter chain: modify + kubernetes metadata only. Swap into
  # `filters` above to test this configuration.
  filters-simple: |
    [FILTER]
        Name modify
        Match kube*
        Copy level severity
        Remove _p

    [FILTER]
        Name kubernetes
        Alias get_k8s_metadata
        Kube_Tag_Prefix kube.app.perftest.containers
        Match kube*
        K8S-Logging.Parser Off
        K8S-Logging.Exclude Off
        #Use_Kubelet On
        #Kubelet_Host ${NODE_IP}
        #Kubelet_Port 10250
        Buffer_Size 2MB
        Labels On
        Annotations Off
        #Merge_Log On moves 'log' field to message
        Merge_Log On
        Merge_Log_Trim On
        kube_meta_cache_ttl 15m
        kube_meta_namespace_cache_ttl 15m
        Namespace_labels On
        Namespace_annotations Off
        namespace_metadata_only Off

  # Alternative heavier filter chain: parser + modify + kubernetes + nest +
  # modify. Swap into `filters` above to test this configuration.
  filters-extended: |
    [FILTER]
        Name parser
        Match kube*
        Key_Name log
        Reserve_Data True
        Parser glog
        Parser json

    [FILTER]
        Name modify
        Match kube*
        Copy level severity
        Remove _p

    [FILTER]
        Name kubernetes
        Alias get_k8s_metadata
        Kube_Tag_Prefix kube.app.perftest.containers
        Match kube*
        K8S-Logging.Parser Off
        K8S-Logging.Exclude Off
        #Use_Kubelet On
        #Kubelet_Host ${NODE_IP}
        #Kubelet_Port 10250
        Buffer_Size 2MB
        Labels On
        Annotations Off
        #Merge_Log On moves 'log' field to message
        Merge_Log On
        Merge_Log_Trim On
        kube_meta_cache_ttl 15m
        kube_meta_namespace_cache_ttl 15m
        Namespace_labels On
        Namespace_annotations On
        namespace_metadata_only Off

    # We only want the metadata labels & host (compute resource), not all that other
    # metadata, so we do this messy filtering below with the nest lift to move up
    # meta info into kubernetesmeta_*, then the lua script to set the compute resource tag,
    # then we modify (delete) all 'kubernetesmeta' fields excluding kubernetesmeta_labels
    [FILTER]
        Name nest
        Match kube*
        Operation lift
        Nested_under kubernetes
        Add_prefix kubernetesmeta_

    #[FILTER]
    # lua not included, uses kubernetes_namespace data from record
    #    Name lua
    #    Alias fix_k8s_labels
    #    Match kube*
    #    Call fix_k8s_labels
    #    Script functions.lua

    [FILTER]
        Name modify
        Alias remove_unused_k8s_meta
        Match kube*
        Remove_regex kubernetesmeta_(?!labels)
        Remove_regex kubernetes_namespace

  # Discard everything after processing: the test measures throughput, not
  # delivery.
  outputs: |
    [OUTPUT]
        Name null
        Match *

  customParsers: |
    [PARSER]
        Name json
        Format json

    [PARSER]
        Name glog
        Format regex
        Regex ^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source_file>[^ \]]+)\:(?<source_line>\d+)\]\s(?<message>.*)$
        Time_Key time
        Time_Format %m%d %H:%M:%S.%L

extraFiles:
  # Entry point for the logwriter sidecar: builds a containerd-style log file
  # name, then runs the python driver against the shared perftest volume.
  run-log-writer-test.sh: |
    #!/bin/bash
    # LOG_NAME simulates a container log file name created by containerd
    # since there's no universal way to get the container id from within the running container, just use a hardcoded fake one
    # if you can't find one
    GKE_CONTAINER_ID=$(grep 'systemd' /proc/1/mountinfo | awk '{ print $4 }'| awk -F'-' '{print $NF}' | awk -F'/' '{print $NF}')
    CONTAINER_ID="${GKE_CONTAINER_ID:-70e91ee167ebe632e7a18a7fcc1b7bc9b81c1fbf0b4239f55c09c23817740a0f}"
    LOG_NAME=${HOSTNAME}_${NAMESPACE_NAME}_logwriter-${CONTAINER_ID}
    # Give fluent-bit a moment to come up before writing.
    sleep 3
    python3 /fluent-bit/etc/conf/test_runner.py --num-lines 10000000 --outfile /app/perftest/containers/${LOG_NAME}.log --flb-endpoint "http://localhost:2020" --sleep-between-rotations 0.75
    # Keep the sidecar alive so its logs remain tailable after the test.
    sleep infinity

  test_runner.py: |
    #!/usr/bin/env python
    """Writes a synthetic CRI-format container log (with log rotations), then
    polls the fluent-bit metrics API until every record has been processed,
    reporting the end-to-end processing rate in Mb/s."""
    import argparse
    import logging
    import time
    import urllib.request
    import json
    import datetime
    import os

    NANOS_IN_SEC = 10**9
    BYTES_IN_MB = 10**6

    def main(args):
        test_start = time.time_ns()
        logging.info(f"starting test, writing to {args.outfile}")
        # Write logs
        file = open(args.outfile, 'a+', buffering=args.file_buffer_max)
        start = time.time_ns()
        bytes_written = 0
        last_rotation = 0
        rotations = 0
        rotation_size = args.rotate_mb*BYTES_IN_MB

        today = datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%dT%H:%M:%S')
        for i in range(0, args.num_lines):
            # Fake nanosecond suffix: immensely faster than real datetime prints.
            fake_nanos = f"{i%NANOS_IN_SEC:09d}"
            line = f"{today}.{fake_nanos}Z stdout F this is a line {i}\n"
            bytes_written += file.write(line)
            if (bytes_written - last_rotation) > rotation_size:
                last_rotation = bytes_written
                rotations += 1
                logging.info(f"Running log rotation number {rotations}")
                # Close (and thereby flush) the buffered handle before renaming
                # so the rotated file is complete when fluent-bit picks it up.
                file.close()
                for rev_rotation in range(rotations, 0, -1):
                    from_file = f"{args.outfile}.{rev_rotation-1}" if rev_rotation > 1 else f"{args.outfile}"
                    to_file = f"{args.outfile}.{rev_rotation}"
                    logging.debug(f"Rotating {from_file} -> {to_file}")
                    os.rename(from_file, to_file)
                file = open(args.outfile, 'a+', buffering=args.file_buffer_max)
                if args.sleep_between_rotations > 0:
                    time.sleep(args.sleep_between_rotations)

        file.flush()
        stop = time.time_ns()
        seconds = (stop-start)/NANOS_IN_SEC
        mb = bytes_written/BYTES_IN_MB
        logging.info(f"records written={args.num_lines}, time={seconds}s, Mbs written={mb}, Mb/s={mb/seconds}. Log Rotations={rotations}")

        # Wait for fluent-bit: poll the metrics API roughly once per second
        # until the null output has processed every record we wrote.
        waiting = True
        url = f"{args.flb_endpoint}/api/v1/metrics"
        attempts = args.timeout
        while waiting and attempts > 0:
            attempts -= 1
            try:
                req = urllib.request.Request(url)
                with urllib.request.urlopen(req) as response:
                    metrics = json.loads(response.read().decode(response.info().get_param('charset') or 'utf-8'))
                    logging.debug(metrics)
                    logging.info(f"in_tail={metrics['input']['tail.0']['records']} output={metrics['output']['null.0']['proc_records']}")
                    if metrics["output"]["null.0"]["proc_records"] >= args.num_lines:
                        waiting = False
                        logging.info("All records found!")
                        logging.info(metrics)
                        break
            except Exception as e:
                logging.exception(e)
            time.sleep(1)

        # Use the waiting flag (not the attempts counter) to detect timeout:
        # a success on the final attempt would otherwise be reported as failure.
        if waiting:
            logging.error(f"Test failed to complete in {args.timeout}s")
            return
        test_stop = time.time_ns()
        time_to_complete = (test_stop-test_start)/NANOS_IN_SEC
        flb_process_rate = bytes_written/BYTES_IN_MB/time_to_complete
        logging.info(f"Test completed in {time_to_complete:.2f}s. Fluent-bit processing rate {flb_process_rate:.3f} Mb/s")

    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO, format="%(asctime)s.%(msecs)03d %(message)s", datefmt='%Y-%m-%dT%H:%M:%S')

        parser = argparse.ArgumentParser(description="Generates a log file, waits for fluent-bit to finish")
        parser.add_argument("--num-lines", "-n", default=1000000, help='Number of Lines to write', type=int)
        parser.add_argument("--outfile", "-o", default="./test.txt")
        # type=int so a CLI-supplied value works as open()'s buffering argument.
        parser.add_argument("--file-buffer-max", default=1024*10000, type=int)
        parser.add_argument("--flb-endpoint", default="http://localhost:2020")
        parser.add_argument("--timeout", "-t", default=240, type=int, help="time in seconds to wait for fluentbit to finish processing")
        parser.add_argument("--rotate-mb", default=100, type=int, help="do log rotation after --rotate-mb Mbs have been written")
        parser.add_argument("--sleep-between-rotations", default=0, type=float, help="sleep seconds after log rotation")
        main(parser.parse_args())
Loading