Skip to content

in_tcp: in_udp: Enhance of parser capabilities#11705

Open
cosmo0920 wants to merge 5 commits intomasterfrom
cosmo0920-enhance-of-parser-capabilities
Open

in_tcp: in_udp: Enhance of parser capabilities#11705
cosmo0920 wants to merge 5 commits intomasterfrom
cosmo0920-enhance-of-parser-capabilities

Conversation

@cosmo0920
Copy link
Copy Markdown
Contributor

@cosmo0920 cosmo0920 commented Apr 14, 2026

In the current master branch, we didn't handle parser on in_tcp and in_udp plugins.
This could be reasonable to handle parser in both of plugins.

Closes #11688.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • TCP and UDP inputs: optional parser for line-delimited records, parsed timestamps, and optional source-address enrichment; selecting a parser forces the input format to "none" with a warning.
  • Bug Fixes / Behavior

    • Better fallback on parse failures, improved timestamp handling, and clearer initialization error reporting and cleanup when a configured parser is unavailable.
  • Tests

    • Added runtime tests for parser-present, unknown-parser, and parser-fallback scenarios (enabled when parser support is available).

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 14, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds optional parser support to TCP and UDP inputs: new parser config option, parser resolution during init (init fails if missing or unsupported), forces format to none when set, routes line-delimited segments through the parser (uses parser timestamps, can append source address), and adds runtime tests guarded by FLB_HAVE_PARSER.

Changes

Cohort / File(s) Summary
TCP: public & config
plugins/in_tcp/tcp.c, plugins/in_tcp/tcp.h
Add parser config entry; struct flb_in_tcp_config gains parser_name and conditional parser pointer.
TCP: init
plugins/in_tcp/tcp_config.c
Resolve parser_name via flb_parser_get() (fail init if missing or parser support unavailable); warn and force format to FLB_TCP_FMT_NONE when a parser is set.
TCP: runtime
plugins/in_tcp/tcp_conn.c
When parser present, use flb_parser_do() per delimited segment, apply parser out_time (fallback to now), optionally append source address into msgpack, emit parsed record or fallback to raw "log" on parse failure; manage/free buffers.
UDP: public & config
plugins/in_udp/udp.c, plugins/in_udp/udp.h
Add parser config entry; struct flb_in_udp_config gains parser_name and conditional parser pointer.
UDP: init
plugins/in_udp/udp_config.c
Resolve parser_name via flb_parser_get() (fail init if missing or parser support unavailable); warn and force format to FLB_UDP_FMT_NONE when a parser is set.
UDP: runtime
plugins/in_udp/udp_conn.c
When parser present, call flb_parser_do() per segment, apply parser timestamp (fallback), optionally append source address, update consumed bytes by separator length, free buffers; fallback to raw behavior on parse failure.
Tests
tests/runtime/in_tcp.c, tests/runtime/in_udp.c
Load parsers.conf in test setup and add runtime tests for format=none with a valid parser, unknown parser (expect init failure), and parser-fallback cases; tests guarded by FLB_HAVE_PARSER.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Plugin as "TCP/UDP Plugin"
    participant Parser as "Parser Engine"
    participant Output as "Output Handler"

    Client->>Plugin: Send line-delimited payload
    Plugin->>Plugin: Split payload by delimiter
    Plugin->>Plugin: Is parser configured?
    alt parser configured
        Plugin->>Parser: flb_parser_do(segment)
        Parser-->>Plugin: parsed msgpack + out_time
        Plugin->>Plugin: set record timestamp (out_time or now)
        Plugin->>Plugin: append source address if configured
        Plugin->>Output: emit parsed record (msgpack)
    else no parser / parse failed
        Plugin->>Plugin: build record with raw payload under "log" and now timestamp
        Plugin->>Output: emit raw record
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

Suggested reviewers

  • edsiper
  • patrick-stephens

Poem

🐰 I hopped through sockets, whiskers twitching bright,
Lines became records beneath the moonlight.
Parsers hummed softly, timestamps in tow,
TCP and UDP learned a new flow. 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 15.38% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'in_tcp: in_udp: Enhance of parser capabilities' clearly identifies the main change—adding parser support to TCP and UDP input plugins—and aligns with the primary objective.
Linked Issues check ✅ Passed All coding requirements from issue #11688 are met: parser configuration is added to TCP and UDP inputs via config_map entries, parser resolution and validation logic is implemented, and parser-based payload parsing is functional.
Out of Scope Changes check ✅ Passed All changes directly support the parser enhancement objective. Config additions, parser resolution, and payload parsing logic in tcp_conn.c and udp_conn.c are in scope; test additions validate the feature appropriately.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch cosmo0920-enhance-of-parser-capabilities

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@cosmo0920 cosmo0920 added this to the Fluent Bit v5.0.3 milestone Apr 14, 2026
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b860035924

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
plugins/in_udp/udp_conn.c (1)

261-363: ⚠️ Potential issue | 🔴 Critical

Free out_buf only when flb_parser_do() returns success.

The cleanup at line 98–100 runs unconditionally after the parser result check, causing flb_free(out_buf) to execute on both success and error paths. Per the API contract, flb_parser_do() does not allocate or already deallocates out_buf on error; calling flb_free() unconditionally risks a double-free. Separate the parser result from ret (which is reused for encoder state), then guard the free with the parser result check.

🛠️ Proposed fix
 static ssize_t parse_payload_none(struct udp_conn *conn)
 {
     int ret;
+    int parser_ret;
     int len;
     int sep_len;
     size_t consumed = 0;
     size_t out_size;
@@
-                ret = flb_parser_do(ctx->parser, buf, len,
-                                    &out_buf, &out_size, &out_time);
-                if (ret >= 0) {
+                parser_ret = flb_parser_do(ctx->parser, buf, len,
+                                           &out_buf, &out_size, &out_time);
+                if (parser_ret >= 0) {
                     ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
@@
-                if (out_buf != NULL) {
+                if (parser_ret >= 0 && out_buf != NULL) {
                     flb_free(out_buf);
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_udp/udp_conn.c` around lines 261 - 363, The parser result and the
encoder state are conflated by reusing ret for both; capture flb_parser_do()
return into a new variable (e.g., parser_ret) and only call flb_free(out_buf)
when parser_ret indicates success (parser_ret >= 0) and out_buf != NULL; leave
the existing encoder-related ret usage intact for flb_log_event_encoder_* calls,
and still free appended_address_buffer unconditionally if non-NULL. Concretely:
introduce parser_ret = flb_parser_do(...), use parser_ret for the parser-success
branch (building the record and appending address), and at cleanup call
flb_free(out_buf) only when parser_ret >= 0 and out_buf != NULL while keeping
flb_free(appended_address_buffer) as-is.
plugins/in_tcp/tcp_conn.c (1)

206-308: ⚠️ Potential issue | 🔴 Critical

Free out_buf only when flb_parser_do() succeeded.

The cleanup unconditionally frees out_buf after the parser error path, violating the parser API contract. Although the code initializes out_buf = NULL making it safe in practice, the parser implementation may not guarantee the pointer remains unmodified on error. Gate the cleanup to the success path only.

🛠️ Proposed fix
 static ssize_t parse_payload_none(struct tcp_conn *conn)
 {
     int ret;
+    int parser_ret;
     size_t len;
     size_t sep_len;
     size_t consumed = 0;
     size_t out_size;
@@
-                ret = flb_parser_do(ctx->parser, buf, len,
-                                    &out_buf, &out_size, &out_time);
-                if (ret >= 0) {
+                parser_ret = flb_parser_do(ctx->parser, buf, len,
+                                           &out_buf, &out_size, &out_time);
+                if (parser_ret >= 0) {
                     ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
@@
-                if (out_buf != NULL) {
+                if (parser_ret >= 0 && out_buf != NULL) {
                     flb_free(out_buf);
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_tcp/tcp_conn.c` around lines 206 - 308, The cleanup currently
unconditionally calls flb_free(out_buf) after flb_parser_do(), which can violate
the parser API; change the cleanup so that out_buf is freed only when the parser
succeeded (i.e., only when the flb_parser_do() path that sets ret >= 0 ran).
Concretely, in the function handling incoming data (symbols: flb_parser_do,
out_buf, ret), move or guard the flb_free(out_buf) call so it is executed only
on the successful parser branch; keep the existing free of
appended_address_buffer as-is.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 297-300: The parser-failure branch currently turns a parser miss
into FLB_EVENT_ENCODER_SUCCESS and allows the loop to advance consumed, which
drops the line; instead, do not mark success or advance consumed when the parser
rejects input: remove the assignment ret = FLB_EVENT_ENCODER_SUCCESS and do not
increment/advance the consumed variable in that branch, return/propagate a
non-consumption result (or break so the caller preserves the buffer) so the
record remains intact; apply the same change to the other symmetric parser-miss
branch (the block referencing ctx->parser_name and the ret/consumed behavior
around FLB_EVENT_ENCODER_SUCCESS at the second location). Ensure references to
ctx->parser_name, ret, consumed and FLB_EVENT_ENCODER_SUCCESS are updated
accordingly.

In `@tests/runtime/in_tcp.c`:
- Around line 677-735: Add a failure-path test alongside
flb_test_format_none_with_parser that verifies startup fails when an unknown
parser is configured: create a new test function (e.g.,
flb_test_format_none_with_unknown_parser) that sets up ctx and output the same
way, call flb_input_set(..., "format", "none", "parser", "nonexistent_parser",
NULL), then assert that flb_start(ctx->flb) returns non-zero (TEST_CHECK(ret !=
0)) and perform appropriate cleanup (test_ctx_destroy(ctx), close sockets if
opened); reference flb_test_format_none_with_parser, flb_input_set, and
flb_start to locate where to mirror setup and assertions.

In `@tests/runtime/in_udp.c`:
- Around line 436-496: Add a failure-path test next to
flb_test_format_none_with_parser that verifies unknown/invalid parser behavior:
create a test context via test_ctx_create (same cb_data), call flb_input_set
with "format","none","parser","no_such_parser" (or another invalid parser name),
then attempt flb_start and assert failure (ret != 0) or—if flb_start
succeeds—send a UDP packet with sendto and assert get_output_num() == 0 (no
outputs); use TEST_CHECK/TEST_MSG around flb_input_set, flb_start, sendto,
get_output_num and ensure flb_socket_close(fd) and test_ctx_destroy(ctx) are
always called to clean up. References: flb_test_format_none_with_parser,
flb_input_set, flb_start, sendto, get_output_num, TEST_CHECK, TEST_MSG.

---

Outside diff comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 206-308: The cleanup currently unconditionally calls
flb_free(out_buf) after flb_parser_do(), which can violate the parser API;
change the cleanup so that out_buf is freed only when the parser succeeded
(i.e., only when the flb_parser_do() path that sets ret >= 0 ran). Concretely,
in the function handling incoming data (symbols: flb_parser_do, out_buf, ret),
move or guard the flb_free(out_buf) call so it is executed only on the
successful parser branch; keep the existing free of appended_address_buffer
as-is.

In `@plugins/in_udp/udp_conn.c`:
- Around line 261-363: The parser result and the encoder state are conflated by
reusing ret for both; capture flb_parser_do() return into a new variable (e.g.,
parser_ret) and only call flb_free(out_buf) when parser_ret indicates success
(parser_ret >= 0) and out_buf != NULL; leave the existing encoder-related ret
usage intact for flb_log_event_encoder_* calls, and still free
appended_address_buffer unconditionally if non-NULL. Concretely: introduce
parser_ret = flb_parser_do(...), use parser_ret for the parser-success branch
(building the record and appending address), and at cleanup call
flb_free(out_buf) only when parser_ret >= 0 and out_buf != NULL while keeping
flb_free(appended_address_buffer) as-is.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bca50daf-588b-4008-a7f1-e833dd0d5666

📥 Commits

Reviewing files that changed from the base of the PR and between c6111db and b860035.

📒 Files selected for processing (10)
  • plugins/in_tcp/tcp.c
  • plugins/in_tcp/tcp.h
  • plugins/in_tcp/tcp_config.c
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp.c
  • plugins/in_udp/udp.h
  • plugins/in_udp/udp_config.c
  • plugins/in_udp/udp_conn.c
  • tests/runtime/in_tcp.c
  • tests/runtime/in_udp.c

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
plugins/in_tcp/tcp_conn.c (1)

302-306: ⚠️ Potential issue | 🟠 Major

Parser misses are still consumed and dropped.

On Line 305, parser failure is converted to success; then Line 342 advances consumed, so that line is discarded without emitting a fallback record.

🛠️ Proposed fix
                 else {
                     flb_plg_debug(ctx->ins, "parser '%s' failed on incoming data",
                                   ctx->parser_name);
-                    ret = FLB_EVENT_ENCODER_SUCCESS;
+                    ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
+
+                    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+                        ret = flb_log_event_encoder_set_current_timestamp(
+                                ctx->log_encoder);
+                    }
+
+                    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+                        ret = flb_log_event_encoder_append_body_values(
+                                ctx->log_encoder,
+                                FLB_LOG_EVENT_CSTRING_VALUE("log"),
+                                FLB_LOG_EVENT_STRING_VALUE(buf, len));
+                    }
+
+                    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+                        ret = flb_log_event_encoder_commit_record(
+                                ctx->log_encoder);
+                    }
                 }

Also applies to: 338-343

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_tcp/tcp_conn.c` around lines 302 - 306, The code currently treats
a parser failure as success (setting ret = FLB_EVENT_ENCODER_SUCCESS) which
allows consumed to advance and drops the input instead of emitting a fallback
record; change the handling in the parser-failure branches (the block
referencing ctx->parser_name and the similar block at the other site) so that on
parser failure you do not set ret to FLB_EVENT_ENCODER_SUCCESS and do not
advance consumed — instead set/return a failure indicator (or leave ret
unchanged) so the fallback/emission path runs; specifically update the branches
that reference ctx->parser_name, ret, FLB_EVENT_ENCODER_SUCCESS and consumed so
parser misses are preserved for fallback emission rather than being dropped.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 251-253: The parser output buffer out_buf should only be freed
when flb_parser_do succeeds; adjust the cleanup around flb_parser_do (the call
that sets ret, out_buf, out_size, out_time) so that free(out_buf) is invoked
only if ret >= 0 and out_buf != NULL (and not freed elsewhere), and remove the
unconditional free of out_buf on parser failure to avoid double-free or freeing
unowned memory.

---

Duplicate comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 302-306: The code currently treats a parser failure as success
(setting ret = FLB_EVENT_ENCODER_SUCCESS) which allows consumed to advance and
drops the input instead of emitting a fallback record; change the handling in
the parser-failure branches (the block referencing ctx->parser_name and the
similar block at the other site) so that on parser failure you do not set ret to
FLB_EVENT_ENCODER_SUCCESS and do not advance consumed — instead set/return a
failure indicator (or leave ret unchanged) so the fallback/emission path runs;
specifically update the branches that reference ctx->parser_name, ret,
FLB_EVENT_ENCODER_SUCCESS and consumed so parser misses are preserved for
fallback emission rather than being dropped.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e71e2cc2-dc11-4959-94c8-4a98239db66a

📥 Commits

Reviewing files that changed from the base of the PR and between b860035 and e70b524.

📒 Files selected for processing (10)
  • plugins/in_tcp/tcp.c
  • plugins/in_tcp/tcp.h
  • plugins/in_tcp/tcp_config.c
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp.c
  • plugins/in_udp/udp.h
  • plugins/in_udp/udp_config.c
  • plugins/in_udp/udp_conn.c
  • tests/runtime/in_tcp.c
  • tests/runtime/in_udp.c
✅ Files skipped from review due to trivial changes (1)
  • plugins/in_udp/udp.c
🚧 Files skipped from review as they are similar to previous changes (5)
  • plugins/in_tcp/tcp.h
  • plugins/in_tcp/tcp_config.c
  • plugins/in_udp/udp_conn.c
  • tests/runtime/in_udp.c
  • tests/runtime/in_tcp.c

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
plugins/in_tcp/tcp_conn.c (1)

215-223: ⚠️ Potential issue | 🔴 Critical

Track parser ownership separately from encoder status.

ret stops meaning “parser return code” as soon as the encoder path starts, so Line 325 ends up freeing out_buf based only on pointer state. On parser failure, that can free memory the parser already released. Keep a dedicated parser_ret and only free out_buf when parser_ret >= 0.

Suggested fix
 `#ifdef` FLB_HAVE_PARSER
     size_t out_size;
     void *out_buf;
+    int parser_ret;
     struct flb_time out_time;
     char *source_address;
     char *appended_address_buffer;
@@
-                ret = flb_parser_do(ctx->parser, buf, len,
-                                    &out_buf, &out_size, &out_time);
-                if (ret >= 0) {
+                parser_ret = flb_parser_do(ctx->parser, buf, len,
+                                           &out_buf, &out_size, &out_time);
+                if (parser_ret >= 0) {
                     ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
@@
-                if (out_buf != NULL) {
+                if (parser_ret >= 0 && out_buf != NULL) {
                     flb_free(out_buf);
                 }
Based on learnings "In Fluent Bit, when `flb_parser_do` returns an error (ret < 0), the `out_buf` parameter is either not allocated or is already deallocated by the function before returning. Callers should only call `flb_free(out_buf)` when `flb_parser_do` returns success (ret >= 0)."

Also applies to: 251-327

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_tcp/tcp_conn.c` around lines 215 - 223, The parser return value
`ret` is being reused for encoder logic which causes `out_buf` to be freed even
when `flb_parser_do` failed and already cleaned up memory; introduce a dedicated
`parser_ret` (or similar) to capture the result of flb_parser_do in the tcp_conn
handler (where out_buf, out_size, out_time, source_address are used) and only
call flb_free(out_buf) when parser_ret >= 0; update all checks and frees that
currently use `ret` (including the branches around flb_parser_do and subsequent
encoder/append logic) to reference parser_ret for parser-owned buffers and keep
encoder status in `ret` separate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 271-283: The code nulls out appended_address_buffer and resets
appended_address_size when flb_msgpack_append_message_to_record(...) returns an
error, which can leak the newly allocated buffer; change the error path in the
error-check after flb_msgpack_append_message_to_record so that any allocated
appended_address_buffer is freed before setting it to NULL (or leave it intact
so the common cleanup frees it), i.e., call the appropriate free function on
appended_address_buffer when append_result != FLB_MAP_EXPAND_SUCCESS (same fix
for the other occurrence around the later append call), and ensure
appended_address_size is reset consistently after freeing.

In `@tests/runtime/in_tcp.c`:
- Around line 677-774: Add a new test function (e.g.,
flb_test_format_none_parser_rejects) that mirrors
flb_test_format_none_with_parser but sets the input parser to "json" and sends
an invalid payload "not-json\n"; configure the test_ctx to use a callback that
asserts the emitted record contains a raw "log" field with value "not-json" (or
add/ reuse a callback like cb_check_result_log that checks for
"\"log\":\"not-json\""), start the Fluent Bit instance, send the invalid payload
over TCP, wait, verify get_output_num() > 0 and that the callback observed the
raw log record, then close the socket and destroy the context. Ensure you
reference the same symbols used in the diff: test_ctx_create,
flb_input_set(ctx->flb, ctx->i_ffd, "format","none","parser","json",...),
flb_output_set, flb_start, send, get_output_num, flb_socket_close, and
test_ctx_destroy.

In `@tests/runtime/in_udp.c`:
- Around line 436-535: Add a new UDP runtime test (sibling to
flb_test_format_none_with_parser and flb_test_format_none_with_unknown_parser)
that verifies the parser-miss fallback: create a test_ctx with cb_data.cb =
cb_check_result_json and cb_data.data set to the expected preserved field (e.g.
"\"log\":\"plain text\""), call flb_output_set(... "format","json"),
flb_input_set(... "format","none","parser","json"), start flb_start (expect
success), send a non-JSON datagram (e.g. "plain text\n") via init_udp/sendto,
sleep briefly with flb_time_msleep, assert get_output_num() > 0 and that the
callback receives the record with the non-JSON payload under "log", then clean
up sockets and test_ctx; ensure the new test name clearly indicates UDP
parser-fallback (e.g. flb_test_format_none_parser_fallback_udp).

---

Duplicate comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 215-223: The parser return value `ret` is being reused for encoder
logic which causes `out_buf` to be freed even when `flb_parser_do` failed and
already cleaned up memory; introduce a dedicated `parser_ret` (or similar) to
capture the result of flb_parser_do in the tcp_conn handler (where out_buf,
out_size, out_time, source_address are used) and only call flb_free(out_buf)
when parser_ret >= 0; update all checks and frees that currently use `ret`
(including the branches around flb_parser_do and subsequent encoder/append
logic) to reference parser_ret for parser-owned buffers and keep encoder status
in `ret` separate.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 49591f24-a3f2-4997-bba7-38203b8fdada

📥 Commits

Reviewing files that changed from the base of the PR and between e70b524 and 55fbc94.

📒 Files selected for processing (4)
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp_conn.c
  • tests/runtime/in_tcp.c
  • tests/runtime/in_udp.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/in_udp/udp_conn.c

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
plugins/in_tcp/tcp_conn.c (1)

281-283: ⚠️ Potential issue | 🟠 Major

Don't clear appended_address_buffer before the shared cleanup.

This branch drops the only pointer the flb_free(appended_address_buffer) block can release. If the append helper allocated before returning an error, the buffer leaks; free it here or leave it intact for the common cleanup path.

🛠️ Minimal fix
                         if (append_result != FLB_MAP_EXPAND_SUCCESS) {
-                            appended_address_buffer = NULL;
+                            if (appended_address_buffer != NULL) {
+                                flb_free(appended_address_buffer);
+                                appended_address_buffer = NULL;
+                            }
                             appended_address_size = 0;
                         }

Also applies to: 329-330

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_tcp/tcp_conn.c` around lines 281 - 283, The branch that handles
append_result != FLB_MAP_EXPAND_SUCCESS clears appended_address_buffer and
appended_address_size, which discards the only pointer later freed by the common
cleanup and causes a leak; instead, ensure the buffer is freed when
append_result indicates failure or leave appended_address_buffer intact so the
shared flb_free cleanup can release it. Locate the failure branches around the
append_result check (symbols: append_result, FLB_MAP_EXPAND_SUCCESS,
appended_address_buffer, appended_address_size) and either call
flb_free(appended_address_buffer) before setting
appended_address_buffer=NULL/appended_address_size=0, or remove the clearing so
the later common cleanup frees it (apply the same fix to the similar branch at
the other occurrence around lines handling append_result near the second
mention).
🧹 Nitpick comments (1)
plugins/in_tcp/tcp_conn.c (1)

303-323: Factor the raw-record fallback into one helper.

The parser-miss branch and the no-parser branch now build the same "log" record with the same timestamp/commit sequence. Pulling that into a small helper will keep both paths aligned.

♻️ Refactor sketch
-                    ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
-
-                    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-                        ret = flb_log_event_encoder_set_current_timestamp(
-                                ctx->log_encoder);
-                    }
-
-                    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-                        ret = flb_log_event_encoder_append_body_values(
-                                ctx->log_encoder,
-                                FLB_LOG_EVENT_CSTRING_VALUE("log"),
-                                FLB_LOG_EVENT_STRING_VALUE(buf, len));
-                    }
-
-                    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-                        ret = flb_log_event_encoder_commit_record(
-                                ctx->log_encoder);
-                    }
+                    ret = append_raw_text_record(ctx, buf, len);
...
-                ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
-
-                if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-                    ret = flb_log_event_encoder_set_current_timestamp(
-                            ctx->log_encoder);
-                }
-
-                if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-                    ret = flb_log_event_encoder_append_body_values(
-                            ctx->log_encoder,
-                            FLB_LOG_EVENT_CSTRING_VALUE("log"),
-                            FLB_LOG_EVENT_STRING_VALUE(buf, len));
-                }
-
-                if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-                    ret = flb_log_event_encoder_commit_record(
-                            ctx->log_encoder);
-                }
+                ret = append_raw_text_record(ctx, buf, len);
static inline int append_raw_text_record(struct flb_in_tcp_config *ctx,
                                         const char *buf, size_t len)
{
    int ret;

    ret = flb_log_event_encoder_begin_record(ctx->log_encoder);

    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
        ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
    }

    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
        ret = flb_log_event_encoder_append_body_values(
                ctx->log_encoder,
                FLB_LOG_EVENT_CSTRING_VALUE("log"),
                FLB_LOG_EVENT_STRING_VALUE(buf, len));
    }

    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
        ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
    }

    return ret;
}

Also applies to: 336-352

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/in_tcp/tcp_conn.c` around lines 303 - 323, Extract the repeated "raw
log" encoding sequence into a small helper, e.g., static inline int
append_raw_text_record(struct flb_in_tcp_config *ctx, const char *buf, size_t
len), that performs the flb_log_event_encoder_begin_record /
set_current_timestamp / append_body_values(FLB_LOG_EVENT_CSTRING_VALUE("log"),
FLB_LOG_EVENT_STRING_VALUE(buf,len)) / commit_record sequence and returns the
encoder status; then replace the duplicated blocks in the parser-miss and
no-parser branches (the sites using ctx->log_encoder and ctx->parser_name) with
calls to append_raw_text_record(ctx, buf, len) so both code paths share the same
logic and return handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/in_udp/udp_conn.c`:
- Around line 320-339: The parser-fallback path is dropping the configured
source-address field so malformed lines lose client attribution; ensure that
whenever ctx->source_address_key is set you fetch the remote address via
flb_connection_get_remote_address(conn->connection) and call
append_message_to_record_data(ctx->source_address_key, source_address,
strlen(source_address), MSGPACK_OBJECT_STR) against the record buffer even when
parsing falls back (i.e., when ret != FLB_EVENT_ENCODER_SUCCESS or the parser
produced the simple "log" body); reuse the same append logic used for parsed
records (append_message_to_record_data,
appended_address_buffer/appended_address_size) and handle append failures the
same way (reset buffer/size) so fallback records keep the source_address_key.

---

Duplicate comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 281-283: The branch that handles append_result !=
FLB_MAP_EXPAND_SUCCESS clears appended_address_buffer and appended_address_size,
which discards the only pointer later freed by the common cleanup and causes a
leak; instead, ensure the buffer is freed when append_result indicates failure
or leave appended_address_buffer intact so the shared flb_free cleanup can
release it. Locate the failure branches around the append_result check (symbols:
append_result, FLB_MAP_EXPAND_SUCCESS, appended_address_buffer,
appended_address_size) and either call flb_free(appended_address_buffer) before
setting appended_address_buffer=NULL/appended_address_size=0, or remove the
clearing so the later common cleanup frees it (apply the same fix to the similar
branch at the other occurrence around lines handling append_result near the
second mention).

---

Nitpick comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 303-323: Extract the repeated "raw log" encoding sequence into a
small helper, e.g., static inline int append_raw_text_record(struct
flb_in_tcp_config *ctx, const char *buf, size_t len), that performs the
flb_log_event_encoder_begin_record / set_current_timestamp /
append_body_values(FLB_LOG_EVENT_CSTRING_VALUE("log"),
FLB_LOG_EVENT_STRING_VALUE(buf,len)) / commit_record sequence and returns the
encoder status; then replace the duplicated blocks in the parser-miss and
no-parser branches (the sites using ctx->log_encoder and ctx->parser_name) with
calls to append_raw_text_record(ctx, buf, len) so both code paths share the same
logic and return handling.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: fcea003e-f250-488c-85d7-139584895980

📥 Commits

Reviewing files that changed from the base of the PR and between 55fbc94 and 14296f6.

📒 Files selected for processing (2)
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp_conn.c

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
tests/runtime/in_tcp.c (1)

677-774: ⚠️ Potential issue | 🟠 Major

Add a runtime test for the parser-rejection fallback.

These additions cover parser success and unknown-parser startup failure, but not the new “configured parser rejects this line and we still emit a raw log record” branch in plugins/in_tcp/tcp_conn.c. Please add one case with parser=json and an invalid payload like not-json\n, then assert the output contains "log":"not-json".

Proposed test shape
+#ifdef FLB_HAVE_PARSER
+void flb_test_format_none_with_parser_rejection()
+{
+    struct flb_lib_out_cb cb_data;
+    struct test_ctx *ctx;
+    flb_sockfd_t fd;
+    int ret;
+    int num;
+    ssize_t w_size;
+    char *buf = "not-json\n";
+    size_t size = strlen(buf);
+
+    clear_output_num();
+
+    cb_data.cb = cb_check_result_json;
+    cb_data.data = "\"log\":\"not-json\"";
+
+    ctx = test_ctx_create(&cb_data);
+    if (!TEST_CHECK(ctx != NULL)) {
+        TEST_MSG("test_ctx_create failed");
+        exit(EXIT_FAILURE);
+    }
+
+    ret = flb_output_set(ctx->flb, ctx->o_ffd,
+                         "match", "*",
+                         "format", "json",
+                         NULL);
+    TEST_CHECK(ret == 0);
+
+    ret = flb_input_set(ctx->flb, ctx->i_ffd,
+                        "format", "none",
+                        "parser", "json",
+                        NULL);
+    TEST_CHECK(ret == 0);
+
+    ret = flb_start(ctx->flb);
+    TEST_CHECK(ret == 0);
+
+    fd = connect_tcp(NULL, -1);
+    if (!TEST_CHECK(fd >= 0)) {
+        exit(EXIT_FAILURE);
+    }
+
+    w_size = send(fd, buf, size, 0);
+    if (!TEST_CHECK(w_size == size)) {
+        TEST_MSG("failed to send, errno=%d", errno);
+        flb_socket_close(fd);
+        exit(EXIT_FAILURE);
+    }
+
+    flb_time_msleep(1500);
+
+    num = get_output_num();
+    if (!TEST_CHECK(num > 0)) {
+        TEST_MSG("no outputs");
+    }
+
+    flb_socket_close(fd);
+    test_ctx_destroy(ctx);
+}
+#endif
@@
 `#ifdef` FLB_HAVE_PARSER
     {"format_none_with_parser", flb_test_format_none_with_parser},
+    {"format_none_with_parser_rejection",
+     flb_test_format_none_with_parser_rejection},
     {"format_none_with_unknown_parser", flb_test_format_none_with_unknown_parser},
 `#endif`

As per coding guidelines "Validate both success and failure paths in tests: invalid payloads, boundary sizes, null/missing fields".

Also applies to: 945-947

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/runtime/in_tcp.c` around lines 677 - 774, Add a new runtime test
(similar to flb_test_format_none_with_parser and
flb_test_format_none_with_unknown_parser) that uses test_ctx_create and
cb_check_result_json, sets output via flb_output_set("format","json"), sets
input via flb_input_set(...,"format","none","parser","json"), starts the engine
with flb_start, connects with connect_tcp, sends an invalid JSON payload like
"not-json\n", waits, and then asserts via the callback / output checking that
the emitted record contains the raw field "\"log\":\"not-json\"" (use
clear_output_num/get_output_num or the existing cb to validate). Ensure the test
closes the socket and calls test_ctx_destroy; name it to reflect
parser-rejection behavior so it’s discoverable alongside
flb_test_format_none_with_parser and flb_test_format_none_with_unknown_parser.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 306-326: When emitting the fallback record in the parser-miss
branch (inside the else that logs parser '%s' failed and uses ctx->log_encoder),
preserve the configured source_address_key by appending it to the record if set:
check ctx->source_address_key (non-NULL) and call
flb_log_event_encoder_append_body_values before commit to add
FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key) and the corresponding
address value from the ctx (e.g. ctx->source_address or whichever field holds
the peer address) using FLB_LOG_EVENT_STRING_VALUE; keep using the existing
flb_log_event_encoder_* calls (flb_log_event_encoder_begin_record,
set_current_timestamp, append_body_values, commit_record) and ensure the
source_address pair is added in the same encoder success checks before
flb_log_event_encoder_commit_record.

---

Duplicate comments:
In `@tests/runtime/in_tcp.c`:
- Around line 677-774: Add a new runtime test (similar to
flb_test_format_none_with_parser and flb_test_format_none_with_unknown_parser)
that uses test_ctx_create and cb_check_result_json, sets output via
flb_output_set("format","json"), sets input via
flb_input_set(...,"format","none","parser","json"), starts the engine with
flb_start, connects with connect_tcp, sends an invalid JSON payload like
"not-json\n", waits, and then asserts via the callback / output checking that
the emitted record contains the raw field "\"log\":\"not-json\"" (use
clear_output_num/get_output_num or the existing cb to validate). Ensure the test
closes the socket and calls test_ctx_destroy; name it to reflect
parser-rejection behavior so it’s discoverable alongside
flb_test_format_none_with_parser and flb_test_format_none_with_unknown_parser.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a9bfcb70-74d5-4f7b-af13-836b2929d6fe

📥 Commits

Reviewing files that changed from the base of the PR and between 14296f6 and 23564ca.

📒 Files selected for processing (10)
  • plugins/in_tcp/tcp.c
  • plugins/in_tcp/tcp.h
  • plugins/in_tcp/tcp_config.c
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp.c
  • plugins/in_udp/udp.h
  • plugins/in_udp/udp_config.c
  • plugins/in_udp/udp_conn.c
  • tests/runtime/in_tcp.c
  • tests/runtime/in_udp.c
🚧 Files skipped from review as they are similar to previous changes (7)
  • plugins/in_udp/udp.c
  • plugins/in_tcp/tcp.c
  • plugins/in_udp/udp.h
  • plugins/in_tcp/tcp_config.c
  • plugins/in_tcp/tcp.h
  • tests/runtime/in_udp.c
  • plugins/in_udp/udp_conn.c

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

TCP, UDP and probably more inputs should allow parsers

1 participant