diff --git a/README.md b/README.md index acd9af2..1b825e5 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android - **Authentication**: Token lifecycle management with automatic refresh - **State management**: Connection state, network availability, app lifecycle tracking - **Reliability**: Retry policies, exponential backoff, and connection recovery -- **Performance**: Request deduplication, batching, serial processing queues +- **Performance**: Request deduplication, batching, throttling, serial processing queues - **Thread safety**: Cross-thread execution utilities and subscription management - **Observability**: Structured logging and event propagation @@ -45,7 +45,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android │ • WebSocket connections • Serial processing │ │ • Token management • Retry logic │ │ • Lifecycle monitoring • Event batching │ -│ • Network detection • Thread utilities │ +│ • Network detection • Throttling & debouncing │ └────────────────────────┬────────────────────────────────┘ │ ┌────────────────────────┴────────────────────────────────┐ @@ -63,7 +63,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android - Retry policies with backoff strategies - Serial processing queues - Single-flight execution (request deduplication) -- Batching & debouncing +- Batching, debouncing & throttling - Thread-safe subscription management - WebSocket connections with health monitoring - Structured logging @@ -96,6 +96,8 @@ Stream Android Core uses annotations to distinguish between stable public APIs a - [Single-Flight Processor](#single-flight-processor) - [Retry Processor](#retry-processor) - [Batcher](#batcher) + - [Debouncer](#debouncer) + - [Throttler](#throttler) - [Threading Utilities](#threading-utilities) - [Token Management](#token-management) - [WebSocket Connections](#websocket-connections) @@ -532,6 +534,108 @@ batcher.stop().getOrThrow() --- +### Debouncer + +Coalesces rapid state changes into a single settled action. Only the **last** value is delivered after a quiet period. + +#### Basic Usage + +```kotlin +import io.getstream.android.core.api.processing.StreamDebouncer + +val debouncer = StreamDebouncer( + scope = scope, + logger = logger, + delayMs = 300 // Deliver after 300ms of silence +) + +debouncer.onValue { settled -> + // Only called once with the final value + updateSearch(settled) +} + +// Rapid submissions — only "third" is delivered after 300ms of silence +debouncer.submit("first") +debouncer.submit("second") +debouncer.submit("third") + +// Cancel pending delivery +debouncer.cancel() +``` + +#### Use Cases + +- Search-as-you-type (wait for user to stop typing) +- Network/lifecycle state coalescing (avoid reconnection storms) +- UI state settling (wait for animations to finish) + +**Pattern**: Last-write-wins with timer reset on each submission + +--- + +### Throttler + +Rate-limits bursty values with configurable strategies. + +#### Strategies + +```kotlin +import io.getstream.android.core.api.processing.StreamThrottler +import io.getstream.android.core.api.processing.StreamThrottlePolicy + +// Leading: first value immediately, drop the rest until window expires +val typing = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.leading(windowMs = 3_000) +) + +// Trailing: collect during window, deliver last value when it expires +val position = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.trailing(windowMs = 1_000) +) + +// Leading + Trailing: first value immediately, last value at window end +val scroll = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500) +) +``` + +#### Basic Usage + +```kotlin +val throttler = StreamThrottler( + scope = scope, + logger = logger, + policy = StreamThrottlePolicy.leading(windowMs = 3_000) +) + +throttler.onValue { event -> api.sendTypingEvent(event) } + +throttler.submit("typing") // delivered immediately +throttler.submit("typing") // dropped (within window) +// ... 3 seconds pass ... +throttler.submit("typing") // delivered immediately (new window) + +// Reset window manually +throttler.reset() +``` + +#### Use Cases + +- Typing indicators (leading — fire immediately, suppress duplicates) +- Read receipts (leading — don't flood the server) +- Position/progress updates (trailing — only latest state matters) +- Scroll tracking (leading + trailing — responsiveness + final accuracy) + +**Pattern**: Configurable rate-limiting via `StreamThrottlePolicy` + +--- + ### Threading Utilities Safe cross-thread execution with timeout protection. @@ -1136,7 +1240,7 @@ stream-android-core/ │ │ ├── observers/ # Lifecycle & network monitoring │ │ │ ├── lifecycle/ │ │ │ └── network/ -│ │ ├── processing/ # Queue, retry, single-flight, batcher +│ │ ├── processing/ # Queue, retry, single-flight, batcher, debouncer, throttler │ │ ├── subscribe/ # Subscription management │ │ ├── socket/ # WebSocket connections │ │ ├── log/ # Logging infrastructure diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt new file mode 100644 index 0000000..9943705 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottlePolicy.kt @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.processing + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * Throttle strategy that controls **when** values are delivered within a time window. + * + * ### Strategies + * + * **[Leading]** — First value in the window is delivered immediately. Subsequent values are dropped + * until the window expires. + * + * ``` + * submit: A---B-C-D-----------E-F--- + * output: A--------------------E----- + * |--- windowMs ---| |--- windowMs ---| + * ``` + * + * **[Trailing]** — Values are collected during the window. Only the **last** value is delivered + * when the window expires. + * + * ``` + * submit: A---B-C-D-----------E-F--- + * output: -----------D--------------F + * |--- windowMs ---| |--- windowMs ---| + * ``` + * + * **[LeadingAndTrailing]** — First value delivered immediately. The last value in the window is + * also delivered when the window expires (if different from the leading value). + * + * ``` + * submit: A---B-C-D-----------E-F--- + * output: A----------D--------E----F + * |--- windowMs ---| |--- windowMs ---| + * ``` + */ +@StreamInternalApi +public sealed interface StreamThrottlePolicy { + + /** Minimum time in milliseconds between accepted values. */ + public val windowMs: Long + + /** + * First value passes immediately; subsequent values within the window are dropped. + * + * @property windowMs Minimum time between delivered values. Defaults to 3000ms. + */ + @ConsistentCopyVisibility + public data class Leading + internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy + + /** + * Values collected during the window; only the last value is delivered when the window expires. + * + * @property windowMs Collection window duration. Defaults to 3000ms. + */ + @ConsistentCopyVisibility + public data class Trailing + internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy + + /** + * First value passes immediately; the last value is also delivered when the window expires (if + * a newer value was submitted during the window). + * + * @property windowMs Minimum time between delivered values. Defaults to 3000ms. + */ + @ConsistentCopyVisibility + public data class LeadingAndTrailing + internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy + + /** Factory methods for creating [StreamThrottlePolicy] instances. */ + public companion object { + + /** Default throttle window: 3 seconds. */ + public const val DEFAULT_WINDOW_MS: Long = 3_000L + + /** Creates a [Leading] policy with the given [windowMs]. */ + public fun leading(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = + Leading(windowMs).validate() + + /** Creates a [Trailing] policy with the given [windowMs]. */ + public fun trailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = + Trailing(windowMs).validate() + + /** Creates a [LeadingAndTrailing] policy with the given [windowMs]. */ + public fun leadingAndTrailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy = + LeadingAndTrailing(windowMs).validate() + + private fun T.validate(): T { + require(windowMs > 0) { "windowMs must be > 0, was $windowMs" } + return this + } + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt new file mode 100644 index 0000000..756fd39 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.processing + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.internal.processing.StreamThrottlerImpl +import kotlinx.coroutines.CoroutineScope + +/** + * Rate-limits a bursty stream of values so that at most **one** value is delivered per time window. + * + * Behaviour depends on the [StreamThrottlePolicy]: + * - **[Leading][StreamThrottlePolicy.Leading]:** First value delivered immediately, rest dropped + * until window expires. Best for: typing indicators, presence updates. + * - **[Trailing][StreamThrottlePolicy.Trailing]:** Values collected during the window, only the + * **last** value delivered when the window expires. Best for: position/progress updates where + * latest state matters. + * - **[LeadingAndTrailing][StreamThrottlePolicy.LeadingAndTrailing]:** First value delivered + * immediately, AND the last value delivered when the window expires (if a newer value was + * submitted during the window). Best for: scroll position tracking, where both responsiveness and + * final accuracy matter. + * + * ### Semantics + * - **Thread-safety:** All functions are safe to call from multiple coroutines. + * - **Cancellation:** [reset] clears any pending trailing delivery and resets the window. + * + * ### Usage + * + * ```kotlin + * // Typing indicators — leading edge, fire immediately, suppress duplicates + * val throttler = StreamThrottler(scope, logger, policy = StreamThrottlePolicy.leading()) + * throttler.onValue { event -> api.sendTypingEvent(event) } + * + * // Position updates — trailing edge, only latest state matters + * val throttler = StreamThrottler(scope, logger, policy = StreamThrottlePolicy.trailing()) + * + * // Scroll tracking — both edges + * val throttler = StreamThrottler(scope, logger, policy = StreamThrottlePolicy.leadingAndTrailing()) + * ``` + */ +@StreamInternalApi +public interface StreamThrottler { + /** + * Registers the handler invoked when a value passes through the throttle. + * + * @param callback Suspend function called with the accepted value. + */ + public fun onValue(callback: suspend (T) -> Unit) + + /** + * Submits a value. Behaviour depends on the configured [StreamThrottlePolicy]. + * + * @param value The value to throttle. + * @return `true` if the value was accepted for delivery (immediate or pending), `false` if it + * was dropped entirely. + */ + public fun submit(value: T): Boolean + + /** Resets the throttle window and discards any pending trailing delivery. */ + public fun reset() +} + +/** + * Creates a new [StreamThrottler] instance. + * + * @param T The type of value being throttled. + * @param scope Coroutine scope for launching the delivery. + * @param logger Logger for diagnostics. + * @param policy The throttle strategy. Defaults to [StreamThrottlePolicy.leading] with 3000ms + * window. + * @return A new [StreamThrottler] instance. + */ +@StreamInternalApi +public fun StreamThrottler( + scope: CoroutineScope, + logger: StreamLogger, + policy: StreamThrottlePolicy = StreamThrottlePolicy.leading(), +): StreamThrottler = StreamThrottlerImpl(scope = scope, logger = logger, policy = policy) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt new file mode 100644 index 0000000..2b4ef28 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.processing + +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.processing.StreamThrottlePolicy +import io.getstream.android.core.api.processing.StreamThrottler +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * Throttler implementation that supports [Leading][StreamThrottlePolicy.Leading], + * [Trailing][StreamThrottlePolicy.Trailing], and + * [LeadingAndTrailing][StreamThrottlePolicy.LeadingAndTrailing] strategies. + * + * A shared window-expiry job tracks the active coroutine across all modes. Resetting cancels it to + * prevent stale coroutines from prematurely closing a new window. + */ +internal class StreamThrottlerImpl( + private val scope: CoroutineScope, + private val logger: StreamLogger, + private val policy: StreamThrottlePolicy, +) : StreamThrottler { + + private val windowActive = AtomicBoolean(false) + private val callbackRef = AtomicReference Unit> {} + private val trailingValue = AtomicReference(null) + private val windowJob = AtomicReference(null) + + override fun onValue(callback: suspend (T) -> Unit) { + callbackRef.set(callback) + } + + override fun submit(value: T): Boolean = + when (policy) { + is StreamThrottlePolicy.Leading -> submitLeading(value) + is StreamThrottlePolicy.Trailing -> submitTrailing(value) + is StreamThrottlePolicy.LeadingAndTrailing -> submitLeadingAndTrailing(value) + } + + override fun reset() { + windowJob.getAndSet(null)?.cancel() + trailingValue.set(null) + windowActive.set(false) + } + + private fun submitLeading(value: T): Boolean { + val accepted = windowActive.compareAndSet(false, true) + if (!accepted) { + logger.v { "[throttle:leading] Dropped: $value" } + } else { + logger.v { "[throttle:leading] Accepted: $value" } + scope.launch { callbackRef.get().invoke(value) } + scheduleWindowExpiry(deliverTrailing = false) + } + return accepted + } + + private fun submitTrailing(value: T): Boolean { + trailingValue.set(value) + if (windowActive.compareAndSet(false, true)) { + logger.v { "[throttle:trailing] Window started, pending: $value" } + scheduleWindowExpiry(deliverTrailing = true) + } else { + logger.v { "[throttle:trailing] Updated pending: $value" } + } + return true + } + + private fun submitLeadingAndTrailing(value: T): Boolean { + val isLeading = windowActive.compareAndSet(false, true) + if (isLeading) { + logger.v { "[throttle:leading+trailing] Leading: $value" } + trailingValue.set(null) + scope.launch { callbackRef.get().invoke(value) } + scheduleWindowExpiry(deliverTrailing = true) + } else { + logger.v { "[throttle:leading+trailing] Pending trailing: $value" } + trailingValue.set(value) + } + return true + } + + private fun scheduleWindowExpiry(deliverTrailing: Boolean) { + val job = + scope.launch { + delay(policy.windowMs) + windowActive.set(false) + if (deliverTrailing) { + val pending = trailingValue.getAndSet(null) + if (pending != null) { + logger.v { "[throttle] Trailing delivery: $pending" } + callbackRef.get().invoke(pending) + } + } + } + windowJob.getAndSet(job)?.cancel() + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/processing/ProcessingFactoryTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/processing/ProcessingFactoryTest.kt new file mode 100644 index 0000000..f71aa52 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/processing/ProcessingFactoryTest.kt @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + +package io.getstream.android.core.api.processing + +import io.getstream.android.core.api.model.StreamTypedKey.Companion.asStreamTypedKey +import io.getstream.android.core.api.model.retry.StreamRetryPolicy +import io.mockk.mockk +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Test + +/** + * Smoke tests for the public factory functions in `api/processing/`. These verify that each factory + * returns a working instance — the detailed behaviour tests live alongside the implementations in + * `internal/processing/`. + */ +class ProcessingFactoryTest { + + @Test + fun `StreamSingleFlightProcessor factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + + val sf = StreamSingleFlightProcessor(scope) + + val result = sf.run("key".asStreamTypedKey()) { 42 } + testScheduler.runCurrent() + + assertTrue(result.isSuccess) + assertEquals(42, result.getOrThrow()) + } + + @Test + fun `StreamRetryProcessor factory returns working instance`() = runTest { + val rp = StreamRetryProcessor(logger = mockk(relaxed = true)) + + val result = rp.retry(StreamRetryPolicy.fixed(maxRetries = 1)) { "ok" } + + assertTrue(result.isSuccess) + assertEquals("ok", result.getOrThrow()) + } + + @Test + fun `StreamSerialProcessingQueue factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + + val queue = StreamSerialProcessingQueue(logger = mockk(relaxed = true), scope = scope) + queue.start() + testScheduler.runCurrent() + + val result = queue.submit { "done" } + testScheduler.runCurrent() + + assertTrue(result.isSuccess) + assertEquals("done", result.getOrThrow()) + } + + @Test + fun `StreamBatcher factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val batches = mutableListOf>() + + val batcher = StreamBatcher(scope = scope, batchSize = 2, initialDelayMs = 100) + batcher.onBatch { items, _, _ -> batches.add(items) } + batcher.start() + testScheduler.runCurrent() + + batcher.enqueue("a") + batcher.enqueue("b") + advanceTimeBy(200) + testScheduler.runCurrent() + + assertTrue(batches.isNotEmpty()) + assertTrue(batches.first().contains("a")) + } + + @Test + fun `StreamDebouncer factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val debouncer = + StreamDebouncer(scope = scope, logger = mockk(relaxed = true), delayMs = 100) + debouncer.onValue { delivered.add(it) } + + debouncer.submit("value") + advanceTimeBy(101) + testScheduler.runCurrent() + + assertEquals(listOf("value"), delivered) + } + + @Test + fun `StreamThrottler factory returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val throttler = StreamThrottler(scope = scope, logger = mockk(relaxed = true)) + throttler.onValue { delivered.add(it) } + + assertTrue(throttler.submit("first")) + testScheduler.runCurrent() + + assertEquals(listOf("first"), delivered) + } + + @Test + fun `StreamThrottler factory with trailing policy returns working instance`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val throttler = + StreamThrottler( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("pending") + testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) + + advanceTimeBy(501) + testScheduler.runCurrent() + assertEquals(listOf("pending"), delivered) + } + + @Test + fun `StreamThrottler factory with leadingAndTrailing policy returns working instance`() = + runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + + val throttler = + StreamThrottler( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("leading") + testScheduler.runCurrent() + throttler.submit("trailing") + + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("leading", "trailing"), delivered) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt new file mode 100644 index 0000000..371702e --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt @@ -0,0 +1,727 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + +package io.getstream.android.core.internal.processing + +import io.getstream.android.core.api.processing.StreamThrottlePolicy +import io.mockk.mockk +import kotlin.test.assertFailsWith +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertTrue +import org.junit.Test + +class StreamThrottlerImplTest { + + // ---- Leading mode ---- + + @Test + fun `leading - first value is delivered immediately`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + val accepted = throttler.submit("first") + testScheduler.runCurrent() + + assertTrue(accepted) + assertEquals(listOf("first"), delivered) + } + + @Test + fun `leading - second value within window is dropped`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + assertTrue(throttler.submit("first")) + testScheduler.runCurrent() + assertFalse(throttler.submit("second")) + testScheduler.runCurrent() + + assertEquals(listOf("first"), delivered) + } + + @Test + fun `leading - value after window expires is delivered`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertTrue(throttler.submit("second")) + testScheduler.runCurrent() + + assertEquals(listOf("first", "second"), delivered) + } + + @Test + fun `leading - rapid burst delivers only the first value`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + val results = (1..50).map { throttler.submit(it) } + testScheduler.runCurrent() + + assertTrue(results.first()) + assertTrue(results.drop(1).all { !it }) + assertEquals(listOf(1), delivered) + } + + @Test + fun `leading - multiple windows deliver one value each`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("a") + testScheduler.runCurrent() + advanceTimeBy(501) + testScheduler.runCurrent() + + throttler.submit("b") + testScheduler.runCurrent() + advanceTimeBy(501) + testScheduler.runCurrent() + + throttler.submit("c") + testScheduler.runCurrent() + + assertEquals(listOf("a", "b", "c"), delivered) + } + + @Test + fun `leading - reset allows immediate delivery`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + throttler.reset() + + assertTrue(throttler.submit("after-reset")) + testScheduler.runCurrent() + + assertEquals(listOf("first", "after-reset"), delivered) + } + + @Test + fun `leading - stale window expiry does not close new window after reset`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + // t=0: submit A, starts window expiry at t=1000 + throttler.submit("A") + testScheduler.runCurrent() + + // t=300: reset cancels old window job + advanceTimeBy(300) + throttler.reset() + + // t=300: submit B, starts new window expiry at t=1300 + assertTrue(throttler.submit("B")) + testScheduler.runCurrent() + + // t=1000: old delay WOULD have fired here — must NOT clear the window + advanceTimeBy(700) + testScheduler.runCurrent() + + // C should be rejected because B's window (until t=1300) is still active + assertFalse(throttler.submit("C")) + + // t=1301: B's window expires + advanceTimeBy(301) + testScheduler.runCurrent() + assertTrue(throttler.submit("D")) + testScheduler.runCurrent() + + assertEquals(listOf("A", "B", "D"), delivered) + } + + @Test + fun `leading - no crash when no callback registered`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + + assertTrue(throttler.submit("orphan")) + testScheduler.runCurrent() + } + + @Test + fun `leading - window reopens exactly after windowMs`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + + advanceTimeBy(999) + testScheduler.runCurrent() + assertFalse(throttler.submit("too-early")) + + advanceTimeBy(2) + testScheduler.runCurrent() + assertTrue(throttler.submit("on-time")) + testScheduler.runCurrent() + + assertEquals(listOf("first", "on-time"), delivered) + } + + // ---- Trailing mode ---- + + @Test + fun `trailing - single value delivered after window expires`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + assertTrue(throttler.submit("first")) + testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("first"), delivered) + } + + @Test + fun `trailing - last value wins within window`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + assertTrue(throttler.submit("first")) + assertTrue(throttler.submit("second")) + assertTrue(throttler.submit("third")) + testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("third"), delivered) + } + + @Test + fun `trailing - all submits return true`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue {} + + assertTrue(throttler.submit("a")) + assertTrue(throttler.submit("b")) + assertTrue(throttler.submit("c")) + } + + @Test + fun `trailing - multiple windows deliver last value each`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("a1") + throttler.submit("a2") + advanceTimeBy(501) + testScheduler.runCurrent() + + throttler.submit("b1") + throttler.submit("b2") + throttler.submit("b3") + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("a2", "b3"), delivered) + } + + @Test + fun `trailing - reset cancels pending delivery`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("pending") + testScheduler.runCurrent() + throttler.reset() + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertTrue(delivered.isEmpty()) + } + + @Test + fun `trailing - submit after reset starts new window`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("cancelled") + throttler.reset() + + throttler.submit("fresh") + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("fresh"), delivered) + } + + // ---- LeadingAndTrailing mode ---- + + @Test + fun `leadingAndTrailing - first value delivered immediately`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + + assertEquals(listOf("first"), delivered) + } + + @Test + fun `leadingAndTrailing - trailing value delivered at window end`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("first") + testScheduler.runCurrent() + throttler.submit("second") + throttler.submit("third") + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("first", "third"), delivered) + } + + @Test + fun `leadingAndTrailing - no trailing delivery if no new values`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("only-one") + testScheduler.runCurrent() + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("only-one"), delivered) + } + + @Test + fun `leadingAndTrailing - all submits return true`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue {} + + assertTrue(throttler.submit("a")) + assertTrue(throttler.submit("b")) + assertTrue(throttler.submit("c")) + } + + @Test + fun `leadingAndTrailing - multiple windows`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + // Window 1: leading=a, trailing=c + throttler.submit("a") + testScheduler.runCurrent() + throttler.submit("b") + throttler.submit("c") + advanceTimeBy(501) + testScheduler.runCurrent() + + // Window 2: leading=d, no trailing + throttler.submit("d") + testScheduler.runCurrent() + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("a", "c", "d"), delivered) + } + + @Test + fun `leadingAndTrailing - reset clears pending trailing`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + throttler.submit("leading") + testScheduler.runCurrent() + throttler.submit("pending-trailing") + throttler.reset() + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf("leading"), delivered) + } + + @Test + fun `leadingAndTrailing - rapid burst delivers first and last`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + (1..50).forEach { throttler.submit(it) } + testScheduler.runCurrent() + + advanceTimeBy(1_001) + testScheduler.runCurrent() + + assertEquals(listOf(1, 50), delivered) + } + + // ---- Edge cases ---- + + @Test + fun `policy rejects zero windowMs`() { + assertFailsWith { StreamThrottlePolicy.leading(windowMs = 0) } + assertFailsWith { StreamThrottlePolicy.trailing(windowMs = 0) } + assertFailsWith { + StreamThrottlePolicy.leadingAndTrailing(windowMs = 0) + } + } + + @Test + fun `policy rejects negative windowMs`() { + assertFailsWith { StreamThrottlePolicy.leading(windowMs = -1) } + } + + @Test + fun `leading - callback exception does not permanently lock throttler`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val handler = CoroutineExceptionHandler { _, _ -> /* swallow */ } + val scope = CoroutineScope(SupervisorJob() + dispatcher + handler) + val delivered = mutableListOf() + var shouldThrow = true + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 500), + ) + throttler.onValue { + if (shouldThrow) { + @Suppress("TooGenericExceptionThrown") throw RuntimeException("boom") + } + delivered.add(it) + } + + // First submit — callback throws, but window timer still runs + throttler.submit("explode") + testScheduler.runCurrent() + + // Wait for window to expire + advanceTimeBy(501) + testScheduler.runCurrent() + + // Window should have expired, next submit should work + shouldThrow = false + assertTrue(throttler.submit("recover")) + testScheduler.runCurrent() + + assertEquals(listOf("recover"), delivered) + } + + @Test + fun `trailing - stale window expiry does not close new window after reset`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.trailing(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + // t=0: submit A, starts window expiry at t=1000 + throttler.submit("A") + testScheduler.runCurrent() + + // t=300: reset cancels old window + advanceTimeBy(300) + throttler.reset() + + // t=300: submit B, starts new window expiry at t=1300 + throttler.submit("B") + testScheduler.runCurrent() + + // t=1000: old delay would have fired — must NOT deliver A or close B's window + advanceTimeBy(700) + testScheduler.runCurrent() + assertTrue(delivered.isEmpty()) + + // t=1300: update to C while B's window is still active + throttler.submit("C") + + // t=1301: B's window expires, delivers latest value (C) + advanceTimeBy(301) + testScheduler.runCurrent() + + assertEquals(listOf("C"), delivered) + } + + @Test + fun `leadingAndTrailing - trailing delivery allows new window to start`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500), + ) + throttler.onValue { delivered.add(it) } + + // Window 1 + throttler.submit("leading-1") + testScheduler.runCurrent() + throttler.submit("trailing-1") + advanceTimeBy(501) + testScheduler.runCurrent() + + assertEquals(listOf("leading-1", "trailing-1"), delivered) + + // Window 2 should start fresh + assertTrue(throttler.submit("leading-2")) + testScheduler.runCurrent() + + assertEquals(listOf("leading-1", "trailing-1", "leading-2"), delivered) + } + + @Test + fun `double reset does not crash`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val scope = CoroutineScope(SupervisorJob() + dispatcher) + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + + throttler.submit("a") + testScheduler.runCurrent() + throttler.reset() + throttler.reset() + + assertTrue(throttler.submit("b")) + } + + @Test + fun `submit after scope cancellation returns true but does not deliver`() = runTest { + val dispatcher = StandardTestDispatcher(testScheduler) + val parentJob = SupervisorJob() + val scope = CoroutineScope(parentJob + dispatcher) + val delivered = mutableListOf() + val throttler = + StreamThrottlerImpl( + scope = scope, + logger = mockk(relaxed = true), + policy = StreamThrottlePolicy.leading(windowMs = 1_000), + ) + throttler.onValue { delivered.add(it) } + + // Cancel the scope + parentJob.cancel() + testScheduler.runCurrent() + + // Submit returns true (CAS succeeds) but callback launch is dead + val result = throttler.submit("dead") + testScheduler.runCurrent() + + assertTrue(result) + assertTrue(delivered.isEmpty()) + } +}