Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 164 additions & 6 deletions src/flb_input_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_time.h>

#include <chunkio/chunkio.h>

Expand Down Expand Up @@ -443,7 +444,8 @@ static int build_payload_for_route(struct flb_input_instance *ins,
struct flb_route_payload *payload,
struct flb_mp_chunk_record **records,
size_t record_count,
uint8_t *matched_non_default)
uint8_t *matched_non_default,
uint8_t *matched_any_route)
{
int i;
int j;
Expand All @@ -456,7 +458,8 @@ static int build_payload_for_route(struct flb_input_instance *ins,
struct flb_mp_chunk_record *group_start_record = NULL;
uint8_t *matched_by_route = NULL;

if (!payload || !records || record_count == 0 || !matched_non_default) {
if (!payload || !records || record_count == 0 || !matched_non_default ||
!matched_any_route) {
return -1;
}

Expand Down Expand Up @@ -499,6 +502,7 @@ static int build_payload_for_route(struct flb_input_instance *ins,

matched_by_route[i] = 1;
matched_non_default[i] = 1;
matched_any_route[i] = 1;
matched++;
}

Expand Down Expand Up @@ -603,7 +607,8 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
struct flb_route_payload *payload,
struct flb_mp_chunk_record **records,
size_t record_count,
uint8_t *matched_non_default)
uint8_t *matched_non_default,
uint8_t *matched_any_route)
{
int i;
int j;
Expand All @@ -616,7 +621,7 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
struct flb_mp_chunk_record *group_start_record = NULL;
int *matched_by_default = NULL;

if (!payload || !records || !matched_non_default) {
if (!payload || !records || !matched_non_default || !matched_any_route) {
return -1;
}

Expand Down Expand Up @@ -673,10 +678,12 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
condition_result = flb_router_condition_evaluate_record(payload->route, records[i]);
if (condition_result == FLB_TRUE) {
matched_by_default[i] = 1;
matched_any_route[i] = 1;
}
}
else {
matched_by_default[i] = 1;
matched_any_route[i] = 1;
}
}
}
Expand Down Expand Up @@ -772,6 +779,108 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
return 0;
}

static int calculate_unmatched_route_metrics(struct flb_mp_chunk_record **records,
size_t record_count,
uint8_t *matched_any_route,
size_t *dropped_records,
size_t *dropped_bytes)
{
size_t i;
size_t j;
int32_t record_type;
int32_t inner_type;
int ret;
struct flb_log_event_encoder *encoder;
struct flb_mp_chunk_record *group_end = NULL;
struct flb_mp_chunk_record *group_start_record = NULL;

if (!records || record_count == 0 || !matched_any_route ||
!dropped_records || !dropped_bytes) {
return -1;
}

encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
if (!encoder) {
return -1;
}

*dropped_records = 0;
*dropped_bytes = 0;

for (i = 0; i < record_count; i++) {
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) != 0) {
continue;
}

if (record_type == FLB_LOG_EVENT_GROUP_START) {
group_start_record = records[i];
group_end = NULL;
continue;
}
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
if (group_end != NULL &&
group_start_record != NULL &&
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
ret = encode_chunk_record(encoder, group_end);
if (ret != 0) {
flb_log_event_encoder_destroy(encoder);
return -1;
}
group_end = NULL;
}
group_start_record = NULL;
continue;
}
else if (record_type != FLB_LOG_EVENT_NORMAL) {
continue;
}

if (!matched_any_route[i]) {
if (group_start_record != NULL &&
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
if (group_end == NULL) {
ret = encode_chunk_record(encoder, group_start_record);
if (ret != 0) {
flb_log_event_encoder_destroy(encoder);
return -1;
}

for (j = i + 1; j < record_count; j++) {
if (flb_log_event_decoder_get_record_type(&records[j]->event, &inner_type) == 0 &&
inner_type == FLB_LOG_EVENT_GROUP_END &&
records[j]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
group_end = records[j];
break;
}
}
}
}

ret = encode_chunk_record(encoder, records[i]);
if (ret != 0) {
flb_log_event_encoder_destroy(encoder);
return -1;
}

(*dropped_records)++;
}
}

if (group_end != NULL) {
ret = encode_chunk_record(encoder, group_end);
if (ret != 0) {
flb_log_event_encoder_destroy(encoder);
return -1;
}
}

*dropped_bytes = encoder->buffer.size;

flb_log_event_encoder_destroy(encoder);

return 0;
}

static void route_payload_list_destroy(struct cfl_list *payloads)
{
struct cfl_list *head;
Expand Down Expand Up @@ -879,6 +988,7 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
int context_initialized = FLB_FALSE;
size_t out_size = 0;
uint8_t *matched_non_default = NULL;
uint8_t *matched_any_route = NULL;
struct cfl_list payloads;
struct cfl_list *head;
struct cfl_list *tmp;
Expand Down Expand Up @@ -1025,6 +1135,17 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
return -1;
}

matched_any_route = flb_calloc(record_count, sizeof(uint8_t));
if (!matched_any_route) {
flb_errno();
flb_free(matched_non_default);
flb_free(records_array);
flb_router_chunk_context_destroy(&context);
route_payload_list_destroy(&payloads);
flb_event_chunk_destroy(chunk);
return -1;
}

index = 0;
cfl_list_foreach(head, &context.chunk_cobj->records) {
records_array[index++] = cfl_list_entry(head, struct flb_mp_chunk_record, _head);
Expand All @@ -1040,10 +1161,12 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
payload,
records_array,
record_count,
matched_non_default);
matched_non_default,
matched_any_route);
if (ret != 0) {
flb_free(records_array);
flb_free(matched_non_default);
flb_free(matched_any_route);
flb_router_chunk_context_destroy(&context);
route_payload_list_destroy(&payloads);
flb_event_chunk_destroy(chunk);
Expand All @@ -1061,19 +1184,54 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
payload,
records_array,
record_count,
matched_non_default);
matched_non_default,
matched_any_route);
if (ret != 0) {
flb_free(records_array);
flb_free(matched_non_default);
flb_free(matched_any_route);
flb_router_chunk_context_destroy(&context);
route_payload_list_destroy(&payloads);
flb_event_chunk_destroy(chunk);
return -1;
}
}

{
size_t dropped_records = 0;
size_t dropped_bytes = 0;
struct flb_router *router = NULL;
char *input_label = NULL;
char *output_label = "unmatched";
uint64_t now;

ret = calculate_unmatched_route_metrics(records_array,
record_count,
matched_any_route,
&dropped_records,
&dropped_bytes);
if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router) {
router = ins->config->router;
input_label = (char *) flb_input_name(ins);
now = cfl_time_now();

cmt_counter_add(router->logs_drop_records_total,
now,
(double) dropped_records,
2,
(char *[]){input_label, output_label});

cmt_counter_add(router->logs_drop_bytes_total,
now,
(double) dropped_bytes,
2,
(char *[]){input_label, output_label});
}
}
Comment on lines +1200 to +1230
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Guard metrics code with FLB_HAVE_METRICS.

This entire block uses metrics APIs (cmt_counter_add) and router metric counters without being guarded by #ifdef FLB_HAVE_METRICS. This will cause compilation failures when metrics are disabled, as these symbols won't be available. Additionally, the code doesn't verify that the metric counters are non-NULL before use.

Apply this diff to fix both issues:

+#ifdef FLB_HAVE_METRICS
     {
         size_t dropped_records = 0;
         size_t dropped_bytes = 0;
         struct flb_router *router = NULL;
         char *input_label = NULL;
         char *output_label = "unmatched";
         uint64_t now;
 
         ret = calculate_unmatched_route_metrics(records_array,
                                                 record_count,
                                                 matched_any_route,
                                                 &dropped_records,
                                                 &dropped_bytes);
-        if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router) {
+        if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router &&
+            ins->config->router->logs_drop_records_total &&
+            ins->config->router->logs_drop_bytes_total) {
             router = ins->config->router;
             input_label = (char *) flb_input_name(ins);
             now = cfl_time_now();
 
             cmt_counter_add(router->logs_drop_records_total,
                             now,
                             (double) dropped_records,
                             2,
                             (char *[]){input_label, output_label});
 
             cmt_counter_add(router->logs_drop_bytes_total,
                             now,
                             (double) dropped_bytes,
                             2,
                             (char *[]){input_label, output_label});
         }
     }
+#endif
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
{
size_t dropped_records = 0;
size_t dropped_bytes = 0;
struct flb_router *router = NULL;
char *input_label = NULL;
char *output_label = "unmatched";
uint64_t now;
ret = calculate_unmatched_route_metrics(records_array,
record_count,
matched_any_route,
&dropped_records,
&dropped_bytes);
if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router) {
router = ins->config->router;
input_label = (char *) flb_input_name(ins);
now = cfl_time_now();
cmt_counter_add(router->logs_drop_records_total,
now,
(double) dropped_records,
2,
(char *[]){input_label, output_label});
cmt_counter_add(router->logs_drop_bytes_total,
now,
(double) dropped_bytes,
2,
(char *[]){input_label, output_label});
}
}
#ifdef FLB_HAVE_METRICS
{
size_t dropped_records = 0;
size_t dropped_bytes = 0;
struct flb_router *router = NULL;
char *input_label = NULL;
char *output_label = "unmatched";
uint64_t now;
ret = calculate_unmatched_route_metrics(records_array,
record_count,
matched_any_route,
&dropped_records,
&dropped_bytes);
if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router &&
ins->config->router->logs_drop_records_total &&
ins->config->router->logs_drop_bytes_total) {
router = ins->config->router;
input_label = (char *) flb_input_name(ins);
now = cfl_time_now();
cmt_counter_add(router->logs_drop_records_total,
now,
(double) dropped_records,
2,
(char *[]){input_label, output_label});
cmt_counter_add(router->logs_drop_bytes_total,
now,
(double) dropped_bytes,
2,
(char *[]){input_label, output_label});
}
}
#endif
🤖 Prompt for AI Agents
In src/flb_input_log.c around lines 1200 to 1230, the metrics usage block must
be guarded by FLB_HAVE_METRICS and should check metric pointers before use: wrap
the entire block that calls cmt_counter_add and accesses router metric members
inside #ifdef FLB_HAVE_METRICS ... #endif, and inside that guard after obtaining
router ensure router->logs_drop_records_total and router->logs_drop_bytes_total
are non-NULL before calling cmt_counter_add (skip or early-return if either is
NULL) to avoid dereferencing null pointers when metrics are disabled or not
initialized.


flb_free(records_array);
flb_free(matched_non_default);
flb_free(matched_any_route);

cfl_list_foreach_safe(head, tmp, &payloads) {
payload = cfl_list_entry(head, struct flb_route_payload, _head);
Expand Down
61 changes: 61 additions & 0 deletions src/flb_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_time.h>
#ifdef FLB_HAVE_METRICS
#include <fluent-bit/flb_metrics.h>
#endif
#include <string.h>

/*
Expand Down Expand Up @@ -73,6 +77,60 @@ static inline void map_free_task_id(int id, struct flb_config *config)
config->task_map[id].task = NULL;
}

#ifdef FLB_HAVE_METRICS
static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins,
struct flb_input_chunk *ic,
size_t chunk_size)
{
struct flb_router *router;
uint64_t now;
double dropped_bytes;
char *labels[2];

if (!ins || !ic || !ins->config) {
return;
}

if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) {
return;
}

router = ins->config->router;
if (!router) {
return;
}

now = cfl_time_now();
labels[0] = (char *) flb_input_name(ins);
labels[1] = "unmatched";

dropped_bytes = (double) chunk_size;
if (dropped_bytes <= 0) {
ssize_t real_size;

real_size = flb_input_chunk_get_real_size(ic);
if (real_size > 0) {
dropped_bytes = (double) real_size;
}
else {
dropped_bytes = 0;
}
}

cmt_counter_add(router->logs_drop_records_total,
now,
(double) ic->total_records,
2,
labels);

cmt_counter_add(router->logs_drop_bytes_total,
now,
dropped_bytes,
2,
labels);
}
#endif
Comment on lines +80 to +132
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add NULL checks for router metric counters.

The function directly uses router->logs_drop_records_total and router->logs_drop_bytes_total (lines 120, 126) without verifying they are non-NULL, which could lead to crashes if the router metrics are not properly initialized.

Apply this diff to add safety checks:

 static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins,
                                                 struct flb_input_chunk *ic,
                                                 size_t chunk_size)
 {
     struct flb_router *router;
     uint64_t now;
     double dropped_bytes;
     char *labels[2];
 
     if (!ins || !ic || !ins->config) {
         return;
     }
 
     if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) {
         return;
     }
 
     router = ins->config->router;
-    if (!router) {
+    if (!router || !router->logs_drop_records_total || !router->logs_drop_bytes_total) {
         return;
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#ifdef FLB_HAVE_METRICS
static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins,
struct flb_input_chunk *ic,
size_t chunk_size)
{
struct flb_router *router;
uint64_t now;
double dropped_bytes;
char *labels[2];
if (!ins || !ic || !ins->config) {
return;
}
if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) {
return;
}
router = ins->config->router;
if (!router) {
return;
}
now = cfl_time_now();
labels[0] = (char *) flb_input_name(ins);
labels[1] = "unmatched";
dropped_bytes = (double) chunk_size;
if (dropped_bytes <= 0) {
ssize_t real_size;
real_size = flb_input_chunk_get_real_size(ic);
if (real_size > 0) {
dropped_bytes = (double) real_size;
}
else {
dropped_bytes = 0;
}
}
cmt_counter_add(router->logs_drop_records_total,
now,
(double) ic->total_records,
2,
labels);
cmt_counter_add(router->logs_drop_bytes_total,
now,
dropped_bytes,
2,
labels);
}
#endif
#ifdef FLB_HAVE_METRICS
static void record_unmatched_route_drop_metrics(struct flb_input_instance *ins,
struct flb_input_chunk *ic,
size_t chunk_size)
{
struct flb_router *router;
uint64_t now;
double dropped_bytes;
char *labels[2];
if (!ins || !ic || !ins->config) {
return;
}
if (ic->event_type != FLB_INPUT_LOGS || ic->total_records <= 0) {
return;
}
router = ins->config->router;
if (!router || !router->logs_drop_records_total || !router->logs_drop_bytes_total) {
return;
}
now = cfl_time_now();
labels[0] = (char *) flb_input_name(ins);
labels[1] = "unmatched";
dropped_bytes = (double) chunk_size;
if (dropped_bytes <= 0) {
ssize_t real_size;
real_size = flb_input_chunk_get_real_size(ic);
if (real_size > 0) {
dropped_bytes = (double) real_size;
}
else {
dropped_bytes = 0;
}
}
cmt_counter_add(router->logs_drop_records_total,
now,
(double) ic->total_records,
2,
labels);
cmt_counter_add(router->logs_drop_bytes_total,
now,
dropped_bytes,
2,
labels);
}
#endif
🤖 Prompt for AI Agents
In src/flb_task.c around lines 80 to 132, the function assumes
router->logs_drop_records_total and router->logs_drop_bytes_total are valid; add
NULL checks before calling cmt_counter_add to avoid crashes. Modify the logic to
verify each metric pointer is non-NULL and only call cmt_counter_add for the
counters that exist (or return/skip if both are NULL), preserving the same
labels and values; ensure no dereference occurs when a metric pointer is NULL.


static int task_collect_output_references(struct flb_config *config,
const struct flb_chunk_direct_route *route,
struct flb_output_instance ***out_matches,
Expand Down Expand Up @@ -806,6 +864,9 @@ struct flb_task *flb_task_create(uint64_t ref_id,

/* no destinations ?, useless task. */
if (count == 0) {
#ifdef FLB_HAVE_METRICS
record_unmatched_route_drop_metrics(i_ins, task_ic, size);
#endif
flb_debug("[task] created task=%p id=%i without routes, dropping.",
task, task->id);
if (router_context_initialized) {
Expand Down
Loading