Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 108 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android
- **Authentication**: Token lifecycle management with automatic refresh
- **State management**: Connection state, network availability, app lifecycle tracking
- **Reliability**: Retry policies, exponential backoff, and connection recovery
- **Performance**: Request deduplication, batching, serial processing queues
- **Performance**: Request deduplication, batching, throttling, serial processing queues
- **Thread safety**: Cross-thread execution utilities and subscription management
- **Observability**: Structured logging and event propagation

Expand All @@ -45,7 +45,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android
│ • WebSocket connections • Serial processing │
│ • Token management • Retry logic │
│ • Lifecycle monitoring • Event batching │
│ • Network detection • Thread utilities
│ • Network detection • Throttling & debouncing
└────────────────────────┬────────────────────────────────┘
┌────────────────────────┴────────────────────────────────┐
Expand All @@ -63,7 +63,7 @@ Rather than duplicating infrastructure code across multiple SDKs, Stream Android
- Retry policies with backoff strategies
- Serial processing queues
- Single-flight execution (request deduplication)
- Batching & debouncing
- Batching, debouncing & throttling
- Thread-safe subscription management
- WebSocket connections with health monitoring
- Structured logging
Expand Down Expand Up @@ -96,6 +96,8 @@ Stream Android Core uses annotations to distinguish between stable public APIs a
- [Single-Flight Processor](#single-flight-processor)
- [Retry Processor](#retry-processor)
- [Batcher](#batcher)
- [Debouncer](#debouncer)
- [Throttler](#throttler)
- [Threading Utilities](#threading-utilities)
- [Token Management](#token-management)
- [WebSocket Connections](#websocket-connections)
Expand Down Expand Up @@ -532,6 +534,108 @@ batcher.stop().getOrThrow()

---

### Debouncer

Coalesces rapid state changes into a single settled action. Only the **last** value is delivered after a quiet period.

#### Basic Usage

```kotlin
import io.getstream.android.core.api.processing.StreamDebouncer

val debouncer = StreamDebouncer<String>(
scope = scope,
logger = logger,
delayMs = 300 // Deliver after 300ms of silence
)

debouncer.onValue { settled ->
// Only called once with the final value
updateSearch(settled)
}

// Rapid submissions — only "third" is delivered after 300ms of silence
debouncer.submit("first")
debouncer.submit("second")
debouncer.submit("third")

// Cancel pending delivery
debouncer.cancel()
```

#### Use Cases

- Search-as-you-type (wait for user to stop typing)
- Network/lifecycle state coalescing (avoid reconnection storms)
- UI state settling (wait for animations to finish)

**Pattern**: Last-write-wins with timer reset on each submission

---

### Throttler

Rate-limits bursty values with configurable strategies.

#### Strategies

```kotlin
import io.getstream.android.core.api.processing.StreamThrottler
import io.getstream.android.core.api.processing.StreamThrottlePolicy

// Leading: first value immediately, drop the rest until window expires
val typing = StreamThrottler<TypingEvent>(
scope = scope,
logger = logger,
policy = StreamThrottlePolicy.leading(windowMs = 3_000)
)

// Trailing: collect during window, deliver last value when it expires
val position = StreamThrottler<Position>(
scope = scope,
logger = logger,
policy = StreamThrottlePolicy.trailing(windowMs = 1_000)
)

// Leading + Trailing: first value immediately, last value at window end
val scroll = StreamThrottler<Int>(
scope = scope,
logger = logger,
policy = StreamThrottlePolicy.leadingAndTrailing(windowMs = 500)
)
```

#### Basic Usage

```kotlin
val throttler = StreamThrottler<String>(
scope = scope,
logger = logger,
policy = StreamThrottlePolicy.leading(windowMs = 3_000)
)

throttler.onValue { event -> api.sendTypingEvent(event) }

throttler.submit("typing") // delivered immediately
throttler.submit("typing") // dropped (within window)
// ... 3 seconds pass ...
throttler.submit("typing") // delivered immediately (new window)

// Reset window manually
throttler.reset()
```

#### Use Cases

- Typing indicators (leading — fire immediately, suppress duplicates)
- Read receipts (leading — don't flood the server)
- Position/progress updates (trailing — only latest state matters)
- Scroll tracking (leading + trailing — responsiveness + final accuracy)

**Pattern**: Configurable rate-limiting via `StreamThrottlePolicy`

---

### Threading Utilities

Safe cross-thread execution with timeout protection.
Expand Down Expand Up @@ -1136,7 +1240,7 @@ stream-android-core/
│ │ ├── observers/ # Lifecycle & network monitoring
│ │ │ ├── lifecycle/
│ │ │ └── network/
│ │ ├── processing/ # Queue, retry, single-flight, batcher
│ │ ├── processing/ # Queue, retry, single-flight, batcher, debouncer, throttler
│ │ ├── subscribe/ # Subscription management
│ │ ├── socket/ # WebSocket connections
│ │ ├── log/ # Logging infrastructure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.processing

import io.getstream.android.core.annotations.StreamInternalApi

/**
* Throttle strategy that controls **when** values are delivered within a time window.
*
* ### Strategies
*
* **[Leading]** — First value in the window is delivered immediately. Subsequent values are dropped
* until the window expires.
*
* ```
* submit: A---B-C-D-----------E-F---
* output: A--------------------E-----
* |--- windowMs ---| |--- windowMs ---|
* ```
*
* **[Trailing]** — Values are collected during the window. Only the **last** value is delivered
* when the window expires.
*
* ```
* submit: A---B-C-D-----------E-F---
* output: -----------D--------------F
* |--- windowMs ---| |--- windowMs ---|
* ```
*
* **[LeadingAndTrailing]** — First value delivered immediately. The last value in the window is
* also delivered when the window expires (if different from the leading value).
*
* ```
* submit: A---B-C-D-----------E-F---
* output: A----------D--------E----F
* |--- windowMs ---| |--- windowMs ---|
* ```
*/
@StreamInternalApi
public sealed interface StreamThrottlePolicy {

/** Minimum time in milliseconds between accepted values. */
public val windowMs: Long

/**
* First value passes immediately; subsequent values within the window are dropped.
*
* @property windowMs Minimum time between delivered values. Defaults to 3000ms.
*/
@ConsistentCopyVisibility
public data class Leading
internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy

/**
* Values collected during the window; only the last value is delivered when the window expires.
*
* @property windowMs Collection window duration. Defaults to 3000ms.
*/
@ConsistentCopyVisibility
public data class Trailing
internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy

/**
* First value passes immediately; the last value is also delivered when the window expires (if
* a newer value was submitted during the window).
*
* @property windowMs Minimum time between delivered values. Defaults to 3000ms.
*/
@ConsistentCopyVisibility
public data class LeadingAndTrailing
internal constructor(override val windowMs: Long = DEFAULT_WINDOW_MS) : StreamThrottlePolicy

/** Factory methods for creating [StreamThrottlePolicy] instances. */
public companion object {

/** Default throttle window: 3 seconds. */
public const val DEFAULT_WINDOW_MS: Long = 3_000L

/** Creates a [Leading] policy with the given [windowMs]. */
public fun leading(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy =
Leading(windowMs).validate()

/** Creates a [Trailing] policy with the given [windowMs]. */
public fun trailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy =
Trailing(windowMs).validate()

/** Creates a [LeadingAndTrailing] policy with the given [windowMs]. */
public fun leadingAndTrailing(windowMs: Long = DEFAULT_WINDOW_MS): StreamThrottlePolicy =
LeadingAndTrailing(windowMs).validate()

private fun <T : StreamThrottlePolicy> T.validate(): T {
require(windowMs > 0) { "windowMs must be > 0, was $windowMs" }
return this
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.processing

import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.log.StreamLogger
import io.getstream.android.core.internal.processing.StreamThrottlerImpl
import kotlinx.coroutines.CoroutineScope

/**
* Rate-limits a bursty stream of values so that at most **one** value is delivered per time window.
*
* Behaviour depends on the [StreamThrottlePolicy]:
* - **[Leading][StreamThrottlePolicy.Leading]:** First value delivered immediately, rest dropped
* until window expires. Best for: typing indicators, presence updates.
* - **[Trailing][StreamThrottlePolicy.Trailing]:** Values collected during the window, only the
* **last** value delivered when the window expires. Best for: position/progress updates where
* latest state matters.
* - **[LeadingAndTrailing][StreamThrottlePolicy.LeadingAndTrailing]:** First value delivered
* immediately, AND the last value delivered when the window expires (if a newer value was
* submitted during the window). Best for: scroll position tracking, where both responsiveness and
* final accuracy matter.
*
* ### Semantics
* - **Thread-safety:** All functions are safe to call from multiple coroutines.
* - **Cancellation:** [reset] clears any pending trailing delivery and resets the window.
*
* ### Usage
*
* ```kotlin
* // Typing indicators — leading edge, fire immediately, suppress duplicates
* val throttler = StreamThrottler<TypingEvent>(scope, logger, policy = StreamThrottlePolicy.leading())
* throttler.onValue { event -> api.sendTypingEvent(event) }
*
* // Position updates — trailing edge, only latest state matters
* val throttler = StreamThrottler<Position>(scope, logger, policy = StreamThrottlePolicy.trailing())
*
* // Scroll tracking — both edges
* val throttler = StreamThrottler<Int>(scope, logger, policy = StreamThrottlePolicy.leadingAndTrailing())
* ```
*/
@StreamInternalApi
public interface StreamThrottler<T> {
/**
* Registers the handler invoked when a value passes through the throttle.
*
* @param callback Suspend function called with the accepted value.
*/
public fun onValue(callback: suspend (T) -> Unit)

/**
* Submits a value. Behaviour depends on the configured [StreamThrottlePolicy].
*
* @param value The value to throttle.
* @return `true` if the value was accepted for delivery (immediate or pending), `false` if it
* was dropped entirely.
*/
public fun submit(value: T): Boolean

/** Resets the throttle window and discards any pending trailing delivery. */
public fun reset()
}

/**
* Creates a new [StreamThrottler] instance.
*
* @param T The type of value being throttled.
* @param scope Coroutine scope for launching the delivery.
* @param logger Logger for diagnostics.
* @param policy The throttle strategy. Defaults to [StreamThrottlePolicy.leading] with 3000ms
* window.
* @return A new [StreamThrottler] instance.
*/
@StreamInternalApi
public fun <T> StreamThrottler(
scope: CoroutineScope,
logger: StreamLogger,
policy: StreamThrottlePolicy = StreamThrottlePolicy.leading(),
): StreamThrottler<T> = StreamThrottlerImpl(scope = scope, logger = logger, policy = policy)
Loading
Loading