Skip to content

Add StreamThrottler for rate-limiting bursty values#53

Merged
VelikovPetar merged 8 commits intodevelopfrom
feat/throttler
Apr 9, 2026
Merged

Add StreamThrottler for rate-limiting bursty values#53
VelikovPetar merged 8 commits intodevelopfrom
feat/throttler

Conversation

@aleksandar-apostolov
Copy link
Copy Markdown
Collaborator

@aleksandar-apostolov aleksandar-apostolov commented Apr 9, 2026

Goal

Add a configurable throttler primitive to core's processing toolkit. Rate-limits bursty value streams with pluggable strategies — needed for typing indicators, read receipts, and notification delivery across chat and video products.

Implementation

  • StreamThrottlePolicy sealed interface with three strategies:
    • Leading — first value delivered immediately, rest dropped until window expires
    • Trailing — last value delivered when window expires
    • LeadingAndTrailing — first value immediately + last value at window end
  • StreamThrottler<T> interface in api/processing/ with onValue, submit, reset
  • StreamThrottlerImpl<T> using AtomicBoolean for lock-free window gating
  • Callback delivery and window timer run in separate coroutines (no blocking)
  • Window-expiry job tracked and cancelled on reset() to prevent stale coroutine races
  • @ConsistentCopyVisibility + internal constructors to force validation via factory methods
  • windowMs validated > 0

Testing

  • 29 tests across all three modes + edge cases
  • Race condition regression test (reset mid-window)
  • Callback exception recovery
  • Scope cancellation behavior
  • Double reset idempotency
  • spotless clean
  • detekt clean

pr: core

Leading-edge throttler that delivers the first value immediately and
drops subsequent values until the time window expires. Useful for
typing indicators, read receipts, and notification delivery.
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 9, 2026

PR checklist ✅

All required conditions are satisfied:

  • Title length is OK (or ignored by label).
  • At least one pr: label exists.
  • Sections ### Goal, ### Implementation, and ### Testing are filled.

🎉 Great job! This PR is ready for review.

@aleksandar-apostolov aleksandar-apostolov changed the title feat(processing): add StreamThrottler for rate-limiting bursty values Add StreamThrottler for rate-limiting bursty values Apr 9, 2026
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 9, 2026

Walkthrough

Introduces a new Stream throttling API via the StreamThrottler<T> interface and StreamThrottlerImpl<T> implementation. The throttler enforces leading-edge delivery—accepting the first value per window and dropping subsequent submissions within the configurable window period. Comprehensive test coverage validates behavior across multiple scenarios.

Changes

Cohort / File(s) Summary
Throttling API
stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt
Public interface defining throttling operations: onValue() for callback registration, submit() for value delivery (returns true/false), and reset() for window clearing. Factory function constructs implementation with configurable window duration (default 3000L ms).
Throttling Implementation
stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt
Internal implementation enforcing leading-edge throttling. First submit() in an inactive window triggers callback and activates a delay window; subsequent submissions during the window are rejected with verbose logging. reset() clears window state immediately. Callback registration via onValue() is atomic.
Throttling Tests
stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt
Comprehensive test suite using StandardTestDispatcher and virtual time advancement. Validates: immediate first-value delivery, in-window rejection, post-window acceptance, rapid burst handling, multi-window sequences, mid-window reset behavior, return values, missing callback safety, and boundary conditions.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Throttler as StreamThrottlerImpl
    participant Callback
    participant Coroutine as Coroutine Delay

    Client->>Throttler: submit(value1)
    Throttler->>Throttler: window inactive?
    Throttler->>Callback: onValue(value1)
    Callback->>Client: ✓ returns true
    Throttler->>Coroutine: launch delay(windowMs)
    
    Client->>Throttler: submit(value2)
    Throttler->>Throttler: window active?
    Throttler->>Client: ✗ returns false (dropped)
    
    Coroutine->>Throttler: delay complete
    Throttler->>Throttler: clear window
    
    Client->>Throttler: submit(value3)
    Throttler->>Throttler: window inactive?
    Throttler->>Callback: onValue(value3)
    Callback->>Client: ✓ returns true
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 A throttler hops through time's domain,
Leading-edge deliveries, quite plain!
First value through, the rest must wait,
Windows reset—never too late.
Stream flows smooth, no overflow pain! 🎉

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 23.53% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title directly and clearly summarizes the main change: adding a StreamThrottler for rate-limiting bursty values, which matches the core purpose and functionality of the entire changeset.
Description check ✅ Passed PR description is well-structured with clear Goal, Implementation, and Testing sections that align with the template and provide sufficient detail.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/throttler

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt (1)

86-90: Consider validating windowMs parameter.

Negative or zero values for windowMs could cause unexpected behavior (immediate window close or no-op delay). A require(windowMs > 0) check would make the API more robust.

🛡️ Suggested validation
 public fun <T> StreamThrottler(
     scope: CoroutineScope,
     logger: StreamLogger,
     windowMs: Long = 3_000L,
-): StreamThrottler<T> = StreamThrottlerImpl(scope = scope, logger = logger, windowMs = windowMs)
+): StreamThrottler<T> {
+    require(windowMs > 0) { "windowMs must be positive, was $windowMs" }
+    return StreamThrottlerImpl(scope = scope, logger = logger, windowMs = windowMs)
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt`
around lines 86 - 90, The factory function StreamThrottler(...) accepts windowMs
but does not validate it; add a guard at the start of the factory (or inside
StreamThrottlerImpl constructor) such as require(windowMs > 0) { "windowMs must
be > 0" } (or throw IllegalArgumentException) to prevent zero/negative values
causing undefined behavior; update the StreamThrottler factory that returns
StreamThrottlerImpl (and/or StreamThrottlerImpl's constructor) to perform this
check and surface a clear error message referencing the windowMs parameter.
stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt (1)

214-237: Consider adding a test for the reset race condition.

This test verifies that reset() allows immediate delivery, but doesn't advance time past the original window boundary. A test that continues past t=1000 (the original window expiry) would reveal the race condition where the stale delay coroutine prematurely clears the new window.

📝 Suggested additional test case
`@Test`
fun `reset mid-window does not allow stale delay to close new window`() = runTest {
    val dispatcher = StandardTestDispatcher(testScheduler)
    val scope = CoroutineScope(SupervisorJob() + dispatcher)
    val delivered = mutableListOf<String>()
    val throttler = StreamThrottlerImpl<String>(
        scope = scope,
        logger = mockk(relaxed = true),
        windowMs = 1_000,
    )
    throttler.onValue { delivered.add(it) }

    throttler.submit("first")  // t=0, window until t=1000
    testScheduler.runCurrent()

    advanceTimeBy(300)
    throttler.reset()
    throttler.submit("second")  // t=300, new window until t=1300
    testScheduler.runCurrent()

    // Advance past original window expiry but before new window expiry
    advanceTimeBy(800)  // now at t=1100
    testScheduler.runCurrent()
    
    // This should be dropped (new window active until t=1300)
    assertFalse(throttler.submit("should-be-dropped"))
    
    assertEquals(listOf("first", "second"), delivered)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt`
around lines 214 - 237, The test reveals a race where a stale delay coroutine
from the original window can clear the newly started window after reset; in
StreamThrottlerImpl ensure reset() either cancels the previous window delay
coroutine/job or uses a monotonic window token/epoch that the delay coroutine
checks before clearing the window so only the matching epoch can clear it;
update the implementations of reset(), the window scheduling logic (the
coroutine started after submit()), and any window-clear path to cancel or
validate the current window token so the stale delay cannot close a newly-reset
window.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt`:
- Around line 44-57: The submit(...) logic can be racy with reset(): the delay
coroutine launched in submit() unconditionally calls windowActive.set(false)
when it completes, so a reset() or a new submit() can be undone by an earlier
pending coroutine; modify the implementation to tie the window-expiry to a
generation token or a cancellable Job so only the most recent window can clear
windowActive. Concretely, add a generation counter (AtomicInteger) or store the
delay coroutine's Job (e.g., activeDelayJob) when you scope.launch the delay,
and in reset() increment the generation or cancel the Job; in the delay
coroutine capture the current generation (or check Job.isActive) and only call
windowActive.set(false) if the generation matches (or the Job was not
cancelled). Update submit(), reset(), and the delay-launching code paths
(references: submit, reset, windowActive, windowMs, scope.launch, callbackRef)
accordingly.

---

Nitpick comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt`:
- Around line 86-90: The factory function StreamThrottler(...) accepts windowMs
but does not validate it; add a guard at the start of the factory (or inside
StreamThrottlerImpl constructor) such as require(windowMs > 0) { "windowMs must
be > 0" } (or throw IllegalArgumentException) to prevent zero/negative values
causing undefined behavior; update the StreamThrottler factory that returns
StreamThrottlerImpl (and/or StreamThrottlerImpl's constructor) to perform this
check and surface a clear error message referencing the windowMs parameter.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt`:
- Around line 214-237: The test reveals a race where a stale delay coroutine
from the original window can clear the newly started window after reset; in
StreamThrottlerImpl ensure reset() either cancels the previous window delay
coroutine/job or uses a monotonic window token/epoch that the delay coroutine
checks before clearing the window so only the matching epoch can clear it;
update the implementations of reset(), the window scheduling logic (the
coroutine started after submit()), and any window-clear path to cancel or
validate the current window token so the stale delay cannot close a newly-reset
window.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: a1fb316b-e7fa-4b12-9970-cb36a30b301d

📥 Commits

Reviewing files that changed from the base of the PR and between 872f3a5 and 43121c9.

📒 Files selected for processing (3)
  • stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt
  • stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt
  • stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt

@aleksandar-apostolov aleksandar-apostolov changed the title Add StreamThrottler for rate-limiting bursty values Add StreamThrottler for rate-limiting bursty values Apr 9, 2026
Replace fixed leading-edge with StreamThrottlePolicy sealed interface
supporting Leading, Trailing, and LeadingAndTrailing strategies.
Rename mode→policy for consistency with StreamRetryPolicy.
Unify window-expiry tracking across all throttle modes using a shared
windowJob reference. reset() now cancels the pending coroutine, preventing
a stale delay from prematurely closing a new window started after reset.

Adds regression test for the exact race scenario.
Validate windowMs > 0 in StreamThrottlePolicy factory methods.
Add 6 edge case tests: invalid windowMs, callback exception recovery,
trailing stale window race, post-trailing window restart, double reset,
and submit after scope cancellation.
- Make data class constructors internal to force validation via factory
  methods, preventing windowMs <= 0 bypass
- Fix reset() ordering: clear trailingValue before opening windowActive
  to prevent concurrent submit from losing its value
- Fix LeadingAndTrailing KDoc: trailing fires when a newer value was
  submitted, not when it differs from the leading value
Prevents bypassing validation via copy() on StreamThrottlePolicy
subtypes. Constructors remain internal, copy() now matches.
Cover StreamSingleFlightProcessor, StreamRetryProcessor,
StreamSerialProcessingQueue, StreamBatcher, StreamDebouncer, and
StreamThrottler factory functions (all three policy modes).
@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud bot commented Apr 9, 2026

@VelikovPetar VelikovPetar merged commit fe08323 into develop Apr 9, 2026
6 checks passed
@VelikovPetar VelikovPetar deleted the feat/throttler branch April 9, 2026 13:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr:new-feature New feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants