input: input_chunk: Handle flow with events metrics per window#11698
input: input_chunk: Handle flow with events metrics per window#11698
Conversation
📝 WalkthroughWalkthroughAdds a rate-gate feature for inputs: per-window ingestion accounting, rate-based pausing/resuming with hysteresis and optional backpressure, new CMetrics gauges, public rate-control APIs, configuration properties, and accompanying integration and unit tests. Changes
Sequence Diagram(s)sequenceDiagram
participant Chunk as Input Chunk
participant Input as Input Instance
participant RateGate as Rate-Gate Control
participant Metrics as CMetrics
participant Pauser as Pause/Resume Handler
Chunk->>Input: flb_input_rate_update(timestamp, records, bytes)
Input->>RateGate: accumulate window counts
alt window elapsed
RateGate->>RateGate: compute per-second rates
RateGate->>Metrics: publish rate gauges
end
Chunk->>Input: flb_input_chunk_protect()
Input->>RateGate: flb_input_rate_gate_protect()
RateGate->>RateGate: sample backpressure (busy chunks, retries)
RateGate->>RateGate: evaluate limits
alt limits exceeded && backpressure enabled
RateGate->>Input: set rate_gate_status = PAUSED
RateGate->>Metrics: update gate gauges
RateGate-->>Input: return FLB_TRUE (pause)
else within limits
RateGate-->>Input: return FLB_FALSE
end
Chunk->>Input: flb_input_chunk_set_limits()
Input->>RateGate: flb_input_rate_gate_maybe_resume()
RateGate->>RateGate: apply resume_ratio hysteresis
alt rates below resume threshold
RateGate->>Input: set rate_gate_status = RUNNING
RateGate->>Pauser: signal resume (flb_input_resume)
RateGate->>Metrics: update gate gauges
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
1ba07dc to
28ad2e6
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3a16ddfdb8
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/flb_input_chunk.c (2)
2451-2496:⚠️ Potential issue | 🟠 MajorRe-evaluate buffer pause flags after the rate gate resumes.
These branches clear
mem_buf_status/storage_buf_statusonly whilerate_gate_statusis already running, but Line 2496 is what can flip the rate gate back to running on this same call. If the input is below the memory/storage limits and the hysteresis threshold is crossed in the same pass, the function exits with the rate gate reopened and the buffer pause flags still stuck atFLB_INPUT_PAUSED, which can leave the input paused until some later size-change happens to call this function again.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_input_chunk.c` around lines 2451 - 2496, The code can reopen the rate gate (via flb_input_rate_gate_maybe_resume) and leave mem_buf_status/storage_buf_status at FLB_INPUT_PAUSED, so after potentially changing rate_gate_status you must re-check and clear the buffer pause flags when the corresponding limits are now below threshold: call flb_input_chunk_is_mem_overlimit() and flb_input_chunk_is_storage_overlimit() again (or factor the existing resume logic into a helper) after flb_input_rate_gate_maybe_resume() returns, and if each returns FLB_FALSE and in->config->is_running/is_ingestion_active are true and in->rate_gate_status == FLB_INPUT_RUNNING then set in->mem_buf_status/in->storage_buf_status to FLB_INPUT_RUNNING and invoke flb_input_resume(in) (and logging) just like the earlier branches; ensure flb_input_buf_paused() check still runs to cover internal paused state.
1103-1116:⚠️ Potential issue | 🟠 MajorKeep rate accounting outside
FLB_HAVE_METRICS.
flb_input_rate_gate_protect()andflb_input_rate_gate_maybe_resume()depend onrate_window_*,rate_bytes, andrate_records, but those fields are only updated here when metrics are compiled in. In a no-metrics build, the new gate will evaluate against zeros and silently stop enforcing input rate limits.Suggested fix
`#ifdef` FLB_HAVE_METRICS if (ic->total_records > 0) { ts = cfl_time_now(); cmt_counter_add(in->cmt_records, ts, ic->total_records, 1, (char *[]) {(char *) flb_input_name(in)}); cmt_counter_add(in->cmt_bytes, ts, buf_size, 1, (char *[]) {(char *) flb_input_name(in)}); - flb_input_rate_update(in, ts, ic->total_records, buf_size); flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics); flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); } `#endif` + if (ic->total_records > 0) { + flb_input_rate_update(in, ts, ic->total_records, buf_size); + }Also applies to: 2815-2828
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_input_chunk.c` around lines 1103 - 1116, The rate accounting (calls/fields used by flb_input_rate_gate_protect and flb_input_rate_gate_maybe_resume) must run regardless of FLB_HAVE_METRICS: move the flb_input_rate_update call and any updates to rate_window_*, rate_bytes, rate_records (the data used by the gate) outside the `#ifdef` FLB_HAVE_METRICS block so they execute even in no-metrics builds; keep only the cmt_counter_add() calls and other metric-specific code inside the FLB_HAVE_METRICS guard. Apply the same change for the second occurrence around the flb_input_rate_update usage (the block referenced at 2815-2828) so gate logic always sees correct non-zero rates. Ensure ts (cfl_time_now()) is available to both metric and non-metric updates.
🧹 Nitpick comments (2)
tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml (1)
6-6: Use an isolated storage path per test run.Line 6 hardcodes a shared
/tmpdirectory, which can cause cross-run contamination for filesystem backlog scenarios. Prefer a per-run/per-scenario temp path supplied by the test harness.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml` at line 6, Replace the hardcoded shared path in the config (the storage.path entry in config_rate_gate_filesystem.yaml) with a per-run temp path provided by the test harness (e.g., use the harness environment variable or template placeholder the tests inject such as TEST_TMP_DIR or a {{test_temp_dir}} variable). Update the config template to reference that variable instead of "/tmp/fluent-bit-rate-gate-storage" and ensure the test harness sets/creates the directory before the scenario runs and cleans it up afterwards so each test run uses an isolated filesystem backlog path.tests/internal/input_rate_gate.c (1)
197-230: Add a multi-second-window regression here.This only validates the exact 1-second rollover path. It never exercises the new pre-rollover comparisons in
flb_input_rate_gate_is_limited()/flb_input_rate_gate_can_resume()withrate_window > 1s, so a broken per-second normalization can slip through unnoticed.As per coding guidelines,
tests/**: "Add or update tests for behavior changes" and "Add regression tests for: mixed signals, processor drop/modify paths, multi-route fan-out, backlog + live ingestion parity".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/internal/input_rate_gate.c` around lines 197 - 230, The test only covers 1-second rollover and misses exercising pre-rollover logic for windows >1s; add a new regression in or adjacent to test_rate_update_window_rollover that sets ctx.input->rate_window_size to a multi-second value (e.g., 2s or 3s), manipulates timestamps via cfl_time_now()/ts and calls flb_input_rate_update with events spread across the multi-second window and across rollovers, and then assert expected normalized values for ctx.input->rate_records, rate_bytes, rate_window_records, and rate_window_bytes so that flb_input_rate_gate_is_limited() and flb_input_rate_gate_can_resume() paths that rely on multi-second normalization are exercised and validated.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/flb_input_chunk.c`:
- Around line 2519-2521: The append-side pause logic currently only consults
flb_input_buf_paused() and thus ignores rate_gate_status set by
flb_input_rate_gate_protect(); update the paused predicate so the rate-gate
pause is honored by either (a) extend flb_input_buf_paused() to also check the
input's rate_gate_status (or call flb_input_rate_gate_protect() internally), or
(b) introduce a new combined helper (e.g., flb_input_paused()) that returns true
if flb_input_buf_paused() OR rate_gate_status is paused and replace all
append-side checks that call flb_input_buf_paused() with this combined helper;
ensure you update the locations that currently call flb_input_buf_paused() so
draining/accepting paths respect the rate-gate pause.
In `@src/flb_input.c`:
- Around line 2430-2476: The code sets ins->rate_gate_status = FLB_INPUT_RUNNING
early (before checking mem_buf_status/storage_buf/config), which can mark an
input as resumed while other pause reasons still block actual resume and prevent
future retries; remove that early assignment and only set ins->rate_gate_status
= FLB_INPUT_RUNNING after the full resume condition (the big if that checks
mem_buf_status, storage_buf_status, config->is_running and
config->is_ingestion_active) succeeds and flb_input_resume() is invoked (or
confirmed possible). Also only clear/report the cmt_rate_gate_limited gauge (and
related gauges) when the input truly transitions out of the paused state (i.e.,
after the status is changed to RUNNING and resume is performed); keep the gauges
indicating limited state while any pause condition remains. Adjust
flb_input_rate_gate_maybe_resume to update status and metrics atomically after
successful resume checks.
- Around line 2276-2350: The ongoing-window counters (ins->rate_window_bytes and
ins->rate_window_records) must be normalized to per-second before comparing to
per-second limits; update both places where comparisons happen (the blocking
check and flb_input_rate_gate_can_resume) to divide rate_window_bytes and
rate_window_records by (double) ins->rate_window_size (guarding against zero or
<=0 by treating it as 1) and compare those normalized values against
effective_max_bytes/effective_max_records returned by
flb_input_rate_gate_effective_limit; change only the comparisons so
ins->rate_window_bytes and ins->rate_window_records are cast to double and
divided by the window size when evaluating breaches and resumes.
In
`@tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py`:
- Around line 81-85: The test configures the mock 500 response after calling
Service.start(), which races with Fluent Bit startup and can miss the
retry/backpressure path; move the configure_http_response call to before
invoking service.start() so the receiver is set up (i.e., call
configure_http_response(status_code=500, body={"status":"retry"}) prior to
Service("config_rate_gate_fanout_retry.yaml", with_receiver=True).start()), then
call service.wait_for_requests(...) as before.
- Around line 69-77: The test
test_rate_gate_constructs_input_and_output_pipelines currently calls
service.stop() only at the end so a failing wait will leave the Service running;
wrap the service.start() and subsequent wait_for_log_contains calls in a
try/finally and call service.stop() from the finally block to guarantee cleanup;
apply the same change to the other test function
test_rate_gate_rollout_scenarios so both Service uses are always stopped even if
an assertion or wait fails (refer to Service, start, wait_for_log_contains,
stop).
---
Outside diff comments:
In `@src/flb_input_chunk.c`:
- Around line 2451-2496: The code can reopen the rate gate (via
flb_input_rate_gate_maybe_resume) and leave mem_buf_status/storage_buf_status at
FLB_INPUT_PAUSED, so after potentially changing rate_gate_status you must
re-check and clear the buffer pause flags when the corresponding limits are now
below threshold: call flb_input_chunk_is_mem_overlimit() and
flb_input_chunk_is_storage_overlimit() again (or factor the existing resume
logic into a helper) after flb_input_rate_gate_maybe_resume() returns, and if
each returns FLB_FALSE and in->config->is_running/is_ingestion_active are true
and in->rate_gate_status == FLB_INPUT_RUNNING then set
in->mem_buf_status/in->storage_buf_status to FLB_INPUT_RUNNING and invoke
flb_input_resume(in) (and logging) just like the earlier branches; ensure
flb_input_buf_paused() check still runs to cover internal paused state.
- Around line 1103-1116: The rate accounting (calls/fields used by
flb_input_rate_gate_protect and flb_input_rate_gate_maybe_resume) must run
regardless of FLB_HAVE_METRICS: move the flb_input_rate_update call and any
updates to rate_window_*, rate_bytes, rate_records (the data used by the gate)
outside the `#ifdef` FLB_HAVE_METRICS block so they execute even in no-metrics
builds; keep only the cmt_counter_add() calls and other metric-specific code
inside the FLB_HAVE_METRICS guard. Apply the same change for the second
occurrence around the flb_input_rate_update usage (the block referenced at
2815-2828) so gate logic always sees correct non-zero rates. Ensure ts
(cfl_time_now()) is available to both metric and non-metric updates.
---
Nitpick comments:
In
`@tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml`:
- Line 6: Replace the hardcoded shared path in the config (the storage.path
entry in config_rate_gate_filesystem.yaml) with a per-run temp path provided by
the test harness (e.g., use the harness environment variable or template
placeholder the tests inject such as TEST_TMP_DIR or a {{test_temp_dir}}
variable). Update the config template to reference that variable instead of
"/tmp/fluent-bit-rate-gate-storage" and ensure the test harness sets/creates the
directory before the scenario runs and cleans it up afterwards so each test run
uses an isolated filesystem backlog path.
In `@tests/internal/input_rate_gate.c`:
- Around line 197-230: The test only covers 1-second rollover and misses
exercising pre-rollover logic for windows >1s; add a new regression in or
adjacent to test_rate_update_window_rollover that sets
ctx.input->rate_window_size to a multi-second value (e.g., 2s or 3s),
manipulates timestamps via cfl_time_now()/ts and calls flb_input_rate_update
with events spread across the multi-second window and across rollovers, and then
assert expected normalized values for ctx.input->rate_records, rate_bytes,
rate_window_records, and rate_window_bytes so that
flb_input_rate_gate_is_limited() and flb_input_rate_gate_can_resume() paths that
rely on multi-second normalization are exercised and validated.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3f76ce80-4c4e-4574-901c-10c327d23a05
📒 Files selected for processing (13)
include/fluent-bit/flb_input.hsrc/flb_engine.csrc/flb_input.csrc/flb_input_chunk.ctests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yamltests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.pytests/internal/CMakeLists.txttests/internal/input_rate_gate.c
tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
Outdated
Show resolved
Hide resolved
tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
3a16ddf to
b935e4b
Compare
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/flb_input_chunk.c (2)
2451-2497:⚠️ Potential issue | 🟠 MajorThe new rate-gate guard creates a circular wait.
These resume branches now require
rate_gate_status == FLB_INPUT_RUNNING, butflb_input_rate_gate_maybe_resume()only clears the rate gate aftermem_buf_statusandstorage_buf_statusare already running. In a mixed-pause state, neither side can transition first, so scheduler-driven recovery stalls.Suggested fix
- in->mem_buf_status == FLB_INPUT_PAUSED && - in->rate_gate_status == FLB_INPUT_RUNNING) { + in->mem_buf_status == FLB_INPUT_PAUSED) { in->mem_buf_status = FLB_INPUT_RUNNING; - if (in->p->cb_resume) { + if (in->rate_gate_status == FLB_INPUT_RUNNING && + in->p->cb_resume) { flb_input_resume(in); flb_info("[input] %s resume (mem buf overlimit - buf size %zuB now below limit %zuB)", flb_input_name(in), in->mem_chunks_size, in->mem_buf_limit); } } @@ - in->storage_buf_status == FLB_INPUT_PAUSED && - in->rate_gate_status == FLB_INPUT_RUNNING) { + in->storage_buf_status == FLB_INPUT_PAUSED) { in->storage_buf_status = FLB_INPUT_RUNNING; - if (in->p->cb_resume) { + if (in->rate_gate_status == FLB_INPUT_RUNNING && + in->p->cb_resume) { flb_input_resume(in); flb_info("[input] %s resume (storage buf overlimit %zu/%zu)", flb_input_name(in), ((struct flb_storage_input *) in->storage)->cio->total_chunks_up, ((struct flb_storage_input *) in->storage)->cio->max_chunks_up); } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_input_chunk.c` around lines 2451 - 2497, The new guard requiring rate_gate_status == FLB_INPUT_RUNNING causes a circular wait between the mem/storage resume checks and the rate-gate clearance: call flb_input_rate_gate_maybe_resume() before the memory/storage resume blocks (or alternatively remove the rate_gate_status == FLB_INPUT_RUNNING check from the mem/storage resume branches) so the rate gate can clear first and allow flb_input_chunk_is_mem_overlimit, flb_input_chunk_is_storage_overlimit and subsequent flb_input_resume() calls to proceed; update the logic around in->rate_gate_status, flb_input_rate_gate_maybe_resume(), flb_input_resume(), flb_input_paused(), and the two checks that use rate_gate_status to break the deadlock.
1103-1116:⚠️ Potential issue | 🟠 MajorDon't count restored backlog as fresh ingress.
flb_input_chunk_map()marks these chunks asfs_backlog, so feeding them intoflb_input_rate_update()makes old filesystem backlog count against the live rate gate. A large backlog can pause the input on startup/rollout before any new events arrive.Suggested fix
- flb_input_rate_update(in, ts, ic->total_records, buf_size);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_input_chunk.c` around lines 1103 - 1116, The code is updating input rate metrics for chunks restored from disk, which were marked as fs_backlog by flb_input_chunk_map(), causing old backlog to be counted as fresh ingress; change the logic so flb_input_rate_update() is not called for fs_backlog chunks (check the chunk/backlog flag on ic, e.g. ic->fs_backlog or the corresponding indicator set by flb_input_chunk_map()) while retaining the counters if desired—wrap the flb_input_rate_update(in, ts, ic->total_records, buf_size) call in a conditional that skips it when the chunk is a filesystem backlog.
🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)
326-335: Add a backlog-vs-live rate-gate regression.The new accounting touches both live appends and chunk restoration, but this suite never exercises an
fs_backlogchunk being mapped before live traffic. A focused case here would catch startup/filesystem false positives.Based on learnings, "Applies to tests/internal/**/*.{c,h} : Add regression tests for: mixed signals, processor drop/modify paths, multi-route fan-out, backlog + live ingestion parity".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/internal/input_rate_gate.c` around lines 326 - 335, Add a regression test that exercises an fs_backlog chunk being mapped before any live appends to ensure rate-gate accounting treats backlog vs live correctly: implement a new test function (e.g., test_rate_gate_backlog_vs_live) that maps/ restores a backlog chunk into the filesystem, then starts live append traffic and asserts the rate-gate metrics/state (pause/resume/hysteresis or counters) reflect both backlog and live paths without falsely throttling new live writes; add this test symbol to TEST_LIST (alongside existing test_rate_gate_* entries) so the suite runs it and ensure the test covers mixed signals (backlog mapping then live ingestion) and verifies parity between backlog accounting and live append accounting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/flb_input_chunk.c`:
- Around line 2451-2497: The new guard requiring rate_gate_status ==
FLB_INPUT_RUNNING causes a circular wait between the mem/storage resume checks
and the rate-gate clearance: call flb_input_rate_gate_maybe_resume() before the
memory/storage resume blocks (or alternatively remove the rate_gate_status ==
FLB_INPUT_RUNNING check from the mem/storage resume branches) so the rate gate
can clear first and allow flb_input_chunk_is_mem_overlimit,
flb_input_chunk_is_storage_overlimit and subsequent flb_input_resume() calls to
proceed; update the logic around in->rate_gate_status,
flb_input_rate_gate_maybe_resume(), flb_input_resume(), flb_input_paused(), and
the two checks that use rate_gate_status to break the deadlock.
- Around line 1103-1116: The code is updating input rate metrics for chunks
restored from disk, which were marked as fs_backlog by flb_input_chunk_map(),
causing old backlog to be counted as fresh ingress; change the logic so
flb_input_rate_update() is not called for fs_backlog chunks (check the
chunk/backlog flag on ic, e.g. ic->fs_backlog or the corresponding indicator set
by flb_input_chunk_map()) while retaining the counters if desired—wrap the
flb_input_rate_update(in, ts, ic->total_records, buf_size) call in a conditional
that skips it when the chunk is a filesystem backlog.
---
Nitpick comments:
In `@tests/internal/input_rate_gate.c`:
- Around line 326-335: Add a regression test that exercises an fs_backlog chunk
being mapped before any live appends to ensure rate-gate accounting treats
backlog vs live correctly: implement a new test function (e.g.,
test_rate_gate_backlog_vs_live) that maps/ restores a backlog chunk into the
filesystem, then starts live append traffic and asserts the rate-gate
metrics/state (pause/resume/hysteresis or counters) reflect both backlog and
live paths without falsely throttling new live writes; add this test symbol to
TEST_LIST (alongside existing test_rate_gate_* entries) so the suite runs it and
ensure the test covers mixed signals (backlog mapping then live ingestion) and
verifies parity between backlog accounting and live append accounting.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1b590a09-ca8b-412f-a77d-f630e579ae40
📒 Files selected for processing (12)
include/fluent-bit/flb_input.hsrc/flb_input.csrc/flb_input_chunk.ctests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yamltests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.pytests/internal/CMakeLists.txttests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (7)
- tests/internal/CMakeLists.txt
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
b935e4b to
03c0db8
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (1)
tests/internal/input_rate_gate.c (1)
233-266: Test relies on implicit default values for resume conditions.Looking at
flb_input_rate_gate_maybe_resume()insrc/flb_input.c, the resume check requires multiple conditions beyond just the rate threshold:if (can_resume == FLB_TRUE && ins->mem_buf_status == FLB_INPUT_RUNNING && ins->storage_buf_status == FLB_INPUT_RUNNING && ins->config->is_running == FLB_TRUE && ins->config->is_ingestion_active == FLB_TRUE) {The test at line 262-263 expects
rate_gate_statusto becomeFLB_INPUT_RUNNING, but doesn't explicitly setmem_buf_status,storage_buf_status,config->is_running, orconfig->is_ingestion_active. This relies on default initialization values being correct.For test robustness and clarity, consider explicitly setting these preconditions:
🔧 Suggested improvement for explicit preconditions
ctx.input->rate_gate_max_bytes = 100; ctx.input->rate_gate_resume_ratio = 0.80; ctx.input->rate_window_start = cfl_time_now(); ctx.input->rate_window_size = 10 * FLB_NSEC_IN_SEC; + ctx.input->mem_buf_status = FLB_INPUT_RUNNING; + ctx.input->storage_buf_status = FLB_INPUT_RUNNING; + ctx.config->is_running = FLB_TRUE; + ctx.config->is_ingestion_active = FLB_TRUE; ctx.input->rate_bytes = 110.0;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/internal/input_rate_gate.c` around lines 233 - 266, The test test_rate_gate_hysteresis_resume relies on implicit defaults when calling flb_input_rate_gate_maybe_resume; explicitly set the preconditions used in the resume check: set ctx.input->mem_buf_status = FLB_INPUT_RUNNING, ctx.input->storage_buf_status = FLB_INPUT_RUNNING, ctx.input->config->is_running = FLB_TRUE, and ctx.input->config->is_ingestion_active = FLB_TRUE before calling flb_input_rate_gate_maybe_resume so the resume path in flb_input_rate_gate_maybe_resume() can be exercised deterministically.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@tests/internal/input_rate_gate.c`:
- Around line 233-266: The test test_rate_gate_hysteresis_resume relies on
implicit defaults when calling flb_input_rate_gate_maybe_resume; explicitly set
the preconditions used in the resume check: set ctx.input->mem_buf_status =
FLB_INPUT_RUNNING, ctx.input->storage_buf_status = FLB_INPUT_RUNNING,
ctx.input->config->is_running = FLB_TRUE, and
ctx.input->config->is_ingestion_active = FLB_TRUE before calling
flb_input_rate_gate_maybe_resume so the resume path in
flb_input_rate_gate_maybe_resume() can be exercised deterministically.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d488da05-b9a8-4e56-98c0-82031eca786d
📒 Files selected for processing (9)
tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yamltests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yamltests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.pytests/internal/CMakeLists.txttests/internal/input_rate_gate.c
✅ Files skipped from review due to trivial changes (6)
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_memrb.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_steady_overrate.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_pipeline.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_filesystem.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_fanout_retry.yaml
- tests/integration/scenarios/config_rate_gate/config/config_rate_gate_burst_recovery.yaml
🚧 Files skipped from review as they are similar to previous changes (2)
- tests/internal/CMakeLists.txt
- tests/integration/scenarios/config_rate_gate/tests/test_config_rate_gate_001.py
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
This branch adds hysteresis-based resume control for input rate gating (rate_gate.resume_ratio), centralizes backpressure-adjusted effective limit computation, and applies those limits consistently to pause/resume decisions. It also fixes filesystem-mode enforcement gaps and adds periodic scheduler-driven resume checks so paused inputs can recover without requiring new ingestion events.
Testing
Includes expanded internal tests for parsing/window/hysteresis/pause-resume safety and updated config_rate_gate integration scenarios for fanout+retry and rollout paths across memrb/filesystem cases.
Enter
[N/A]in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
Using with integrated tests' configurations, there's no memory leaks:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targets (requires maintainer to do).Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit