Skip to content

input: input_chunk: Handle flow with events metrics per window#11698

Open
cosmo0920 wants to merge 5 commits intomasterfrom
cosmo0920-handle-flow-with-rate-gate
Open

input: input_chunk: Handle flow with events metrics per window#11698
cosmo0920 wants to merge 5 commits intomasterfrom
cosmo0920-handle-flow-with-rate-gate

Conversation

@cosmo0920
Copy link
Copy Markdown
Contributor

@cosmo0920 cosmo0920 commented Apr 10, 2026

This branch adds hysteresis-based resume control for input rate gating (rate_gate.resume_ratio), centralizes backpressure-adjusted effective limit computation, and applies those limits consistently to pause/resume decisions. It also fixes filesystem-mode enforcement gaps and adds periodic scheduler-driven resume checks so paused inputs can recover without requiring new ingestion events.

Testing

Includes expanded internal tests for parsing/window/hysteresis/pause-resume safety and updated config_rate_gate integration scenarios for fanout+retry and rollout paths across memrb/filesystem cases.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Using with integrated tests' configurations, there's no memory leaks:

fluent_bit_results_20260413_194530/valgrind.log
==40796== Memcheck, a memory error detector
==40796== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40796== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40796== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194530/fluent_bit.log
==40796== Parent PID: 40708
==40796== 
==40796== 
==40796== HEAP SUMMARY:
==40796==     in use at exit: 0 bytes in 0 blocks
==40796==   total heap usage: 10,690 allocs, 10,690 frees, 2,647,824 bytes allocated
==40796== 
==40796== All heap blocks were freed -- no leaks are possible
==40796== 
==40796== For lists of detected and suppressed errors, rerun with: -s
==40796== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194526/valgrind.log
==40750== Memcheck, a memory error detector
==40750== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40750== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40750== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194526/fluent_bit.log
==40750== Parent PID: 40708
==40750== 
==40750== 
==40750== HEAP SUMMARY:
==40750==     in use at exit: 0 bytes in 0 blocks
==40750==   total heap usage: 12,294 allocs, 12,294 frees, 3,004,701 bytes allocated
==40750== 
==40750== All heap blocks were freed -- no leaks are possible
==40750== 
==40750== For lists of detected and suppressed errors, rerun with: -s
==40750== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194523/valgrind.log
==40741== Memcheck, a memory error detector
==40741== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40741== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40741== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194523/fluent_bit.log
==40741== Parent PID: 40708
==40741== 
==40741== 
==40741== HEAP SUMMARY:
==40741==     in use at exit: 0 bytes in 0 blocks
==40741==   total heap usage: 9,995 allocs, 9,995 frees, 3,601,066 bytes allocated
==40741== 
==40741== All heap blocks were freed -- no leaks are possible
==40741== 
==40741== For lists of detected and suppressed errors, rerun with: -s
==40741== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194519/valgrind.log
==40731== Memcheck, a memory error detector
==40731== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40731== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40731== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194519/fluent_bit.log
==40731== Parent PID: 40708
==40731== 
==40731== 
==40731== HEAP SUMMARY:
==40731==     in use at exit: 0 bytes in 0 blocks
==40731==   total heap usage: 9,639 allocs, 9,639 frees, 2,753,979 bytes allocated
==40731== 
==40731== All heap blocks were freed -- no leaks are possible
==40731== 
==40731== For lists of detected and suppressed errors, rerun with: -s
==40731== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194516/valgrind.log
==40720== Memcheck, a memory error detector
==40720== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40720== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40720== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194516/fluent_bit.log
==40720== Parent PID: 40708
==40720== 
==40720== 
==40720== HEAP SUMMARY:
==40720==     in use at exit: 0 bytes in 0 blocks
==40720==   total heap usage: 11,180 allocs, 11,180 frees, 3,133,653 bytes allocated
==40720== 
==40720== All heap blocks were freed -- no leaks are possible
==40720== 
==40720== For lists of detected and suppressed errors, rerun with: -s
==40720== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
fluent_bit_results_20260413_194513/valgrind.log
==40710== Memcheck, a memory error detector
==40710== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==40710== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==40710== Command: /home/cosmo/GitHub/fluent-bit/build/bin/fluent-bit -c /home/cosmo/GitHub/fluent-bit/tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml -l /home/cosmo/GitHub/fluent-bit/tests/integration/results/fluent_bit_results_20260413_194513/fluent_bit.log
==40710== Parent PID: 40708
==40710== 
==40710== 
==40710== HEAP SUMMARY:
==40710==     in use at exit: 0 bytes in 0 blocks
==40710==   total heap usage: 6,779 allocs, 6,779 frees, 1,777,537 bytes allocated
==40710== 
==40710== All heap blocks were freed -- no leaks are possible
==40710== 
==40710== For lists of detected and suppressed errors, rerun with: -s
==40710== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Added input rate-gating with configurable time window, byte/record limits, backpressure and resume hysteresis; exposes runtime rate and gate-state metrics.
  • Improvements
    • Pause/resume now coordinates buffer and rate-gate state to provide more reliable ingestion backpressure and recovery.
  • Tests
    • Added integration and unit tests covering burst recovery, fanout with retries, filesystem/memory backends, steady overrate, and hysteresis behavior.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 10, 2026

📝 Walkthrough

Walkthrough

Adds a rate-gate feature for inputs: per-window ingestion accounting, rate-based pausing/resuming with hysteresis and optional backpressure, new CMetrics gauges, public rate-control APIs, configuration properties, and accompanying integration and unit tests.

Changes

Cohort / File(s) Summary
Header Declarations
include/fluent-bit/flb_input.h
Added FLB_NSEC_IN_SEC, new rate metric pointers and rate-gate/accounting fields in struct flb_input_instance, inline flb_input_paused() and declarations for three rate-control APIs.
Core Rate-Gate Implementation
src/flb_input.c
Parsed rate_window/rate_gate.* properties, initialized rate-gate state and CMetrics gauges, implemented flb_input_rate_update, flb_input_rate_gate_protect, flb_input_rate_gate_maybe_resume, and tightened flb_input_resume() to require ins->context.
Chunk Processing Integration
src/flb_input_chunk.c
Hooked rate updates into chunk map/append flows, replaced buffer-only pause checks with flb_input_paused(), short-circuited protect flow when rate gate pauses, and coordinated resume logic with rate_gate_status.
Integration Test Scenarios
tests/integration/scenarios/config_rate_gate/config/config_rate_gate_*.yaml
Added six scenario configs exercising burst recovery, fanout+retries, filesystem storage, memory ringbuffer, pipeline baseline, and steady overrate with rate-gate settings.
Integration Test Suite
tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
New test module with Service harness, HTTP receiver support, log/request polling helpers, and tests validating pause/resume, retry interactions, and rollout scenarios.
Unit Test Infrastructure
tests/internal/CMakeLists.txt, tests/internal/input_rate_gate.c
Registered new unit test executable and added unit tests covering backpressure behavior, window rollover, hysteresis resume, property parsing, and pause/resume edge cases.

Sequence Diagram(s)

sequenceDiagram
    participant Chunk as Input Chunk
    participant Input as Input Instance
    participant RateGate as Rate-Gate Control
    participant Metrics as CMetrics
    participant Pauser as Pause/Resume Handler

    Chunk->>Input: flb_input_rate_update(timestamp, records, bytes)
    Input->>RateGate: accumulate window counts
    alt window elapsed
        RateGate->>RateGate: compute per-second rates
        RateGate->>Metrics: publish rate gauges
    end

    Chunk->>Input: flb_input_chunk_protect()
    Input->>RateGate: flb_input_rate_gate_protect()
    RateGate->>RateGate: sample backpressure (busy chunks, retries)
    RateGate->>RateGate: evaluate limits
    alt limits exceeded && backpressure enabled
        RateGate->>Input: set rate_gate_status = PAUSED
        RateGate->>Metrics: update gate gauges
        RateGate-->>Input: return FLB_TRUE (pause)
    else within limits
        RateGate-->>Input: return FLB_FALSE
    end

    Chunk->>Input: flb_input_chunk_set_limits()
    Input->>RateGate: flb_input_rate_gate_maybe_resume()
    RateGate->>RateGate: apply resume_ratio hysteresis
    alt rates below resume threshold
        RateGate->>Input: set rate_gate_status = RUNNING
        RateGate->>Pauser: signal resume (flb_input_resume)
        RateGate->>Metrics: update gate gauges
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • koleini
  • fujimotos

Poem

🐰 I counted bytes beneath the moonlit log,
I gated streams where bursts would once clog,
Windows roll on, metrics hum their tune,
Hysteresis sings — resume comes soon,
A little hop: ingestion back in motion.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately describes the main change: adding rate-gating metrics collection per window and hysteresis-based resume control for input rate management.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch cosmo0920-handle-flow-with-rate-gate

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@cosmo0920 cosmo0920 force-pushed the cosmo0920-handle-flow-with-rate-gate branch from 1ba07dc to 28ad2e6 Compare April 10, 2026 10:51
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3a16ddfdb8

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/flb_input_chunk.c (2)

2451-2496: ⚠️ Potential issue | 🟠 Major

Re-evaluate buffer pause flags after the rate gate resumes.

These branches clear mem_buf_status / storage_buf_status only while rate_gate_status is already running, but Line 2496 is what can flip the rate gate back to running on this same call. If the input is below the memory/storage limits and the hysteresis threshold is crossed in the same pass, the function exits with the rate gate reopened and the buffer pause flags still stuck at FLB_INPUT_PAUSED, which can leave the input paused until some later size-change happens to call this function again.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 2451 - 2496, The code can reopen the rate
gate (via flb_input_rate_gate_maybe_resume) and leave
mem_buf_status/storage_buf_status at FLB_INPUT_PAUSED, so after potentially
changing rate_gate_status you must re-check and clear the buffer pause flags
when the corresponding limits are now below threshold: call
flb_input_chunk_is_mem_overlimit() and flb_input_chunk_is_storage_overlimit()
again (or factor the existing resume logic into a helper) after
flb_input_rate_gate_maybe_resume() returns, and if each returns FLB_FALSE and
in->config->is_running/is_ingestion_active are true and in->rate_gate_status ==
FLB_INPUT_RUNNING then set in->mem_buf_status/in->storage_buf_status to
FLB_INPUT_RUNNING and invoke flb_input_resume(in) (and logging) just like the
earlier branches; ensure flb_input_buf_paused() check still runs to cover
internal paused state.

1103-1116: ⚠️ Potential issue | 🟠 Major

Keep rate accounting outside FLB_HAVE_METRICS.

flb_input_rate_gate_protect() and flb_input_rate_gate_maybe_resume() depend on rate_window_*, rate_bytes, and rate_records, but those fields are only updated here when metrics are compiled in. In a no-metrics build, the new gate will evaluate against zeros and silently stop enforcing input rate limits.

Suggested fix
 `#ifdef` FLB_HAVE_METRICS
     if (ic->total_records > 0) {
         ts = cfl_time_now();
         cmt_counter_add(in->cmt_records, ts, ic->total_records,
                         1, (char *[]) {(char *) flb_input_name(in)});
         cmt_counter_add(in->cmt_bytes, ts, buf_size,
                         1, (char *[]) {(char *) flb_input_name(in)});
-        flb_input_rate_update(in, ts, ic->total_records, buf_size);

         flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics);
         flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
     }
 `#endif`
+    if (ic->total_records > 0) {
+        flb_input_rate_update(in, ts, ic->total_records, buf_size);
+    }

Also applies to: 2815-2828

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 1103 - 1116, The rate accounting
(calls/fields used by flb_input_rate_gate_protect and
flb_input_rate_gate_maybe_resume) must run regardless of FLB_HAVE_METRICS: move
the flb_input_rate_update call and any updates to rate_window_*, rate_bytes,
rate_records (the data used by the gate) outside the `#ifdef` FLB_HAVE_METRICS
block so they execute even in no-metrics builds; keep only the cmt_counter_add()
calls and other metric-specific code inside the FLB_HAVE_METRICS guard. Apply
the same change for the second occurrence around the flb_input_rate_update usage
(the block referenced at 2815-2828) so gate logic always sees correct non-zero
rates. Ensure ts (cfl_time_now()) is available to both metric and non-metric
updates.
🧹 Nitpick comments (2)
tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml (1)

6-6: Use an isolated storage path per test run.

Line 6 hardcodes a shared /tmp directory, which can cause cross-run contamination for filesystem backlog scenarios. Prefer a per-run/per-scenario temp path supplied by the test harness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml`
at line 6, Replace the hardcoded shared path in the config (the storage.path
entry in config_rate_gate_filesystem.yaml) with a per-run temp path provided by
the test harness (e.g., use the harness environment variable or template
placeholder the tests inject such as TEST_TMP_DIR or a {{test_temp_dir}}
variable). Update the config template to reference that variable instead of
"/tmp/fluent-bit-rate-gate-storage" and ensure the test harness sets/creates the
directory before the scenario runs and cleans it up afterwards so each test run
uses an isolated filesystem backlog path.
tests/internal/input_rate_gate.c (1)

197-230: Add a multi-second-window regression here.

This only validates the exact 1-second rollover path. It never exercises the new pre-rollover comparisons in flb_input_rate_gate_is_limited() / flb_input_rate_gate_can_resume() with rate_window > 1s, so a broken per-second normalization can slip through unnoticed.

As per coding guidelines, tests/**: "Add or update tests for behavior changes" and "Add regression tests for: mixed signals, processor drop/modify paths, multi-route fan-out, backlog + live ingestion parity".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/internal/input_rate_gate.c` around lines 197 - 230, The test only
covers 1-second rollover and misses exercising pre-rollover logic for windows
>1s; add a new regression in or adjacent to test_rate_update_window_rollover
that sets ctx.input->rate_window_size to a multi-second value (e.g., 2s or 3s),
manipulates timestamps via cfl_time_now()/ts and calls flb_input_rate_update
with events spread across the multi-second window and across rollovers, and then
assert expected normalized values for ctx.input->rate_records, rate_bytes,
rate_window_records, and rate_window_bytes so that
flb_input_rate_gate_is_limited() and flb_input_rate_gate_can_resume() paths that
rely on multi-second normalization are exercised and validated.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/flb_input_chunk.c`:
- Around line 2519-2521: The append-side pause logic currently only consults
flb_input_buf_paused() and thus ignores rate_gate_status set by
flb_input_rate_gate_protect(); update the paused predicate so the rate-gate
pause is honored by either (a) extend flb_input_buf_paused() to also check the
input's rate_gate_status (or call flb_input_rate_gate_protect() internally), or
(b) introduce a new combined helper (e.g., flb_input_paused()) that returns true
if flb_input_buf_paused() OR rate_gate_status is paused and replace all
append-side checks that call flb_input_buf_paused() with this combined helper;
ensure you update the locations that currently call flb_input_buf_paused() so
draining/accepting paths respect the rate-gate pause.

In `@src/flb_input.c`:
- Around line 2430-2476: The code sets ins->rate_gate_status = FLB_INPUT_RUNNING
early (before checking mem_buf_status/storage_buf/config), which can mark an
input as resumed while other pause reasons still block actual resume and prevent
future retries; remove that early assignment and only set ins->rate_gate_status
= FLB_INPUT_RUNNING after the full resume condition (the big if that checks
mem_buf_status, storage_buf_status, config->is_running and
config->is_ingestion_active) succeeds and flb_input_resume() is invoked (or
confirmed possible). Also only clear/report the cmt_rate_gate_limited gauge (and
related gauges) when the input truly transitions out of the paused state (i.e.,
after the status is changed to RUNNING and resume is performed); keep the gauges
indicating limited state while any pause condition remains. Adjust
flb_input_rate_gate_maybe_resume to update status and metrics atomically after
successful resume checks.
- Around line 2276-2350: The ongoing-window counters (ins->rate_window_bytes and
ins->rate_window_records) must be normalized to per-second before comparing to
per-second limits; update both places where comparisons happen (the blocking
check and flb_input_rate_gate_can_resume) to divide rate_window_bytes and
rate_window_records by (double) ins->rate_window_size (guarding against zero or
<=0 by treating it as 1) and compare those normalized values against
effective_max_bytes/effective_max_records returned by
flb_input_rate_gate_effective_limit; change only the comparisons so
ins->rate_window_bytes and ins->rate_window_records are cast to double and
divided by the window size when evaluating breaches and resumes.

In
`@tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py`:
- Around line 81-85: The test configures the mock 500 response after calling
Service.start(), which races with Fluent Bit startup and can miss the
retry/backpressure path; move the configure_http_response call to before
invoking service.start() so the receiver is set up (i.e., call
configure_http_response(status_code=500, body={"status":"retry"}) prior to
Service("config_rate_gate_fanout_retry.yaml", with_receiver=True).start()), then
call service.wait_for_requests(...) as before.
- Around line 69-77: The test
test_rate_gate_constructs_input_and_output_pipelines currently calls
service.stop() only at the end so a failing wait will leave the Service running;
wrap the service.start() and subsequent wait_for_log_contains calls in a
try/finally and call service.stop() from the finally block to guarantee cleanup;
apply the same change to the other test function
test_rate_gate_rollout_scenarios so both Service uses are always stopped even if
an assertion or wait fails (refer to Service, start, wait_for_log_contains,
stop).

---

Outside diff comments:
In `@src/flb_input_chunk.c`:
- Around line 2451-2496: The code can reopen the rate gate (via
flb_input_rate_gate_maybe_resume) and leave mem_buf_status/storage_buf_status at
FLB_INPUT_PAUSED, so after potentially changing rate_gate_status you must
re-check and clear the buffer pause flags when the corresponding limits are now
below threshold: call flb_input_chunk_is_mem_overlimit() and
flb_input_chunk_is_storage_overlimit() again (or factor the existing resume
logic into a helper) after flb_input_rate_gate_maybe_resume() returns, and if
each returns FLB_FALSE and in->config->is_running/is_ingestion_active are true
and in->rate_gate_status == FLB_INPUT_RUNNING then set
in->mem_buf_status/in->storage_buf_status to FLB_INPUT_RUNNING and invoke
flb_input_resume(in) (and logging) just like the earlier branches; ensure
flb_input_buf_paused() check still runs to cover internal paused state.
- Around line 1103-1116: The rate accounting (calls/fields used by
flb_input_rate_gate_protect and flb_input_rate_gate_maybe_resume) must run
regardless of FLB_HAVE_METRICS: move the flb_input_rate_update call and any
updates to rate_window_*, rate_bytes, rate_records (the data used by the gate)
outside the `#ifdef` FLB_HAVE_METRICS block so they execute even in no-metrics
builds; keep only the cmt_counter_add() calls and other metric-specific code
inside the FLB_HAVE_METRICS guard. Apply the same change for the second
occurrence around the flb_input_rate_update usage (the block referenced at
2815-2828) so gate logic always sees correct non-zero rates. Ensure ts
(cfl_time_now()) is available to both metric and non-metric updates.

---

Nitpick comments:
In
`@tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml`:
- Line 6: Replace the hardcoded shared path in the config (the storage.path
entry in config_rate_gate_filesystem.yaml) with a per-run temp path provided by
the test harness (e.g., use the harness environment variable or template
placeholder the tests inject such as TEST_TMP_DIR or a {{test_temp_dir}}
variable). Update the config template to reference that variable instead of
"/tmp/fluent-bit-rate-gate-storage" and ensure the test harness sets/creates the
directory before the scenario runs and cleans it up afterwards so each test run
uses an isolated filesystem backlog path.

In `@tests/internal/input_rate_gate.c`:
- Around line 197-230: The test only covers 1-second rollover and misses
exercising pre-rollover logic for windows >1s; add a new regression in or
adjacent to test_rate_update_window_rollover that sets
ctx.input->rate_window_size to a multi-second value (e.g., 2s or 3s),
manipulates timestamps via cfl_time_now()/ts and calls flb_input_rate_update
with events spread across the multi-second window and across rollovers, and then
assert expected normalized values for ctx.input->rate_records, rate_bytes,
rate_window_records, and rate_window_bytes so that
flb_input_rate_gate_is_limited() and flb_input_rate_gate_can_resume() paths that
rely on multi-second normalization are exercised and validated.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3f76ce80-4c4e-4574-901c-10c327d23a05

📥 Commits

Reviewing files that changed from the base of the PR and between e8adc6d and 3a16ddf.

📒 Files selected for processing (13)
  • include/fluent-bit/flb_input.h
  • src/flb_engine.c
  • src/flb_input.c
  • src/flb_input_chunk.c
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
  • tests/internal/CMakeLists.txt
  • tests/internal/input_rate_gate.c

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/flb_input_chunk.c (2)

2451-2497: ⚠️ Potential issue | 🟠 Major

The new rate-gate guard creates a circular wait.

These resume branches now require rate_gate_status == FLB_INPUT_RUNNING, but flb_input_rate_gate_maybe_resume() only clears the rate gate after mem_buf_status and storage_buf_status are already running. In a mixed-pause state, neither side can transition first, so scheduler-driven recovery stalls.

Suggested fix
-        in->mem_buf_status == FLB_INPUT_PAUSED &&
-        in->rate_gate_status == FLB_INPUT_RUNNING) {
+        in->mem_buf_status == FLB_INPUT_PAUSED) {
         in->mem_buf_status = FLB_INPUT_RUNNING;
-        if (in->p->cb_resume) {
+        if (in->rate_gate_status == FLB_INPUT_RUNNING &&
+            in->p->cb_resume) {
             flb_input_resume(in);
             flb_info("[input] %s resume (mem buf overlimit - buf size %zuB now below limit %zuB)",
                      flb_input_name(in),
                      in->mem_chunks_size,
                      in->mem_buf_limit);
         }
     }
@@
-        in->storage_buf_status == FLB_INPUT_PAUSED &&
-        in->rate_gate_status == FLB_INPUT_RUNNING) {
+        in->storage_buf_status == FLB_INPUT_PAUSED) {
         in->storage_buf_status = FLB_INPUT_RUNNING;
-        if (in->p->cb_resume) {
+        if (in->rate_gate_status == FLB_INPUT_RUNNING &&
+            in->p->cb_resume) {
             flb_input_resume(in);
             flb_info("[input] %s resume (storage buf overlimit %zu/%zu)",
                      flb_input_name(in),
                      ((struct flb_storage_input *) in->storage)->cio->total_chunks_up,
                      ((struct flb_storage_input *) in->storage)->cio->max_chunks_up);
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 2451 - 2497, The new guard requiring
rate_gate_status == FLB_INPUT_RUNNING causes a circular wait between the
mem/storage resume checks and the rate-gate clearance: call
flb_input_rate_gate_maybe_resume() before the memory/storage resume blocks (or
alternatively remove the rate_gate_status == FLB_INPUT_RUNNING check from the
mem/storage resume branches) so the rate gate can clear first and allow
flb_input_chunk_is_mem_overlimit, flb_input_chunk_is_storage_overlimit and
subsequent flb_input_resume() calls to proceed; update the logic around
in->rate_gate_status, flb_input_rate_gate_maybe_resume(), flb_input_resume(),
flb_input_paused(), and the two checks that use rate_gate_status to break the
deadlock.

1103-1116: ⚠️ Potential issue | 🟠 Major

Don't count restored backlog as fresh ingress.

flb_input_chunk_map() marks these chunks as fs_backlog, so feeding them into flb_input_rate_update() makes old filesystem backlog count against the live rate gate. A large backlog can pause the input on startup/rollout before any new events arrive.

Suggested fix
-        flb_input_rate_update(in, ts, ic->total_records, buf_size);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/flb_input_chunk.c` around lines 1103 - 1116, The code is updating input
rate metrics for chunks restored from disk, which were marked as fs_backlog by
flb_input_chunk_map(), causing old backlog to be counted as fresh ingress;
change the logic so flb_input_rate_update() is not called for fs_backlog chunks
(check the chunk/backlog flag on ic, e.g. ic->fs_backlog or the corresponding
indicator set by flb_input_chunk_map()) while retaining the counters if
desired—wrap the flb_input_rate_update(in, ts, ic->total_records, buf_size) call
in a conditional that skips it when the chunk is a filesystem backlog.
🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)

326-335: Add a backlog-vs-live rate-gate regression.

The new accounting touches both live appends and chunk restoration, but this suite never exercises an fs_backlog chunk being mapped before live traffic. A focused case here would catch startup/filesystem false positives.

Based on learnings, "Applies to tests/internal/**/*.{c,h} : Add regression tests for: mixed signals, processor drop/modify paths, multi-route fan-out, backlog + live ingestion parity".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/internal/input_rate_gate.c` around lines 326 - 335, Add a regression
test that exercises an fs_backlog chunk being mapped before any live appends to
ensure rate-gate accounting treats backlog vs live correctly: implement a new
test function (e.g., test_rate_gate_backlog_vs_live) that maps/ restores a
backlog chunk into the filesystem, then starts live append traffic and asserts
the rate-gate metrics/state (pause/resume/hysteresis or counters) reflect both
backlog and live paths without falsely throttling new live writes; add this test
symbol to TEST_LIST (alongside existing test_rate_gate_* entries) so the suite
runs it and ensure the test covers mixed signals (backlog mapping then live
ingestion) and verifies parity between backlog accounting and live append
accounting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/flb_input_chunk.c`:
- Around line 2451-2497: The new guard requiring rate_gate_status ==
FLB_INPUT_RUNNING causes a circular wait between the mem/storage resume checks
and the rate-gate clearance: call flb_input_rate_gate_maybe_resume() before the
memory/storage resume blocks (or alternatively remove the rate_gate_status ==
FLB_INPUT_RUNNING check from the mem/storage resume branches) so the rate gate
can clear first and allow flb_input_chunk_is_mem_overlimit,
flb_input_chunk_is_storage_overlimit and subsequent flb_input_resume() calls to
proceed; update the logic around in->rate_gate_status,
flb_input_rate_gate_maybe_resume(), flb_input_resume(), flb_input_paused(), and
the two checks that use rate_gate_status to break the deadlock.
- Around line 1103-1116: The code is updating input rate metrics for chunks
restored from disk, which were marked as fs_backlog by flb_input_chunk_map(),
causing old backlog to be counted as fresh ingress; change the logic so
flb_input_rate_update() is not called for fs_backlog chunks (check the
chunk/backlog flag on ic, e.g. ic->fs_backlog or the corresponding indicator set
by flb_input_chunk_map()) while retaining the counters if desired—wrap the
flb_input_rate_update(in, ts, ic->total_records, buf_size) call in a conditional
that skips it when the chunk is a filesystem backlog.

---

Nitpick comments:
In `@tests/internal/input_rate_gate.c`:
- Around line 326-335: Add a regression test that exercises an fs_backlog chunk
being mapped before any live appends to ensure rate-gate accounting treats
backlog vs live correctly: implement a new test function (e.g.,
test_rate_gate_backlog_vs_live) that maps/ restores a backlog chunk into the
filesystem, then starts live append traffic and asserts the rate-gate
metrics/state (pause/resume/hysteresis or counters) reflect both backlog and
live paths without falsely throttling new live writes; add this test symbol to
TEST_LIST (alongside existing test_rate_gate_* entries) so the suite runs it and
ensure the test covers mixed signals (backlog mapping then live ingestion) and
verifies parity between backlog accounting and live append accounting.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1b590a09-ca8b-412f-a77d-f630e579ae40

📥 Commits

Reviewing files that changed from the base of the PR and between 3a16ddf and b935e4b.

📒 Files selected for processing (12)
  • include/fluent-bit/flb_input.h
  • src/flb_input.c
  • src/flb_input_chunk.c
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
  • tests/internal/CMakeLists.txt
  • tests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (7)
  • tests/internal/CMakeLists.txt
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)

233-266: Test relies on implicit default values for resume conditions.

Looking at flb_input_rate_gate_maybe_resume() in src/flb_input.c, the resume check requires multiple conditions beyond just the rate threshold:

if (can_resume == FLB_TRUE &&
    ins->mem_buf_status == FLB_INPUT_RUNNING &&
    ins->storage_buf_status == FLB_INPUT_RUNNING &&
    ins->config->is_running == FLB_TRUE &&
    ins->config->is_ingestion_active == FLB_TRUE) {

The test at line 262-263 expects rate_gate_status to become FLB_INPUT_RUNNING, but doesn't explicitly set mem_buf_status, storage_buf_status, config->is_running, or config->is_ingestion_active. This relies on default initialization values being correct.

For test robustness and clarity, consider explicitly setting these preconditions:

🔧 Suggested improvement for explicit preconditions
     ctx.input->rate_gate_max_bytes = 100;
     ctx.input->rate_gate_resume_ratio = 0.80;
     ctx.input->rate_window_start = cfl_time_now();
     ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC;
+    ctx.input->mem_buf_status = FLB_INPUT_RUNNING;
+    ctx.input->storage_buf_status = FLB_INPUT_RUNNING;
+    ctx.config->is_running = FLB_TRUE;
+    ctx.config->is_ingestion_active = FLB_TRUE;
 
     ctx.input->rate_bytes = 110.0;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/internal/input_rate_gate.c` around lines 233 - 266, The test
test_rate_gate_hysteresis_resume relies on implicit defaults when calling
flb_input_rate_gate_maybe_resume; explicitly set the preconditions used in the
resume check: set ctx.input->mem_buf_status = FLB_INPUT_RUNNING,
ctx.input->storage_buf_status = FLB_INPUT_RUNNING, ctx.input->config->is_running
= FLB_TRUE, and ctx.input->config->is_ingestion_active = FLB_TRUE before calling
flb_input_rate_gate_maybe_resume so the resume path in
flb_input_rate_gate_maybe_resume() can be exercised deterministically.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@tests/internal/input_rate_gate.c`:
- Around line 233-266: The test test_rate_gate_hysteresis_resume relies on
implicit defaults when calling flb_input_rate_gate_maybe_resume; explicitly set
the preconditions used in the resume check: set ctx.input->mem_buf_status =
FLB_INPUT_RUNNING, ctx.input->storage_buf_status = FLB_INPUT_RUNNING,
ctx.input->config->is_running = FLB_TRUE, and
ctx.input->config->is_ingestion_active = FLB_TRUE before calling
flb_input_rate_gate_maybe_resume so the resume path in
flb_input_rate_gate_maybe_resume() can be exercised deterministically.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d488da05-b9a8-4e56-98c0-82031eca786d

📥 Commits

Reviewing files that changed from the base of the PR and between b935e4b and 03c0db8.

📒 Files selected for processing (9)
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
  • tests/internal/CMakeLists.txt
  • tests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (6)
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
  • tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/internal/CMakeLists.txt
  • tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant