Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions app/src/main/java/io/getstream/android/core/sample/SampleApp.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.getstream.android.core.api.StreamClient
import io.getstream.android.core.api.authentication.StreamTokenProvider
import io.getstream.android.core.api.model.StreamUser
import io.getstream.android.core.api.model.config.StreamClientSerializationConfig
import io.getstream.android.core.api.model.config.StreamSocketConfig
import io.getstream.android.core.api.model.value.StreamApiKey
import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader
import io.getstream.android.core.api.model.value.StreamToken
Expand Down Expand Up @@ -60,23 +61,8 @@ class SampleApp : Application() {
StreamClient(
context = this.applicationContext,
scope = coroutinesScope,
apiKey = StreamApiKey.fromString("pd67s34fzpgw"),
user = user,
products = listOf("feeds", "chat", "video"),
wsUrl =
StreamWsUrl.fromString(
"wss://chat-edge-frankfurt-ce1.stream-io-api.com/api/v2/connect"
),
clientInfoHeader =
StreamHttpClientInfoHeader.create(
product = "android-core",
productVersion = "1.1.0",
os = "Android",
apiLevel = Build.VERSION.SDK_INT,
deviceModel = "Pixel 7 Pro",
app = "Stream Android Core Sample",
appVersion = "1.0.0",
),
tokenProvider =
object : StreamTokenProvider {
override suspend fun loadToken(userId: StreamUserId): StreamToken {
Expand All @@ -92,6 +78,24 @@ class SampleApp : Application() {
Result.success(Unit)
}
),
socketConfig =
StreamSocketConfig.jwt(
url =
StreamWsUrl.fromString(
"wss://chat-edge-frankfurt-ce1.stream-io-api.com/api/v2/connect"
),
apiKey = StreamApiKey.fromString("pd67s34fzpgw"),
clientInfoHeader =
StreamHttpClientInfoHeader.create(
product = "android-core",
productVersion = "1.1.0",
os = "Android",
apiLevel = Build.VERSION.SDK_INT,
deviceModel = "Pixel 7 Pro",
app = "Stream Android Core Sample",
appVersion = "1.0.0",
),
),
)
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.model.config

import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.authentication.StreamTokenManager
import io.getstream.android.core.api.components.StreamAndroidComponentsProvider
import io.getstream.android.core.api.log.StreamLoggerProvider
import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor
import io.getstream.android.core.api.observers.network.StreamNetworkMonitor
import io.getstream.android.core.api.processing.StreamBatcher
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
import io.getstream.android.core.api.socket.StreamConnectionIdHolder
import io.getstream.android.core.api.socket.StreamWebSocketFactory
import io.getstream.android.core.api.socket.listeners.StreamClientListener
import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager

/**
* Optional overrides for internal components used by
* [StreamClient][io.getstream.android.core.api.StreamClient].
*
* All fields default to `null`, meaning the factory creates default instances. Provide a non-null
* value to replace a specific component — useful for sharing instances across product layers or
* injecting custom implementations.
*
* ### Usage
*
* ```kotlin
* // Share a single-flight processor between core and product API layer
* val singleFlight = StreamSingleFlightProcessor(scope)
*
* val client = StreamClient(
* ...,
* components = StreamComponentProvider(
* singleFlight = singleFlight,
* ),
* )
*
* val productApi = MyProductApi(singleFlight) // same instance
* ```
*
* @param logProvider Logger provider used to create tagged loggers for internal components.
* Defaults to Android logcat.
* @param singleFlight Request deduplication processor.
* @param serialQueue Serial processing queue for ordered execution.
* @param tokenManager Token lifecycle manager.
* @param connectionIdHolder Connection ID storage.
* @param socketFactory WebSocket factory.
* @param batcher WebSocket message batcher.
* @param healthMonitor Connection health monitor.
* @param networkMonitor Network connectivity monitor.
* @param lifecycleMonitor App lifecycle monitor.
* @param connectionRecoveryEvaluator Reconnection heuristics evaluator.
* @param clientSubscriptionManager Socket-level listener registry.
* @param androidComponentsProvider Android system service provider.
*/
@Suppress("LongParameterList")
@StreamInternalApi
public data class StreamComponentProvider(
val logProvider: StreamLoggerProvider = StreamLoggerProvider.defaultAndroidLogger(),
val singleFlight: StreamSingleFlightProcessor? = null,
val serialQueue: StreamSerialProcessingQueue? = null,
val tokenManager: StreamTokenManager? = null,
val connectionIdHolder: StreamConnectionIdHolder? = null,
val socketFactory: StreamWebSocketFactory? = null,
val batcher: StreamBatcher<String>? = null,
val healthMonitor: StreamHealthMonitor? = null,
val networkMonitor: StreamNetworkMonitor? = null,
val lifecycleMonitor: StreamLifecycleMonitor? = null,
val connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator? = null,
val clientSubscriptionManager: StreamSubscriptionManager<StreamClientListener>? = null,
val androidComponentsProvider: StreamAndroidComponentsProvider? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,80 +19,207 @@
import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.model.value.StreamApiKey
import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader
import io.getstream.android.core.api.model.value.StreamWsUrl

/**
* Configuration for the Stream socket.
* Configuration for the Stream WebSocket connection.
*
* @param url The URL to connect to.
* @param apiKey The API key for authentication.
* @param authType The type of authentication used (e.g., "jwt").
* @param clientInfoHeader The client info header.
* Holds both **identity** (URL, API key, auth type) and **operational** tunables (health check
* timing, batching, connection timeout). Products pass this to the [StreamClient] factory to
* describe their socket.
*
* ### Usage
*
* ```kotlin
* // Coordinator socket — standard timing
* val coordinatorSocket = StreamSocketConfig.jwt(
* url = StreamWsUrl.fromString("wss://chat.stream-io-api.com/connect"),
* apiKey = apiKey,
* clientInfoHeader = clientInfo,
* )
*
* // SFU socket — aggressive timing, no batching
* val sfuSocket = StreamSocketConfig.jwt(
* url = StreamWsUrl.fromString("wss://sfu.stream-io-api.com"),
* apiKey = apiKey,
* clientInfoHeader = clientInfo,
* healthCheckIntervalMs = 5_000,
* livenessThresholdMs = 15_000,
* connectionTimeoutMs = 2_000,
* batchSize = 1,
* )
* ```
*
* @param url WebSocket endpoint URL.
* @param apiKey Stream API key for authentication.
* @param authType Authentication type (e.g., "jwt", "anonymous").
* @param clientInfoHeader X-Stream-Client header value.
* @param healthCheckIntervalMs Interval between health check pings in milliseconds.
* @param livenessThresholdMs Time without a health check ack before the connection is considered
* unhealthy in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum number of WebSocket messages to batch before flushing.
* @param batchInitialDelayMs Initial debounce window for batching in milliseconds.
* @param batchMaxDelayMs Maximum debounce window for batching in milliseconds.
*/
@Suppress("LongParameterList")
@StreamInternalApi
@ConsistentCopyVisibility
public data class StreamSocketConfig
private constructor(
val url: String,
val url: StreamWsUrl,
val apiKey: StreamApiKey,
val authType: String,
val clientInfoHeader: StreamHttpClientInfoHeader,
val healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
val livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
val connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
val batchSize: Int = DEFAULT_BATCH_SIZE,
val batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
val batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
) {
/** Default values for [StreamSocketConfig] fields. */
public companion object {
private const val JWT_AUTH_TYPE = "jwt"
private const val ANONYMOUS_AUTH_TYPE = "anonymous"

/** Default health check ping interval: 25 seconds. */
public const val DEFAULT_HEALTH_INTERVAL_MS: Long = 25_000L

/** Default liveness threshold: 60 seconds without ack. */
public const val DEFAULT_LIVENESS_MS: Long = 60_000L

/** Default connection timeout: 10 seconds. */
public const val DEFAULT_CONNECTION_TIMEOUT_MS: Long = 10_000L

/** Default batch size: 10 messages. */
public const val DEFAULT_BATCH_SIZE: Int = 10

/** Default initial batch delay: 100ms. */
public const val DEFAULT_BATCH_INIT_DELAY_MS: Long = 100L

/** Default max batch delay: 1 second. */
public const val DEFAULT_BATCH_MAX_DELAY_MS: Long = 1_000L

/**
* Creates a JWT-based [StreamSocketConfig].
*
* @param url The URL to connect to.
* @param apiKey The API key for authentication.
* @param clientInfoHeader The client info header.
* @param url WebSocket endpoint URL.
* @param apiKey Stream API key for authentication.
* @param clientInfoHeader X-Stream-Client header value.
* @param healthCheckIntervalMs Interval between health check pings in milliseconds.
* @param livenessThresholdMs Liveness threshold in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum batch size before flush.
* @param batchInitialDelayMs Initial debounce window in milliseconds.
* @param batchMaxDelayMs Maximum debounce window in milliseconds.
* @return A JWT-based [StreamSocketConfig].
*/
@Suppress("LongParameterList")
public fun jwt(

Check warning on line 119 in stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 9 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-core-android&issues=AZ2MDiq9roWKLK63Y7-4&open=AZ2MDiq9roWKLK63Y7-4&pullRequest=54
url: String,
url: StreamWsUrl,
apiKey: StreamApiKey,
clientInfoHeader: StreamHttpClientInfoHeader,
): StreamSocketConfig {
require(url.isNotBlank()) { "URL must not be blank" }
return StreamSocketConfig(url, apiKey, JWT_AUTH_TYPE, clientInfoHeader)
}
healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
batchSize: Int = DEFAULT_BATCH_SIZE,
batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
): StreamSocketConfig =
StreamSocketConfig(
url = url,
apiKey = apiKey,
authType = JWT_AUTH_TYPE,
clientInfoHeader = clientInfoHeader,
healthCheckIntervalMs = healthCheckIntervalMs,
livenessThresholdMs = livenessThresholdMs,
connectionTimeoutMs = connectionTimeoutMs,
batchSize = batchSize,
batchInitialDelayMs = batchInitialDelayMs,
batchMaxDelayMs = batchMaxDelayMs,
)

/**
* Creates an anonymous [StreamSocketConfig].
*
* @param url The URL to connect to.
* @param apiKey The API key for authentication.
* @param clientInfoHeader The client info header.
* @param url WebSocket endpoint URL.
* @param apiKey Stream API key for authentication.
* @param clientInfoHeader X-Stream-Client header value.
* @param healthCheckIntervalMs Interval between health check pings in milliseconds.
* @param livenessThresholdMs Liveness threshold in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum batch size before flush.
* @param batchInitialDelayMs Initial debounce window in milliseconds.
* @param batchMaxDelayMs Maximum debounce window in milliseconds.
* @return An anonymous [StreamSocketConfig].
*/
@Suppress("LongParameterList")
public fun anonymous(

Check warning on line 158 in stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 9 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-core-android&issues=AZ2MDiq9roWKLK63Y7-5&open=AZ2MDiq9roWKLK63Y7-5&pullRequest=54
url: String,
url: StreamWsUrl,
apiKey: StreamApiKey,
clientInfoHeader: StreamHttpClientInfoHeader,
): StreamSocketConfig {
require(url.isNotBlank()) { "URL must not be blank" }
return StreamSocketConfig(url, apiKey, ANONYMOUS_AUTH_TYPE, clientInfoHeader)
}
healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
batchSize: Int = DEFAULT_BATCH_SIZE,
batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
): StreamSocketConfig =
StreamSocketConfig(
url = url,
apiKey = apiKey,
authType = ANONYMOUS_AUTH_TYPE,
clientInfoHeader = clientInfoHeader,
healthCheckIntervalMs = healthCheckIntervalMs,
livenessThresholdMs = livenessThresholdMs,
connectionTimeoutMs = connectionTimeoutMs,
batchSize = batchSize,
batchInitialDelayMs = batchInitialDelayMs,
batchMaxDelayMs = batchMaxDelayMs,
)

/**
* Creates a custom [StreamSocketConfig].
*
* @param url The URL to connect to.
* @param apiKey The API key for authentication.
* @param authType The type of authentication used (e.g., "jwt").
* @param clientInfoHeader The client info header.
* @param url WebSocket endpoint URL.
* @param apiKey Stream API key for authentication.
* @param authType Authentication type (e.g., "jwt", "anonymous").
* @param clientInfoHeader X-Stream-Client header value.
* @param healthCheckIntervalMs Interval between health check pings in milliseconds.
* @param livenessThresholdMs Liveness threshold in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum batch size before flush.
* @param batchInitialDelayMs Initial debounce window in milliseconds.
* @param batchMaxDelayMs Maximum debounce window in milliseconds.
* @return A custom [StreamSocketConfig].
*/
@Suppress("LongParameterList")
public fun custom(

Check warning on line 198 in stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 10 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-core-android&issues=AZ2MDiq9roWKLK63Y7-6&open=AZ2MDiq9roWKLK63Y7-6&pullRequest=54
url: String,
url: StreamWsUrl,
apiKey: StreamApiKey,
authType: String,
clientInfoHeader: StreamHttpClientInfoHeader,
healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
batchSize: Int = DEFAULT_BATCH_SIZE,
batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
): StreamSocketConfig {
require(url.isNotBlank()) { "URL must not be blank" }
require(authType.isNotBlank()) { "Auth type must not be blank" }
return StreamSocketConfig(url, apiKey, authType, clientInfoHeader)
return StreamSocketConfig(
url = url,
apiKey = apiKey,
authType = authType,
clientInfoHeader = clientInfoHeader,
healthCheckIntervalMs = healthCheckIntervalMs,
livenessThresholdMs = livenessThresholdMs,
connectionTimeoutMs = connectionTimeoutMs,
batchSize = batchSize,
batchInitialDelayMs = batchInitialDelayMs,
batchMaxDelayMs = batchMaxDelayMs,
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ internal class StreamWebSocketFactoryImpl(
): Result<WebSocket> = runCatching {
logger.v { "[createSocket] config: $streamSocketConfig" }
val url =
"${streamSocketConfig.url}?" +
"${streamSocketConfig.url.rawValue}?" +
"api_key=${streamSocketConfig.apiKey.rawValue}" +
"&stream-auth-type=${streamSocketConfig.authType}" +
"&X-Stream-Client=${streamSocketConfig.clientInfoHeader.rawValue}"
Expand Down
Loading
Loading