-
Notifications
You must be signed in to change notification settings - Fork 1.8k
routing: introduce metrics for unmatched (dropped) logs #11144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Eduardo Silva <[email protected]>
Signed-off-by: Eduardo Silva <[email protected]>
WalkthroughThese changes add comprehensive tracking and metrics for unmatched log records in routing scenarios. A new Changes
Sequence DiagramsequenceDiagram
participant Router as Route Handler
participant Payload as Payload Builder
participant Metrics as Metrics Calculator
participant Recorder as Metrics Recorder
Router->>Payload: split_and_append_route_payloads()
Payload->>Payload: allocate matched_any_route[]
loop For each record
Payload->>Payload: build_payload_for_route()
alt Record matches route
Payload->>Payload: set matched_any_route[i] = 1
end
end
Payload->>Metrics: calculate_unmatched_route_metrics()
Metrics->>Metrics: iterate records, find unmatched
Metrics->>Metrics: encode unmatched via log_encoder
Metrics-->>Recorder: return dropped_records, dropped_bytes
alt Unmatched records exist
Recorder->>Recorder: Update logs_drop_records_total
Recorder->>Recorder: Update logs_drop_bytes_total
note over Recorder: Labels: input, "unmatched"
end
Payload->>Payload: cleanup matched_any_route
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
src/flb_task.c (1)
34-37: Consider conditionally including flb_time.h.The
flb_time.hheader is included unconditionally but is only used within theFLB_HAVE_METRICSguarded code. For consistency and to avoid unnecessary includes when metrics are disabled, consider moving it inside the guard.Apply this diff:
-#include <fluent-bit/flb_time.h> #ifdef FLB_HAVE_METRICS +#include <fluent-bit/flb_time.h> #include <fluent-bit/flb_metrics.h> #endifsrc/flb_input_log.c (1)
40-40: Consider conditionally including flb_time.h.The
flb_time.hheader is only used in the metrics recording block (line 1216). For consistency with conditional compilation when metrics are disabled, consider guarding this include.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/flb_input_log.c(12 hunks)src/flb_task.c(3 hunks)
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/flb_task.csrc/flb_input_log.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.
Applied to files:
src/flb_task.csrc/flb_input_log.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/flb_task.csrc/flb_input_log.c
📚 Learning: 2025-08-29T06:24:44.797Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:26-26
Timestamp: 2025-08-29T06:24:44.797Z
Learning: In Fluent Bit, ZSTD support is always available and enabled by default. The build system automatically detects and uses either the system libzstd library or builds the bundled ZSTD version. Unlike other optional dependencies like Arrow which use conditional compilation guards (e.g., FLB_HAVE_ARROW), ZSTD does not require conditional includes or build flags.
Applied to files:
src/flb_input_log.c
📚 Learning: 2025-08-29T06:24:55.855Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: src/aws/flb_aws_compress.c:52-56
Timestamp: 2025-08-29T06:24:55.855Z
Learning: ZSTD compression is always available in Fluent Bit and does not require conditional compilation guards. Unlike Arrow/Parquet which use #ifdef FLB_HAVE_ARROW guards, ZSTD is built unconditionally with flb_zstd.c included directly in src/CMakeLists.txt and a bundled ZSTD library at lib/zstd-1.5.7/.
Applied to files:
src/flb_input_log.c
🧬 Code graph analysis (2)
src/flb_task.c (3)
src/flb_input.c (1)
flb_input_name(790-797)src/flb_input_chunk.c (1)
flb_input_chunk_get_real_size(456-484)lib/cmetrics/src/cmt_counter.c (1)
cmt_counter_add(119-135)
src/flb_input_log.c (7)
src/flb_log_event_encoder.c (2)
flb_log_event_encoder_create(76-97)flb_log_event_encoder_destroy(99-116)src/flb_log_event_decoder.c (1)
flb_log_event_decoder_get_record_type(458-478)include/fluent-bit/flb_mem.h (2)
flb_calloc(84-96)flb_free(126-128)src/flb_router_condition.c (1)
flb_router_chunk_context_destroy(187-190)src/flb_event.c (1)
flb_event_chunk_destroy(69-79)src/flb_input.c (1)
flb_input_name(790-797)lib/cmetrics/src/cmt_counter.c (1)
cmt_counter_add(119-135)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: run-windows-unit-tests / call-build-windows-unit-test (Windows 32bit, x86, x86-windows-static, 3.31.6, windows-latest)
- GitHub Check: run-windows-unit-tests / call-build-windows-unit-test (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DC...
- GitHub Check: run-windows-unit-tests / call-build-windows-unit-test (Windows 64bit, x64, x64-windows-static, 3.31.6, windows-latest)
- GitHub Check: run-qemu-ubuntu-unit-tests (s390x)
- GitHub Check: run-macos-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6)
- GitHub Check: run-qemu-ubuntu-unit-tests (riscv64)
- GitHub Check: run-macos-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6)
🔇 Additional comments (8)
src/flb_task.c (1)
867-869: LGTM! Proper placement of unmatched metrics call.The metrics recording is correctly placed when no routes are found (
count == 0) and properly guarded withFLB_HAVE_METRICS. The function call receives all necessary parameters.src/flb_input_log.c (7)
443-449: LGTM! Function signature properly extended.The addition of the
matched_any_routeparameter correctly tracks which records match any route, enabling accurate drop metrics calculation.
461-463: LGTM! Proper validation added.The validation correctly ensures
matched_any_routeis non-NULL before proceeding, maintaining consistency with other parameter checks.
505-505: LGTM! Correct tracking of matched records.The flag is properly set when a record matches a specific route condition.
681-681: LGTM! Correct tracking for default route matches.The
matched_any_routeflag is properly set for records that match the default route, ensuring comprehensive tracking.Also applies to: 686-686
782-882: LGTM! Well-structured metrics calculation function.The function correctly:
- Encodes unmatched records using the log event encoder
- Preserves group structure during encoding
- Counts dropped records and calculates dropped bytes
- Handles errors appropriately with proper encoder cleanup
1138-1147: LGTM! Proper allocation with error handling.The allocation of
matched_any_routearray includes appropriate error handling and cleanup of previously allocated resources.
1169-1169: LGTM! Proper cleanup of matched_any_route.The
matched_any_routearray is correctly freed on all error paths and at the end of successful execution, preventing memory leaks.Also applies to: 1192-1192, 1234-1234
| { | ||
| size_t dropped_records = 0; | ||
| size_t dropped_bytes = 0; | ||
| struct flb_router *router = NULL; | ||
| char *input_label = NULL; | ||
| char *output_label = "unmatched"; | ||
| uint64_t now; | ||
|
|
||
| ret = calculate_unmatched_route_metrics(records_array, | ||
| record_count, | ||
| matched_any_route, | ||
| &dropped_records, | ||
| &dropped_bytes); | ||
| if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router) { | ||
| router = ins->config->router; | ||
| input_label = (char *) flb_input_name(ins); | ||
| now = cfl_time_now(); | ||
|
|
||
| cmt_counter_add(router->logs_drop_records_total, | ||
| now, | ||
| (double) dropped_records, | ||
| 2, | ||
| (char *[]){input_label, output_label}); | ||
|
|
||
| cmt_counter_add(router->logs_drop_bytes_total, | ||
| now, | ||
| (double) dropped_bytes, | ||
| 2, | ||
| (char *[]){input_label, output_label}); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Guard metrics code with FLB_HAVE_METRICS.
This entire block uses metrics APIs (cmt_counter_add) and router metric counters without being guarded by #ifdef FLB_HAVE_METRICS. This will cause compilation failures when metrics are disabled, as these symbols won't be available. Additionally, the code doesn't verify that the metric counters are non-NULL before use.
Apply this diff to fix both issues:
+#ifdef FLB_HAVE_METRICS
{
size_t dropped_records = 0;
size_t dropped_bytes = 0;
struct flb_router *router = NULL;
char *input_label = NULL;
char *output_label = "unmatched";
uint64_t now;
ret = calculate_unmatched_route_metrics(records_array,
record_count,
matched_any_route,
&dropped_records,
&dropped_bytes);
- if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router) {
+ if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router &&
+ ins->config->router->logs_drop_records_total &&
+ ins->config->router->logs_drop_bytes_total) {
router = ins->config->router;
input_label = (char *) flb_input_name(ins);
now = cfl_time_now();
cmt_counter_add(router->logs_drop_records_total,
now,
(double) dropped_records,
2,
(char *[]){input_label, output_label});
cmt_counter_add(router->logs_drop_bytes_total,
now,
(double) dropped_bytes,
2,
(char *[]){input_label, output_label});
}
}
+#endif📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| { | |
| size_t dropped_records = 0; | |
| size_t dropped_bytes = 0; | |
| struct flb_router *router = NULL; | |
| char *input_label = NULL; | |
| char *output_label = "unmatched"; | |
| uint64_t now; | |
| ret = calculate_unmatched_route_metrics(records_array, | |
| record_count, | |
| matched_any_route, | |
| &dropped_records, | |
| &dropped_bytes); | |
| if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router) { | |
| router = ins->config->router; | |
| input_label = (char *) flb_input_name(ins); | |
| now = cfl_time_now(); | |
| cmt_counter_add(router->logs_drop_records_total, | |
| now, | |
| (double) dropped_records, | |
| 2, | |
| (char *[]){input_label, output_label}); | |
| cmt_counter_add(router->logs_drop_bytes_total, | |
| now, | |
| (double) dropped_bytes, | |
| 2, | |
| (char *[]){input_label, output_label}); | |
| } | |
| } | |
| #ifdef FLB_HAVE_METRICS | |
| { | |
| size_t dropped_records = 0; | |
| size_t dropped_bytes = 0; | |
| struct flb_router *router = NULL; | |
| char *input_label = NULL; | |
| char *output_label = "unmatched"; | |
| uint64_t now; | |
| ret = calculate_unmatched_route_metrics(records_array, | |
| record_count, | |
| matched_any_route, | |
| &dropped_records, | |
| &dropped_bytes); | |
| if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router && | |
| ins->config->router->logs_drop_records_total && | |
| ins->config->router->logs_drop_bytes_total) { | |
| router = ins->config->router; | |
| input_label = (char *) flb_input_name(ins); | |
| now = cfl_time_now(); | |
| cmt_counter_add(router->logs_drop_records_total, | |
| now, | |
| (double) dropped_records, | |
| 2, | |
| (char *[]){input_label, output_label}); | |
| cmt_counter_add(router->logs_drop_bytes_total, | |
| now, | |
| (double) dropped_bytes, | |
| 2, | |
| (char *[]){input_label, output_label}); | |
| } | |
| } | |
| #endif |
🤖 Prompt for AI Agents
In src/flb_input_log.c around lines 1200 to 1230, the metrics usage block must
be guarded by FLB_HAVE_METRICS and should check metric pointers before use: wrap
the entire block that calls cmt_counter_add and accesses router metric members
inside #ifdef FLB_HAVE_METRICS ... #endif, and inside that guard after obtaining
router ensure router->logs_drop_records_total and router->logs_drop_bytes_total
are non-NULL before calling cmt_counter_add (skip or early-return if either is
NULL) to avoid dereferencing null pointers when metrics are disabled or not
initialized.
| #ifdef FLB_HAVE_METRICS | ||
| static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins, | ||
| struct flb_input_chunk *ic, | ||
| size_t chunk_size) | ||
| { | ||
| struct flb_router *router; | ||
| uint64_t now; | ||
| double dropped_bytes; | ||
| char *labels[2]; | ||
|
|
||
| if (!ins || !ic || !ins->config) { | ||
| return; | ||
| } | ||
|
|
||
| if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) { | ||
| return; | ||
| } | ||
|
|
||
| router = ins->config->router; | ||
| if (!router) { | ||
| return; | ||
| } | ||
|
|
||
| now = cfl_time_now(); | ||
| labels[0] = (char *) flb_input_name(ins); | ||
| labels[1] = "unmatched"; | ||
|
|
||
| dropped_bytes = (double) chunk_size; | ||
| if (dropped_bytes <= 0) { | ||
| ssize_t real_size; | ||
|
|
||
| real_size = flb_input_chunk_get_real_size(ic); | ||
| if (real_size > 0) { | ||
| dropped_bytes = (double) real_size; | ||
| } | ||
| else { | ||
| dropped_bytes = 0; | ||
| } | ||
| } | ||
|
|
||
| cmt_counter_add(router->logs_drop_records_total, | ||
| now, | ||
| (double) ic->total_records, | ||
| 2, | ||
| labels); | ||
|
|
||
| cmt_counter_add(router->logs_drop_bytes_total, | ||
| now, | ||
| dropped_bytes, | ||
| 2, | ||
| labels); | ||
| } | ||
| #endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add NULL checks for router metric counters.
The function directly uses router->logs_drop_records_total and router->logs_drop_bytes_total (lines 120, 126) without verifying they are non-NULL, which could lead to crashes if the router metrics are not properly initialized.
Apply this diff to add safety checks:
static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins,
struct flb_input_chunk *ic,
size_t chunk_size)
{
struct flb_router *router;
uint64_t now;
double dropped_bytes;
char *labels[2];
if (!ins || !ic || !ins->config) {
return;
}
if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) {
return;
}
router = ins->config->router;
- if (!router) {
+ if (!router || !router->logs_drop_records_total || !router->logs_drop_bytes_total) {
return;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #ifdef FLB_HAVE_METRICS | |
| static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins, | |
| struct flb_input_chunk *ic, | |
| size_t chunk_size) | |
| { | |
| struct flb_router *router; | |
| uint64_t now; | |
| double dropped_bytes; | |
| char *labels[2]; | |
| if (!ins || !ic || !ins->config) { | |
| return; | |
| } | |
| if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) { | |
| return; | |
| } | |
| router = ins->config->router; | |
| if (!router) { | |
| return; | |
| } | |
| now = cfl_time_now(); | |
| labels[0] = (char *) flb_input_name(ins); | |
| labels[1] = "unmatched"; | |
| dropped_bytes = (double) chunk_size; | |
| if (dropped_bytes <= 0) { | |
| ssize_t real_size; | |
| real_size = flb_input_chunk_get_real_size(ic); | |
| if (real_size > 0) { | |
| dropped_bytes = (double) real_size; | |
| } | |
| else { | |
| dropped_bytes = 0; | |
| } | |
| } | |
| cmt_counter_add(router->logs_drop_records_total, | |
| now, | |
| (double) ic->total_records, | |
| 2, | |
| labels); | |
| cmt_counter_add(router->logs_drop_bytes_total, | |
| now, | |
| dropped_bytes, | |
| 2, | |
| labels); | |
| } | |
| #endif | |
| #ifdef FLB_HAVE_METRICS | |
| static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins, | |
| struct flb_input_chunk *ic, | |
| size_t chunk_size) | |
| { | |
| struct flb_router *router; | |
| uint64_t now; | |
| double dropped_bytes; | |
| char *labels[2]; | |
| if (!ins || !ic || !ins->config) { | |
| return; | |
| } | |
| if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) { | |
| return; | |
| } | |
| router = ins->config->router; | |
| if (!router || !router->logs_drop_records_total || !router->logs_drop_bytes_total) { | |
| return; | |
| } | |
| now = cfl_time_now(); | |
| labels[0] = (char *) flb_input_name(ins); | |
| labels[1] = "unmatched"; | |
| dropped_bytes = (double) chunk_size; | |
| if (dropped_bytes <= 0) { | |
| ssize_t real_size; | |
| real_size = flb_input_chunk_get_real_size(ic); | |
| if (real_size > 0) { | |
| dropped_bytes = (double) real_size; | |
| } | |
| else { | |
| dropped_bytes = 0; | |
| } | |
| } | |
| cmt_counter_add(router->logs_drop_records_total, | |
| now, | |
| (double) ic->total_records, | |
| 2, | |
| labels); | |
| cmt_counter_add(router->logs_drop_bytes_total, | |
| now, | |
| dropped_bytes, | |
| 2, | |
| labels); | |
| } | |
| #endif |
🤖 Prompt for AI Agents
In src/flb_task.c around lines 80 to 132, the function assumes
router->logs_drop_records_total and router->logs_drop_bytes_total are valid; add
NULL checks before calling cmt_counter_add to avoid crashes. Modify the logic to
verify each metric pointer is non-NULL and only call cmt_counter_add for the
counters that exist (or return/skip if both are NULL), preserving the same
labels and values; ensure no dereference occurs when a metric pointer is NULL.
Routing metrics now includes new metrics to account the dropped logs:
the
inputlabel represents the input plugin that generated the data (or it alias) and the output for dropped records will always point out tounmatched:Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit