From 43121c94e240ce21d860d317acf30a06f5f8877d Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 09:58:05 +0200 Subject: [PATCH 1/8] feat(processing): add StreamThrottler for rate-limiting bursty values Leading-edge throttler that delivers the first value immediately and drops subsequent values until the time window expires. Useful for typing indicators, read receipts, and notification delivery. --- .../core/api/processing/StreamThrottler.kt | 90 ++++++ .../processing/StreamThrottlerImpl.kt | 62 ++++ .../processing/StreamThrottlerImplTest.kt | 268 ++++++++++++++++++ 3 files changed, 420 insertions(+) create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt create mode 100644 stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt new file mode 100644 index 0000000..8cbc8aa --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.processing + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.internal.processing.StreamThrottlerImpl +import kotlinx.coroutines.CoroutineScope + +/** + * Rate-limits a bursty stream of values so that at most **one** value is delivered per time window. + * + * The first value submitted is delivered immediately (leading edge). Subsequent values arriving + * within the [windowMs] window are dropped. After the window expires, the next submission is + * delivered immediately, starting a new window. + * + * ### Semantics + * - **Leading-edge:** The first value in each window is delivered immediately. + * - **Drop excess:** Values arriving during an active window are silently dropped. + * - **No trailing delivery:** Unlike [StreamDebouncer], there is no delayed trailing emission. + * - **Thread-safety:** All functions are safe to call from multiple coroutines. + * + * ### Usage + * - Typing indicators: Emit at most once per 3 seconds. + * - Read receipts: Don't flood the server with mark-read calls. + * - Notification delivery: Rate-limit push notification display. + * + * ```kotlin + * val throttler = StreamThrottler(scope, logger, windowMs = 3_000) + * throttler.onValue { event -> api.sendTypingEvent(event) } + * + * // Rapid calls — only the first gets through per 3s window + * throttler.submit(event1) // delivered immediately + * throttler.submit(event2) // dropped (within window) + * throttler.submit(event3) // dropped (within window) + * // ... 3 seconds pass ... + * throttler.submit(event4) // delivered immediately (new window) + * ``` + */ +@StreamInternalApi +public interface StreamThrottler { + /** + * Registers the handler invoked when a value passes through the throttle. + * + * @param callback Suspend function called with the accepted value. + */ + public fun onValue(callback: suspend (T) -> Unit) + + /** + * Submits a value. If no window is active, the value is delivered immediately and a new window + * starts. If a window is active, the value is dropped. + * + * @param value The value to throttle. + * @return `true` if the value was accepted (delivered), `false` if it was dropped. + */ + public fun submit(value: T): Boolean + + /** Resets the throttle window, allowing the next [submit] to pass through immediately. */ + public fun reset() +} + +/** + * Creates a new [StreamThrottler] instance. + * + * @param T The type of value being throttled. + * @param scope Coroutine scope for launching the delivery. + * @param logger Logger for diagnostics. + * @param windowMs Minimum time in milliseconds between accepted values. Defaults to 3000ms. + * @return A new [StreamThrottler] instance. + */ +@StreamInternalApi +public fun StreamThrottler( + scope: CoroutineScope, + logger: StreamLogger, + windowMs: Long = 3_000L, +): StreamThrottler = StreamThrottlerImpl(scope = scope, logger = logger, windowMs = windowMs) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt new file mode 100644 index 0000000..dd0128f --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.processing + +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.processing.StreamThrottler +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * Leading-edge throttler: delivers the first value immediately, drops all subsequent values until + * [windowMs] has elapsed, then allows the next value through. + */ +internal class StreamThrottlerImpl( + private val scope: CoroutineScope, + private val logger: StreamLogger, + private val windowMs: Long, +) : StreamThrottler { + + private val windowActive = AtomicBoolean(false) + private val callbackRef = AtomicReference Unit> {} + + override fun onValue(callback: suspend (T) -> Unit) { + callbackRef.set(callback) + } + + override fun submit(value: T): Boolean { + val accepted = windowActive.compareAndSet(false, true) + if (!accepted) { + logger.v { "[throttle] Dropped value (window active): $value" } + } else { + logger.v { "[throttle] Accepted value: $value" } + scope.launch { callbackRef.get().invoke(value) } + scope.launch { + delay(windowMs) + windowActive.set(false) + } + } + return accepted + } + + override fun reset() { + windowActive.set(false) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt new file mode 100644 index 0000000..ce7670d --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + +package io.getstream.android.core.internal.processing + +import io.mockk.mockk +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertTrue +import org.junit.Test + +class StreamThrottlerImplTest { + + @Test + fun `first value is delivered immediately`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue { delivered.add(it) } + + val accepted = throttler.submit("first") + testScheduler.runCurrent() + + assertTrue(accepted) + assertEquals(listOf("first"), delivered) + } + + @Test + fun `second value within window is dropped`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue { delivered.add(it) } + + val first = throttler.submit("first") + testScheduler.runCurrent() + val second = throttler.submit("second") + testScheduler.runCurrent() + + assertTrue(first) + assertFalse(second) + assertEquals(listOf("first"), delivered) + } + + @Test + fun `value after window expires is delivered`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + val accepted = throttler.submit("second") + testScheduler.runCurrent() + + assertTrue(accepted) + assertEquals(listOf("first", "second"), delivered) + } + + @Test + fun `rapid burst delivers only the first value`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue { delivered.add(it) } + + val results = (1..50).map { throttler.submit(it) } + testScheduler.runCurrent() + + assertTrue(results.first()) + assertTrue(results.drop(1).all { !it }) + assertEquals(listOf(1), delivered) + } + + @Test + fun `multiple windows deliver one value each`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 500, + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("a") + testScheduler.runCurrent() + + advanceTimeBy(501) + testScheduler.runCurrent() + throttler.submit("b") + testScheduler.runCurrent() + assertFalse(throttler.submit("b-dropped")) + + advanceTimeBy(501) + testScheduler.runCurrent() + throttler.submit("c") + testScheduler.runCurrent() + + assertEquals(listOf("a", "b", "c"), delivered) + } + + @Test + fun `reset allows immediate delivery`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + + throttler.reset() + + val accepted = throttler.submit("after-reset") + testScheduler.runCurrent() + + assertTrue(accepted) + assertEquals(listOf("first", "after-reset"), delivered) + } + + @Test + fun `submit returns false when window is active`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue {} + + assertTrue(throttler.submit("first")) + assertFalse(throttler.submit("second")) + assertFalse(throttler.submit("third")) + } + + @Test + fun `no crash when no callback registered`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + + val accepted = throttler.submit("orphan") + testScheduler.runCurrent() + + assertTrue(accepted) + } + + @Test + fun `reset mid-window allows new value through`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + + advanceTimeBy(300) + throttler.reset() + + throttler.submit("mid-window") + testScheduler.runCurrent() + + assertEquals(listOf("first", "mid-window"), delivered) + } + + @Test + fun `window reopens exactly after windowMs`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + windowMs = 1_000, + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + + // At exactly 999ms, still within window + advanceTimeBy(999) + testScheduler.runCurrent() + assertFalse(throttler.submit("too-early")) + + // At 1001ms total, window should have closed + advanceTimeBy(2) + testScheduler.runCurrent() + assertTrue(throttler.submit("on-time")) + testScheduler.runCurrent() + + assertEquals(listOf("first", "on-time"), delivered) + } +} From 843c78a4cf99ea8d6b7574f96aedd069f38acf5b Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 10:08:33 +0200 Subject: [PATCH 2/8] feat(processing): add configurable StreamThrottlePolicy strategies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace fixed leading-edge with StreamThrottlePolicy sealed interface supporting Leading, Trailing, and LeadingAndTrailing strategies. Rename mode→policy for consistency with StreamRetryPolicy. --- .../api/processing/StreamThrottlePolicy.kt | 102 +++++ .../core/api/processing/StreamThrottler.kt | 49 +-- .../processing/StreamThrottlerImpl.kt | 71 +++- .../processing/StreamThrottlerImplTest.kt | 348 +++++++++++++++--- 4 files changed, 489 insertions(+), 81 deletions(-) create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt new file mode 100644 index 0000000..54d793d --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.processing + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * Throttle strategy that controls **when** values are delivered within a time window. + * + * ### Strategies + * + * **[Leading]** — First value in the window is delivered immediately. Subsequent values are dropped + * until the window expires. + * + * ``` + * submit: A---B-C-D-----------E-F--- + * output: A--------------------E----- + * |--- windowMs ---| |--- windowMs ---| + * ``` + * + * **[Trailing]** — Values are collected during the window. Only the **last** value is delivered + * when the window expires. + * + * ``` + * submit: A---B-C-D-----------E-F--- + * output: -----------D--------------F + * |--- windowMs ---| |--- windowMs ---| + * ``` + * + * **[LeadingAndTrailing]** — First value delivered immediately. The last value in the window is + * also delivered when the window expires (if different from the leading value). + * + * ``` + * submit: A---B-C-D-----------E-F--- + * output: A----------D--------E----F + * |--- windowMs ---| |--- windowMs ---| + * ``` + */ +@StreamInternalApi +public sealed interface StreamThrottlePolicy { + + /** Minimum time in milliseconds between accepted values. */ + public val windowMs: Long + + /** + * First value passes immediately; subsequent values within the window are dropped. + * + * @property windowMs Minimum time between delivered values. Defaults to 3000ms. + */ + public data class Leading(override val windowMs: Long = DEFAULT_WINDOW_MS) : + StreamThrottlePolicy + + /** + * Values collected during the window; only the last value is delivered when the window expires. + * + * @property windowMs Collection window duration. Defaults to 3000ms. + */ + public data class Trailing(override val windowMs: Long = DEFAULT_WINDOW_MS) : + StreamThrottlePolicy + + /** + * First value passes immediately; the last value is also delivered when the window expires (if + * a newer value arrived during the window). + * + * @property windowMs Minimum time between delivered values. Defaults to 3000ms. + */ + public data class LeadingAndTrailing(override val windowMs: Long = DEFAULT_WINDOW_MS) : + StreamThrottlePolicy + + /** Factory methods for creating [StreamThrottlePolicy] instances. */ + public companion object { + + /** Default throttle window: 3 seconds. */ + public const val DEFAULT_WINDOW_MS: Long = 3_000L + + /** Creates a [Leading] mode with the given [windowMs]. */ + public fun leading(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = + Leading(windowMs) + + /** Creates a [Trailing] mode with the given [windowMs]. */ + public fun trailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = + Trailing(windowMs) + + /** Creates a [LeadingAndTrailing] mode with the given [windowMs]. */ + public fun leadingAndTrailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = + LeadingAndTrailing(windowMs) + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt index 8cbc8aa..3d7d7da 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt @@ -24,31 +24,33 @@ import kotlinx.coroutines.CoroutineScope /** * Rate-limits a bursty stream of values so that at most **one** value is delivered per time window. * - * The first value submitted is delivered immediately (leading edge). Subsequent values arriving - * within the [windowMs] window are dropped. After the window expires, the next submission is - * delivered immediately, starting a new window. + * Behaviour depends on the [StreamThrottlePolicy]: + * - **[Leading][StreamThrottlePolicy.Leading]:** First value delivered immediately, rest dropped + * until window expires. Best for: typing indicators, presence updates. + * - **[Trailing][StreamThrottlePolicy.Trailing]:** Values collected during the window, only the + * **last** value delivered when the window expires. Best for: position/progress updates where + * latest state matters. + * - **[LeadingAndTrailing][StreamThrottlePolicy.LeadingAndTrailing]:** First value delivered + * immediately, AND the last value delivered when the window expires (if different from the + * leading value). Best for: scroll position tracking, where both responsiveness and final + * accuracy matter. * * ### Semantics - * - **Leading-edge:** The first value in each window is delivered immediately. - * - **Drop excess:** Values arriving during an active window are silently dropped. - * - **No trailing delivery:** Unlike [StreamDebouncer], there is no delayed trailing emission. * - **Thread-safety:** All functions are safe to call from multiple coroutines. + * - **Cancellation:** [reset] clears any pending trailing delivery and resets the window. * * ### Usage - * - Typing indicators: Emit at most once per 3 seconds. - * - Read receipts: Don't flood the server with mark-read calls. - * - Notification delivery: Rate-limit push notification display. * * ```kotlin - * val throttler = StreamThrottler(scope, logger, windowMs = 3_000) + * // Typing indicators — leading edge, fire immediately, suppress duplicates + * val throttler = StreamThrottler(scope, logger, policy = StreamThrottlePolicy.leading()) * throttler.onValue { event -> api.sendTypingEvent(event) } * - * // Rapid calls — only the first gets through per 3s window - * throttler.submit(event1) // delivered immediately - * throttler.submit(event2) // dropped (within window) - * throttler.submit(event3) // dropped (within window) - * // ... 3 seconds pass ... - * throttler.submit(event4) // delivered immediately (new window) + * // Position updates — trailing edge, only latest state matters + * val throttler = StreamThrottler(scope, logger, policy = StreamThrottlePolicy.trailing()) + * + * // Scroll tracking — both edges + * val throttler = StreamThrottler(scope, logger, policy = StreamThrottlePolicy.leadingAndTrailing()) * ``` */ @StreamInternalApi @@ -61,15 +63,15 @@ public interface StreamThrottler { public fun onValue(callback: suspend (T) -> Unit) /** - * Submits a value. If no window is active, the value is delivered immediately and a new window - * starts. If a window is active, the value is dropped. + * Submits a value. Behaviour depends on the configured [StreamThrottlePolicy]. * * @param value The value to throttle. - * @return `true` if the value was accepted (delivered), `false` if it was dropped. + * @return `true` if the value was accepted for delivery (immediate or pending), `false` if it + * was dropped entirely. */ public fun submit(value: T): Boolean - /** Resets the throttle window, allowing the next [submit] to pass through immediately. */ + /** Resets the throttle window and discards any pending trailing delivery. */ public fun reset() } @@ -79,12 +81,13 @@ public interface StreamThrottler { * @param T The type of value being throttled. * @param scope Coroutine scope for launching the delivery. * @param logger Logger for diagnostics. - * @param windowMs Minimum time in milliseconds between accepted values. Defaults to 3000ms. + * @param policy The throttle strategy. Defaults to [StreamThrottlePolicy.leading] with 3000ms + * window. * @return A new [StreamThrottler] instance. */ @StreamInternalApi public fun StreamThrottler( scope: CoroutineScope, logger: StreamLogger, - windowMs: Long = 3_000L, -): StreamThrottler = StreamThrottlerImpl(scope = scope, logger = logger, windowMs = windowMs) + policy: StreamThrottlePolicy = StreamThrottlePolicy.leading(), +): StreamThrottler = StreamThrottlerImpl(scope = scope, logger = logger, policy = policy) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt index dd0128f..4a1d154 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt @@ -17,46 +17,99 @@ package io.getstream.android.core.internal.processing import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.processing.StreamThrottlePolicy import io.getstream.android.core.api.processing.StreamThrottler import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.launch /** - * Leading-edge throttler: delivers the first value immediately, drops all subsequent values until - * [windowMs] has elapsed, then allows the next value through. + * Throttler implementation that supports [Leading][StreamThrottlePolicy.Leading], + * [Trailing][StreamThrottlePolicy.Trailing], and + * [LeadingAndTrailing][StreamThrottlePolicy.LeadingAndTrailing] strategies. */ internal class StreamThrottlerImpl( private val scope: CoroutineScope, private val logger: StreamLogger, - private val windowMs: Long, + private val policy: StreamThrottlePolicy, ) : StreamThrottler { private val windowActive = AtomicBoolean(false) private val callbackRef = AtomicReference Unit> {} + private val trailingValue = AtomicReference(null) + private val trailingJob = AtomicReference(null) override fun onValue(callback: suspend (T) -> Unit) { callbackRef.set(callback) } - override fun submit(value: T): Boolean { + override fun submit(value: T): Boolean = + when (policy) { + is StreamThrottlePolicy.Leading -> submitLeading(value) + is StreamThrottlePolicy.Trailing -> submitTrailing(value) + is StreamThrottlePolicy.LeadingAndTrailing -> submitLeadingAndTrailing(value) + } + + override fun reset() { + windowActive.set(false) + trailingValue.set(null) + trailingJob.getAndSet(null)?.cancel() + } + + private fun submitLeading(value: T): Boolean { val accepted = windowActive.compareAndSet(false, true) if (!accepted) { - logger.v { "[throttle] Dropped value (window active): $value" } + logger.v { "[throttle:leading] Dropped: $value" } } else { - logger.v { "[throttle] Accepted value: $value" } + logger.v { "[throttle:leading] Accepted: $value" } scope.launch { callbackRef.get().invoke(value) } scope.launch { - delay(windowMs) + delay(policy.windowMs) windowActive.set(false) } } return accepted } - override fun reset() { - windowActive.set(false) + private fun submitTrailing(value: T): Boolean { + trailingValue.set(value) + if (windowActive.compareAndSet(false, true)) { + logger.v { "[throttle:trailing] Window started, pending: $value" } + scheduleTrailingDelivery() + } else { + logger.v { "[throttle:trailing] Updated pending: $value" } + } + return true + } + + private fun submitLeadingAndTrailing(value: T): Boolean { + val isLeading = windowActive.compareAndSet(false, true) + if (isLeading) { + logger.v { "[throttle:leading+trailing] Leading: $value" } + trailingValue.set(null) + scope.launch { callbackRef.get().invoke(value) } + scheduleTrailingDelivery() + } else { + logger.v { "[throttle:leading+trailing] Pending trailing: $value" } + trailingValue.set(value) + } + return true + } + + private fun scheduleTrailingDelivery() { + val job = + scope.launch { + delay(policy.windowMs) + windowActive.set(false) + val pending = trailingValue.getAndSet(null) + if (pending != null) { + logger.v { "[throttle] Trailing delivery: $pending" } + callbackRef.get().invoke(pending) + } + } + trailingJob.getAndSet(job)?.cancel() } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt index ce7670d..6fafa35 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt @@ -18,6 +18,7 @@ package io.getstream.android.core.internal.processing +import io.getstream.android.core.api.processing.StreamThrottlePolicy import io.mockk.mockk import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -32,8 +33,10 @@ import org.junit.Test class StreamThrottlerImplTest { + // ---- Leading mode ---- + @Test - fun `first value is delivered immediately`() = runTest { + fun `leading - first value is delivered immediately`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -41,7 +44,7 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.leading(windowMs = 1_000), ) throttler.onValue { delivered.add(it) } @@ -53,7 +56,7 @@ class StreamThrottlerImplTest { } @Test - fun `second value within window is dropped`() = runTest { + fun `leading - second value within window is dropped`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -61,22 +64,20 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.leading(windowMs = 1_000), ) throttler.onValue { delivered.add(it) } - val first = throttler.submit("first") + assertTrue(throttler.submit("first")) testScheduler.runCurrent() - val second = throttler.submit("second") + assertFalse(throttler.submit("second")) testScheduler.runCurrent() - assertTrue(first) - assertFalse(second) assertEquals(listOf("first"), delivered) } @Test - fun `value after window expires is delivered`() = runTest { + fun `leading - value after window expires is delivered`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -84,25 +85,23 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.leading(windowMs = 1_000), ) throttler.onValue { delivered.add(it) } throttler.submit("first") testScheduler.runCurrent() - advanceTimeBy(1_001) testScheduler.runCurrent() - val accepted = throttler.submit("second") + assertTrue(throttler.submit("second")) testScheduler.runCurrent() - assertTrue(accepted) assertEquals(listOf("first", "second"), delivered) } @Test - fun `rapid burst delivers only the first value`() = runTest { + fun `leading - rapid burst delivers only the first value`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -110,7 +109,7 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.leading(windowMs = 1_000), ) throttler.onValue { delivered.add(it) } @@ -123,7 +122,7 @@ class StreamThrottlerImplTest { } @Test - fun `multiple windows deliver one value each`() = runTest { + fun `leading - multiple windows deliver one value each`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -131,21 +130,20 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 500, + policy = StreamThrottlePolicy.leading(windowMs = 500), ) throttler.onValue { delivered.add(it) } throttler.submit("a") testScheduler.runCurrent() - advanceTimeBy(501) testScheduler.runCurrent() + throttler.submit("b") testScheduler.runCurrent() - assertFalse(throttler.submit("b-dropped")) - advanceTimeBy(501) testScheduler.runCurrent() + throttler.submit("c") testScheduler.runCurrent() @@ -153,7 +151,7 @@ class StreamThrottlerImplTest { } @Test - fun `reset allows immediate delivery`() = runTest { + fun `leading - reset allows immediate delivery`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -161,58 +159,132 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.leading(windowMs = 1_000), ) throttler.onValue { delivered.add(it) } throttler.submit("first") testScheduler.runCurrent() - throttler.reset() - val accepted = throttler.submit("after-reset") + assertTrue(throttler.submit("after-reset")) testScheduler.runCurrent() - assertTrue(accepted) assertEquals(listOf("first", "after-reset"), delivered) } @Test - fun `submit returns false when window is active`() = runTest { + fun `leading - no crash when no callback registered`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val throttler = StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.leading(windowMs = 1_000), ) - throttler.onValue {} + + assertTrue(throttler.submit("orphan")) + testScheduler.runCurrent() + } + + @Test + fun `leading - window reopens exactly after windowMs`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + + advanceTimeBy(999) + testScheduler.runCurrent() + assertFalse(throttler.submit("too-early")) + + advanceTimeBy(2) + testScheduler.runCurrent() + assertTrue(throttler.submit("on-time")) + testScheduler.runCurrent() + + assertEquals(listOf("first", "on-time"), delivered) + } + + // ---- Trailing mode ---- + + @Test + fun `trailing - single value delivered after window expires`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } assertTrue(throttler.submit("first")) - assertFalse(throttler.submit("second")) - assertFalse(throttler.submit("third")) + testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("first"), delivered) } @Test - fun `no crash when no callback registered`() = runTest { + fun `trailing - last value wins within window`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() val throttler = StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), ) + throttler.onValue { delivered.add(it) } - val accepted = throttler.submit("orphan") + assertTrue(throttler.submit("first")) + assertTrue(throttler.submit("second")) + assertTrue(throttler.submit("third")) testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) - assertTrue(accepted) + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("third"), delivered) } @Test - fun `reset mid-window allows new value through`() = runTest { + fun `trailing - all submits return true`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue {} + + assertTrue(throttler.submit("a")) + assertTrue(throttler.submit("b")) + assertTrue(throttler.submit("c")) + } + + @Test + fun `trailing - multiple windows deliver last value each`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -220,24 +292,49 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.trailing(windowMs = 500), ) throttler.onValue { delivered.add(it) } - throttler.submit("first") + throttler.submit("a1") + throttler.submit("a2") + advanceTimeBy(501) + testScheduler.runCurrent() + + throttler.submit("b1") + throttler.submit("b2") + throttler.submit("b3") + advanceTimeBy(501) testScheduler.runCurrent() - advanceTimeBy(300) + assertEquals(listOf("a2", "b3"), delivered) + } + + @Test + fun `trailing - reset cancels pending delivery`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("pending") + testScheduler.runCurrent() throttler.reset() - throttler.submit("mid-window") + advanceTimeBy(1_001) testScheduler.runCurrent() - assertEquals(listOf("first", "mid-window"), delivered) + assertTrue(delivered.isEmpty()) } @Test - fun `window reopens exactly after windowMs`() = runTest { + fun `trailing - submit after reset starts new window`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf() @@ -245,24 +342,177 @@ class StreamThrottlerImplTest { StreamThrottlerImpl( scope = scope, logger = mockk(relaxed = true), - windowMs = 1_000, + policy = StreamThrottlePolicy.trailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("cancelled") + throttler.reset() + + throttler.submit("fresh") + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("fresh"), delivered) + } + + // ---- LeadingAndTrailing mode ---- + + @Test + fun `leadingAndTrailing - first value delivered immediately`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), ) throttler.onValue { delivered.add(it) } throttler.submit("first") testScheduler.runCurrent() - // At exactly 999ms, still within window - advanceTimeBy(999) + assertEquals(listOf("first"), delivered) + } + + @Test + fun `leadingAndTrailing - trailing value delivered at window end`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") testScheduler.runCurrent() - assertFalse(throttler.submit("too-early")) + throttler.submit("second") + throttler.submit("third") - // At 1001ms total, window should have closed - advanceTimeBy(2) + advanceTimeBy(1_001) testScheduler.runCurrent() - assertTrue(throttler.submit("on-time")) + + assertEquals(listOf("first", "third"), delivered) + } + + @Test + fun `leadingAndTrailing - no trailing delivery if no new values`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("only-one") testScheduler.runCurrent() - assertEquals(listOf("first", "on-time"), delivered) + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("only-one"), delivered) + } + + @Test + fun `leadingAndTrailing - all submits return true`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue {} + + assertTrue(throttler.submit("a")) + assertTrue(throttler.submit("b")) + assertTrue(throttler.submit("c")) + } + + @Test + fun `leadingAndTrailing - multiple windows`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + // Window 1: leading=a, trailing=c + throttler.submit("a") + testScheduler.runCurrent() + throttler.submit("b") + throttler.submit("c") + advanceTimeBy(501) + testScheduler.runCurrent() + + // Window 2: leading=d, no trailing + throttler.submit("d") + testScheduler.runCurrent() + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("a", "c", "d"), delivered) + } + + @Test + fun `leadingAndTrailing - reset clears pending trailing`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("leading") + testScheduler.runCurrent() + throttler.submit("pending-trailing") + throttler.reset() + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("leading"), delivered) + } + + @Test + fun `leadingAndTrailing - rapid burst delivers first and last`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + (1..50).forEach { throttler.submit(it) } + testScheduler.runCurrent() + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf(1, 50), delivered) } } From f80e9ab90632a8c9192eefe1ef5bfd479c7c5a36 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 10:10:37 +0200 Subject: [PATCH 3/8] fix(processing): cancel stale window-expiry job on throttler reset Unify window-expiry tracking across all throttle modes using a shared windowJob reference. reset() now cancels the pending coroutine, preventing a stale delay from prematurely closing a new window started after reset. Adds regression test for the exact race scenario. --- .../processing/StreamThrottlerImpl.kt | 30 +++++++------- .../processing/StreamThrottlerImplTest.kt | 41 +++++++++++++++++++ 2 files changed, 57 insertions(+), 14 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt index 4a1d154..9b8262a 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt @@ -30,6 +30,9 @@ import kotlinx.coroutines.launch * Throttler implementation that supports [Leading][StreamThrottlePolicy.Leading], * [Trailing][StreamThrottlePolicy.Trailing], and * [LeadingAndTrailing][StreamThrottlePolicy.LeadingAndTrailing] strategies. + * + * A shared window-expiry job tracks the active coroutine across all modes. Resetting cancels it to + * prevent stale coroutines from prematurely closing a new window. */ internal class StreamThrottlerImpl( private val scope: CoroutineScope, @@ -40,7 +43,7 @@ internal class StreamThrottlerImpl( private val windowActive = AtomicBoolean(false) private val callbackRef = AtomicReference Unit> {} private val trailingValue = AtomicReference(null) - private val trailingJob = AtomicReference(null) + private val windowJob = AtomicReference(null) override fun onValue(callback: suspend (T) -> Unit) { callbackRef.set(callback) @@ -54,9 +57,9 @@ internal class StreamThrottlerImpl( } override fun reset() { + windowJob.getAndSet(null)?.cancel() windowActive.set(false) trailingValue.set(null) - trailingJob.getAndSet(null)?.cancel() } private fun submitLeading(value: T): Boolean { @@ -66,10 +69,7 @@ internal class StreamThrottlerImpl( } else { logger.v { "[throttle:leading] Accepted: $value" } scope.launch { callbackRef.get().invoke(value) } - scope.launch { - delay(policy.windowMs) - windowActive.set(false) - } + scheduleWindowExpiry(deliverTrailing = false) } return accepted } @@ -78,7 +78,7 @@ internal class StreamThrottlerImpl( trailingValue.set(value) if (windowActive.compareAndSet(false, true)) { logger.v { "[throttle:trailing] Window started, pending: $value" } - scheduleTrailingDelivery() + scheduleWindowExpiry(deliverTrailing = true) } else { logger.v { "[throttle:trailing] Updated pending: $value" } } @@ -91,7 +91,7 @@ internal class StreamThrottlerImpl( logger.v { "[throttle:leading+trailing] Leading: $value" } trailingValue.set(null) scope.launch { callbackRef.get().invoke(value) } - scheduleTrailingDelivery() + scheduleWindowExpiry(deliverTrailing = true) } else { logger.v { "[throttle:leading+trailing] Pending trailing: $value" } trailingValue.set(value) @@ -99,17 +99,19 @@ internal class StreamThrottlerImpl( return true } - private fun scheduleTrailingDelivery() { + private fun scheduleWindowExpiry(deliverTrailing: Boolean) { val job = scope.launch { delay(policy.windowMs) windowActive.set(false) - val pending = trailingValue.getAndSet(null) - if (pending != null) { - logger.v { "[throttle] Trailing delivery: $pending" } - callbackRef.get().invoke(pending) + if (deliverTrailing) { + val pending = trailingValue.getAndSet(null) + if (pending != null) { + logger.v { "[throttle] Trailing delivery: $pending" } + callbackRef.get().invoke(pending) + } } } - trailingJob.getAndSet(job)?.cancel() + windowJob.getAndSet(job)?.cancel() } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt index 6fafa35..9504a8b 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt @@ -173,6 +173,47 @@ class StreamThrottlerImplTest { assertEquals(listOf("first", "after-reset"), delivered) } + @Test + fun `leading - stale window expiry does not close new window after reset`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + // t=0: submit A, starts window expiry at t=1000 + throttler.submit("A") + testScheduler.runCurrent() + + // t=300: reset cancels old window job + advanceTimeBy(300) + throttler.reset() + + // t=300: submit B, starts new window expiry at t=1300 + assertTrue(throttler.submit("B")) + testScheduler.runCurrent() + + // t=1000: old delay WOULD have fired here — must NOT clear the window + advanceTimeBy(700) + testScheduler.runCurrent() + + // C should be rejected because B's window (until t=1300) is still active + assertFalse(throttler.submit("C")) + + // t=1301: B's window expires + advanceTimeBy(301) + testScheduler.runCurrent() + assertTrue(throttler.submit("D")) + testScheduler.runCurrent() + + assertEquals(listOf("A", "B", "D"), delivered) + } + @Test fun `leading - no crash when no callback registered`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) From 76721653ae0c4c1273d7aba8975769414a4446bb Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 10:14:36 +0200 Subject: [PATCH 4/8] fix(processing): add windowMs validation and edge case tests Validate windowMs > 0 in StreamThrottlePolicy factory methods. Add 6 edge case tests: invalid windowMs, callback exception recovery, trailing stale window race, post-trailing window restart, double reset, and submit after scope cancellation. --- .../api/processing/StreamThrottlePolicy.kt | 17 +- .../processing/StreamThrottlerImplTest.kt | 168 ++++++++++++++++++ 2 files changed, 179 insertions(+), 6 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt index 54d793d..eb472dd 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt @@ -87,16 +87,21 @@ public sealed interface StreamThrottlePolicy { /** Default throttle window: 3 seconds. */ public const val DEFAULT_WINDOW_MS: Long = 3_000L - /** Creates a [Leading] mode with the given [windowMs]. */ + /** Creates a [Leading] policy with the given [windowMs]. */ public fun leading(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = - Leading(windowMs) + Leading(windowMs).validate() - /** Creates a [Trailing] mode with the given [windowMs]. */ + /** Creates a [Trailing] policy with the given [windowMs]. */ public fun trailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = - Trailing(windowMs) + Trailing(windowMs).validate() - /** Creates a [LeadingAndTrailing] mode with the given [windowMs]. */ + /** Creates a [LeadingAndTrailing] policy with the given [windowMs]. */ public fun leadingAndTrailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = - LeadingAndTrailing(windowMs) + LeadingAndTrailing(windowMs).validate() + + private fun T.validate(): T { + require(windowMs > 0) { "windowMs must be > 0, was $windowMs" } + return this + } } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt index 9504a8b..371702e 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt @@ -20,6 +20,8 @@ package io.getstream.android.core.internal.processing import io.getstream.android.core.api.processing.StreamThrottlePolicy import io.mockk.mockk +import kotlin.test.assertFailsWith +import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.SupervisorJob @@ -556,4 +558,170 @@ class StreamThrottlerImplTest { assertEquals(listOf(1, 50), delivered) } + + // ---- Edge cases ---- + + @Test + fun `policy rejects zero windowMs`() { + assertFailsWith { StreamThrottlePolicy.leading(windowMs = 0) } + assertFailsWith { StreamThrottlePolicy.trailing(windowMs = 0) } + assertFailsWith { + StreamThrottlePolicy.leadingAndTrailing(windowMs = 0) + } + } + + @Test + fun `policy rejects negative windowMs`() { + assertFailsWith { StreamThrottlePolicy.leading(windowMs = -1) } + } + + @Test + fun `leading - callback exception does not permanently lock throttler`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val handler = CoroutineExceptionHandler { _, _ -> /* swallow */ } + val scope = CoroutineScope(SupervisorJob() + dispatcher + handler) + val delivered = mutableListOf() + var shouldThrow = true + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 500), + ) + throttler.onValue { + if (shouldThrow) { + @Suppress("TooGenericExceptionThrown") throw RuntimeException("boom") + } + delivered.add(it) + } + + // First submit — callback throws, but window timer still runs + throttler.submit("explode") + testScheduler.runCurrent() + + // Wait for window to expire + advanceTimeBy(501) + testScheduler.runCurrent() + + // Window should have expired, next submit should work + shouldThrow = false + assertTrue(throttler.submit("recover")) + testScheduler.runCurrent() + + assertEquals(listOf("recover"), delivered) + } + + @Test + fun `trailing - stale window expiry does not close new window after reset`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + // t=0: submit A, starts window expiry at t=1000 + throttler.submit("A") + testScheduler.runCurrent() + + // t=300: reset cancels old window + advanceTimeBy(300) + throttler.reset() + + // t=300: submit B, starts new window expiry at t=1300 + throttler.submit("B") + testScheduler.runCurrent() + + // t=1000: old delay would have fired — must NOT deliver A or close B's window + advanceTimeBy(700) + testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) + + // t=1300: update to C while B's window is still active + throttler.submit("C") + + // t=1301: B's window expires, delivers latest value (C) + advanceTimeBy(301) + testScheduler.runCurrent() + + assertEquals(listOf("C"), delivered) + } + + @Test + fun `leadingAndTrailing - trailing delivery allows new window to start`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + // Window 1 + throttler.submit("leading-1") + testScheduler.runCurrent() + throttler.submit("trailing-1") + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("leading-1", "trailing-1"), delivered) + + // Window 2 should start fresh + assertTrue(throttler.submit("leading-2")) + testScheduler.runCurrent() + + assertEquals(listOf("leading-1", "trailing-1", "leading-2"), delivered) + } + + @Test + fun `double reset does not crash`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + + throttler.submit("a") + testScheduler.runCurrent() + throttler.reset() + throttler.reset() + + assertTrue(throttler.submit("b")) + } + + @Test + fun `submit after scope cancellation returns true but does not deliver`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val parentJob = SupervisorJob() + val scope = CoroutineScope(parentJob + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + // Cancel the scope + parentJob.cancel() + testScheduler.runCurrent() + + // Submit returns true (CAS succeeds) but callback launch is dead + val result = throttler.submit("dead") + testScheduler.runCurrent() + + assertTrue(result) + assertTrue(delivered.isEmpty()) + } } From d1ca71fcc1f46ff7bb7dd446da7881db7657f841 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 10:19:42 +0200 Subject: [PATCH 5/8] fix(processing): internal constructors, reset ordering, KDoc accuracy - Make data class constructors internal to force validation via factory methods, preventing windowMs <= 0 bypass - Fix reset() ordering: clear trailingValue before opening windowActive to prevent concurrent submit from losing its value - Fix LeadingAndTrailing KDoc: trailing fires when a newer value was submitted, not when it differs from the leading value --- .../core/api/processing/StreamThrottlePolicy.kt | 14 +++++++------- .../android/core/api/processing/StreamThrottler.kt | 6 +++--- .../internal/processing/StreamThrottlerImpl.kt | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt index eb472dd..52911a9 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt @@ -61,25 +61,25 @@ public sealed interface StreamThrottlePolicy { * * @property windowMs Minimum time between delivered values. Defaults to 3000ms. */ - public data class Leading(override val windowMs: Long = DEFAULT_WINDOW_MS) : - StreamThrottlePolicy + public data class Leading + internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy /** * Values collected during the window; only the last value is delivered when the window expires. * * @property windowMs Collection window duration. Defaults to 3000ms. */ - public data class Trailing(override val windowMs: Long = DEFAULT_WINDOW_MS) : - StreamThrottlePolicy + public data class Trailing + internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy /** * First value passes immediately; the last value is also delivered when the window expires (if - * a newer value arrived during the window). + * a newer value was submitted during the window). * * @property windowMs Minimum time between delivered values. Defaults to 3000ms. */ - public data class LeadingAndTrailing(override val windowMs: Long = DEFAULT_WINDOW_MS) : - StreamThrottlePolicy + public data class LeadingAndTrailing + internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy /** Factory methods for creating [StreamThrottlePolicy] instances. */ public companion object { diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt index 3d7d7da..756fd39 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt @@ -31,9 +31,9 @@ import kotlinx.coroutines.CoroutineScope * **last** value delivered when the window expires. Best for: position/progress updates where * latest state matters. * - **[LeadingAndTrailing][StreamThrottlePolicy.LeadingAndTrailing]:** First value delivered - * immediately, AND the last value delivered when the window expires (if different from the - * leading value). Best for: scroll position tracking, where both responsiveness and final - * accuracy matter. + * immediately, AND the last value delivered when the window expires (if a newer value was + * submitted during the window). Best for: scroll position tracking, where both responsiveness and + * final accuracy matter. * * ### Semantics * - **Thread-safety:** All functions are safe to call from multiple coroutines. diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt index 9b8262a..2b4ef28 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt @@ -58,8 +58,8 @@ internal class StreamThrottlerImpl( override fun reset() { windowJob.getAndSet(null)?.cancel() - windowActive.set(false) trailingValue.set(null) + windowActive.set(false) } private fun submitLeading(value: T): Boolean { From 986e97c33c8dbb4abdc77a39d4e55c076e75ee59 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 10:23:28 +0200 Subject: [PATCH 6/8] fix(processing): add ConsistentCopyVisibility to policy data classes Prevents bypassing validation via copy() on StreamThrottlePolicy subtypes. Constructors remain internal, copy() now matches. --- .../android/core/api/processing/StreamThrottlePolicy.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt index 52911a9..9943705 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt @@ -61,6 +61,7 @@ public sealed interface StreamThrottlePolicy { * * @property windowMs Minimum time between delivered values. Defaults to 3000ms. */ + @ConsistentCopyVisibility public data class Leading internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy @@ -69,6 +70,7 @@ public sealed interface StreamThrottlePolicy { * * @property windowMs Collection window duration. Defaults to 3000ms. */ + @ConsistentCopyVisibility public data class Trailing internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy @@ -78,6 +80,7 @@ public sealed interface StreamThrottlePolicy { * * @property windowMs Minimum time between delivered values. Defaults to 3000ms. */ + @ConsistentCopyVisibility public data class LeadingAndTrailing internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy From c2e6ed67a6ffafc199378becf0b7fea2ae71eebd Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 10:30:41 +0200 Subject: [PATCH 7/8] docs: add Debouncer and Throttler to README feature guides --- README.md | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index acd9af2..1b825e5 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android - **Authentication**: Token lifecycle management with automatic refresh - **State management**: Connection state, network availability, app lifecycle tracking - **Reliability**: Retry policies, exponential backoff, and connection recovery -- **Performance**: Request deduplication, batching, serial processing queues +- **Performance**: Request deduplication, batching, throttling, serial processing queues - **Thread safety**: Cross-thread execution utilities and subscription management - **Observability**: Structured logging and event propagation @@ -45,7 +45,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android │ • WebSocket connections • Serial processing │ │ • Token management • Retry logic │ │ • Lifecycle monitoring • Event batching │ -│ • Network detection • Thread utilities │ +│ • Network detection • Throttling & debouncing │ └────────────────────────┬────────────────────────────────┘ │ ┌────────────────────────┴────────────────────────────────┐ @@ -63,7 +63,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android - Retry policies with backoff strategies - Serial processing queues - Single-flight execution (request deduplication) -- Batching & debouncing +- Batching, debouncing & throttling - Thread-safe subscription management - WebSocket connections with health monitoring - Structured logging @@ -96,6 +96,8 @@ Stream Android Core uses annotations to distinguish between stable public APIs a - [Single-Flight Processor](#single-flight-processor) - [Retry Processor](#retry-processor) - [Batcher](#batcher) + - [Debouncer](#debouncer) + - [Throttler](#throttler) - [Threading Utilities](#threading-utilities) - [Token Management](#token-management) - [WebSocket Connections](#websocket-connections) @@ -532,6 +534,108 @@ batcher.stop().getOrThrow() --- +### Debouncer + +Coalesces rapid state changes into a single settled action. Only the **last** value is delivered after a quiet period. + +#### Basic Usage + +```kotlin +import io.getstream.android.core.api.processing.StreamDebouncer + +val debouncer = StreamDebouncer( + scope = scope, + logger = logger, + delayMs = 300 // Deliver after 300ms of silence +) + +debouncer.onValue { settled -> + // Only called once with the final value + updateSearch(settled) +} + +// Rapid submissions — only "third" is delivered after 300ms of silence +debouncer.submit("first") +debouncer.submit("second") +debouncer.submit("third") + +// Cancel pending delivery +debouncer.cancel() +``` + +#### Use Cases + +- Search-as-you-type (wait for user to stop typing) +- Network/lifecycle state coalescing (avoid reconnection storms) +- UI state settling (wait for animations to finish) + +**Pattern**: Last-write-wins with timer reset on each submission + +--- + +### Throttler + +Rate-limits bursty values with configurable strategies. + +#### Strategies + +```kotlin +import io.getstream.android.core.api.processing.StreamThrottler +import io.getstream.android.core.api.processing.StreamThrottlePolicy + +// Leading: first value immediately, drop the rest until window expires +val typing = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.leading(windowMs = 3_000) +) + +// Trailing: collect during window, deliver last value when it expires +val position = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.trailing(windowMs = 1_000) +) + +// Leading + Trailing: first value immediately, last value at window end +val scroll = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500) +) +``` + +#### Basic Usage + +```kotlin +val throttler = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.leading(windowMs = 3_000) +) + +throttler.onValue { event -> api.sendTypingEvent(event) } + +throttler.submit("typing") // delivered immediately +throttler.submit("typing") // dropped (within window) +// ... 3 seconds pass ... +throttler.submit("typing") // delivered immediately (new window) + +// Reset window manually +throttler.reset() +``` + +#### Use Cases + +- Typing indicators (leading — fire immediately, suppress duplicates) +- Read receipts (leading — don't flood the server) +- Position/progress updates (trailing — only latest state matters) +- Scroll tracking (leading + trailing — responsiveness + final accuracy) + +**Pattern**: Configurable rate-limiting via `StreamThrottlePolicy` + +--- + ### Threading Utilities Safe cross-thread execution with timeout protection. @@ -1136,7 +1240,7 @@ stream-android-core/ │ │ ├── observers/ # Lifecycle & network monitoring │ │ │ ├── lifecycle/ │ │ │ └── network/ -│ │ ├── processing/ # Queue, retry, single-flight, batcher +│ │ ├── processing/ # Queue, retry, single-flight, batcher, debouncer, throttler │ │ ├── subscribe/ # Subscription management │ │ ├── socket/ # WebSocket connections │ │ ├── log/ # Logging infrastructure From 77ffb1141055535d4e28cb834c42c44557e5404e Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Thu, 9 Apr 2026 10:35:36 +0200 Subject: [PATCH 8/8] test(processing): add smoke tests for all processing factory functions Cover StreamSingleFlightProcessor, StreamRetryProcessor, StreamSerialProcessingQueue, StreamBatcher, StreamDebouncer, and StreamThrottler factory functions (all three policy modes). --- .../api/processing/ProcessingFactoryTest.kt | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 stream-android-core/src/test/java/io/getstream/android/core/api/processing/ProcessingFactoryTest.kt diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/processing/ProcessingFactoryTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/processing/ProcessingFactoryTest.kt new file mode 100644 index 0000000..f71aa52 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/processing/ProcessingFactoryTest.kt @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + +package io.getstream.android.core.api.processing + +import io.getstream.android.core.api.model.StreamTypedKey.Companion.asStreamTypedKey +import io.getstream.android.core.api.model.retry.StreamRetryPolicy +import io.mockk.mockk +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Test + +/** + * Smoke tests for the public factory functions in `api/processing/`. These verify that each factory + * returns a working instance — the detailed behaviour tests live alongside the implementations in + * `internal/processing/`. + */ +class ProcessingFactoryTest { + + @Test + fun `StreamSingleFlightProcessor factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + + val sf = StreamSingleFlightProcessor(scope) + + val result = sf.run("key".asStreamTypedKey()) { 42 } + testScheduler.runCurrent() + + assertTrue(result.isSuccess) + assertEquals(42, result.getOrThrow()) + } + + @Test + fun `StreamRetryProcessor factory returns working instance`() = runTest { + val rp = StreamRetryProcessor(logger = mockk(relaxed = true)) + + val result = rp.retry(StreamRetryPolicy.fixed(maxRetries = 1)) { "ok" } + + assertTrue(result.isSuccess) + assertEquals("ok", result.getOrThrow()) + } + + @Test + fun `StreamSerialProcessingQueue factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + + val queue = StreamSerialProcessingQueue(logger = mockk(relaxed = true), scope = scope) + queue.start() + testScheduler.runCurrent() + + val result = queue.submit { "done" } + testScheduler.runCurrent() + + assertTrue(result.isSuccess) + assertEquals("done", result.getOrThrow()) + } + + @Test + fun `StreamBatcher factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val batches = mutableListOf>() + + val batcher = StreamBatcher(scope = scope, batchSize = 2, initialDelayMs = 100) + batcher.onBatch { items, _, _ -> batches.add(items) } + batcher.start() + testScheduler.runCurrent() + + batcher.enqueue("a") + batcher.enqueue("b") + advanceTimeBy(200) + testScheduler.runCurrent() + + assertTrue(batches.isNotEmpty()) + assertTrue(batches.first().contains("a")) + } + + @Test + fun `StreamDebouncer factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val debouncer = + StreamDebouncer(scope = scope, logger = mockk(relaxed = true), delayMs = 100) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("value") + advanceTimeBy(101) + testScheduler.runCurrent() + + assertEquals(listOf("value"), delivered) + } + + @Test + fun `StreamThrottler factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val throttler = StreamThrottler(scope = scope, logger = mockk(relaxed = true)) + throttler.onValue { delivered.add(it) } + + assertTrue(throttler.submit("first")) + testScheduler.runCurrent() + + assertEquals(listOf("first"), delivered) + } + + @Test + fun `StreamThrottler factory with trailing policy returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val throttler = + StreamThrottler( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("pending") + testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) + + advanceTimeBy(501) + testScheduler.runCurrent() + assertEquals(listOf("pending"), delivered) + } + + @Test + fun `StreamThrottler factory with leadingAndTrailing policy returns working instance`() = + runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val throttler = + StreamThrottler( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("leading") + testScheduler.runCurrent() + throttler.submit("trailing") + + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("leading", "trailing"), delivered) + } +}