diff --git a/app/src/main/java/io/getstream/android/core/sample/SampleApp.kt b/app/src/main/java/io/getstream/android/core/sample/SampleApp.kt index 7816278..cca17aa 100644 --- a/app/src/main/java/io/getstream/android/core/sample/SampleApp.kt +++ b/app/src/main/java/io/getstream/android/core/sample/SampleApp.kt @@ -24,6 +24,7 @@ import io.getstream.android.core.api.StreamClient import io.getstream.android.core.api.authentication.StreamTokenProvider import io.getstream.android.core.api.model.StreamUser import io.getstream.android.core.api.model.config.StreamClientSerializationConfig +import io.getstream.android.core.api.model.config.StreamSocketConfig import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader import io.getstream.android.core.api.model.value.StreamToken @@ -60,23 +61,8 @@ class SampleApp : Application() { StreamClient( context = this.applicationContext, scope = coroutinesScope, - apiKey = StreamApiKey.fromString("pd67s34fzpgw"), user = user, products = listOf("feeds", "chat", "video"), - wsUrl = - StreamWsUrl.fromString( - "wss://chat-edge-frankfurt-ce1.stream-io-api.com/api/v2/connect" - ), - clientInfoHeader = - StreamHttpClientInfoHeader.create( - product = "android-core", - productVersion = "1.1.0", - os = "Android", - apiLevel = Build.VERSION.SDK_INT, - deviceModel = "Pixel 7 Pro", - app = "Stream Android Core Sample", - appVersion = "1.0.0", - ), tokenProvider = object : StreamTokenProvider { override suspend fun loadToken(userId: StreamUserId): StreamToken { @@ -92,6 +78,24 @@ class SampleApp : Application() { Result.success(Unit) } ), + socketConfig = + StreamSocketConfig.jwt( + url = + StreamWsUrl.fromString( + "wss://chat-edge-frankfurt-ce1.stream-io-api.com/api/v2/connect" + ), + apiKey = StreamApiKey.fromString("pd67s34fzpgw"), + clientInfoHeader = + StreamHttpClientInfoHeader.create( + product = "android-core", + productVersion = "1.1.0", + os = "Android", + apiLevel = Build.VERSION.SDK_INT, + deviceModel = "Pixel 7 Pro", + app = "Stream Android Core Sample", + appVersion = "1.0.0", + ), + ), ) } } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt index 1739897..8bd3a05 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt @@ -26,19 +26,16 @@ import io.getstream.android.core.api.http.StreamOkHttpInterceptors import io.getstream.android.core.api.log.StreamLoggerProvider import io.getstream.android.core.api.model.StreamUser import io.getstream.android.core.api.model.config.StreamClientSerializationConfig +import io.getstream.android.core.api.model.config.StreamComponentProvider import io.getstream.android.core.api.model.config.StreamHttpConfig import io.getstream.android.core.api.model.config.StreamSocketConfig import io.getstream.android.core.api.model.connection.StreamConnectedUser import io.getstream.android.core.api.model.connection.StreamConnectionState import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState import io.getstream.android.core.api.model.connection.network.StreamNetworkState -import io.getstream.android.core.api.model.value.StreamApiKey -import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader -import io.getstream.android.core.api.model.value.StreamWsUrl import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor import io.getstream.android.core.api.observers.network.StreamNetworkMonitor import io.getstream.android.core.api.processing.StreamBatcher -import io.getstream.android.core.api.processing.StreamRetryProcessor import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator @@ -143,83 +140,154 @@ public interface StreamClient : StreamObservable { } /** - * ### Overview + * Creates a [StreamClient] with mandatory identity parameters and optional configuration. * - * Creates a [StreamClient] with the given [apiKey], [user], [tokenProvider] and [scope]. The client - * is created in a disconnected state. You must call `connect()` to establish a connection. The - * client is automatically disconnected when the [scope] is cancelled. - * - * **Important**: The client instance **must be kept alive for the duration of the connection**. Do - * not create a new client for every operation. - * - * **Token provider:** - * - The [tokenProvider] is used to fetch tokens on demand. The first token is cached internally. - * When the first request needs to be made, the token is fetched from the provider. If you already - * have a token, you can cache it in your provider and return it as a valid token in `loadToken`. - * See [StreamTokenProvider] for more details. - * - * **Scope:** - * - The [scope] is used to launch the client's internal coroutines. It is recommended to use a - * `CoroutineScope(SupervisorJob() + Dispatchers.Default)` for this purpose. - * - * ### Security - * - The [tokenProvider] is used to fetch tokens on demand. The first token is cached internally. - * When the token expires, the provider is called again to fetch a new one. - * - The expiration is determined by a `401` response from the server at which point the request is - * retried with the new token. - * - * ### Performance - * - The client uses a single-flight pattern to deduplicate concurrent requests. - * - The client uses a serial processing queue to ensure that requests are executed in order. - * - The client uses a message batcher to coalesce high-frequency events. + * This is the primary entry point for product SDKs to create a client. All internal components are + * created with sensible defaults. Use [components] to replace specific internal components (for + * sharing instances or custom implementations). * * ### Usage * * ```kotlin + * // Minimal — all defaults * val client = StreamClient( - * apiKey = "my-api-key", - * userId = "my-user-id", - * tokenProvider = MyTokenProvider(), - * scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + * scope = scope, + * context = context, + * user = user, + * tokenProvider = tokenProvider, + * products = listOf("chat"), + * socketConfig = StreamSocketConfig.jwt( + * url = StreamWsUrl.fromString("wss://chat.stream-io-api.com/connect"), + * apiKey = apiKey, + * clientInfoHeader = clientInfoHeader, + * ), + * serializationConfig = StreamClientSerializationConfig.default(chatEventSerializer), + * ) + * + * // With tuned socket, custom logging, HTTP, and component overrides + * val singleFlight = StreamSingleFlightProcessor(scope) + * val client = StreamClient( + * scope = scope, + * context = context, + * user = user, + * tokenProvider = tokenProvider, + * products = listOf("feeds"), + * socketConfig = StreamSocketConfig.jwt( + * url = StreamWsUrl.fromString("wss://feeds.stream-io-api.com/connect"), + * apiKey = apiKey, + * clientInfoHeader = clientInfoHeader, + * healthCheckIntervalMs = 30_000, + * ), + * serializationConfig = StreamClientSerializationConfig.default(feedsEventSerializer), + * httpConfig = StreamHttpConfig(httpBuilder), + * components = StreamComponentProvider( + * logProvider = myLogProvider, + * singleFlight = singleFlight, + * ), * ) * ``` * - * @param apiKey The API key. - * @param user The user ID. - * @param wsUrl The WebSocket URL. - * @param products Stream product codes (for feature gates / telemetry) negotiated with the socket. - * @param clientInfoHeader The client info header. - * @param clientSubscriptionManager Manages socket-level listeners registered via [StreamClient]. - * @param tokenProvider The token provider. - * @param tokenManager The token manager. - * @param singleFlight The single-flight processor. - * @param serialQueue The serial processing queue. - * @param retryProcessor The retry processor. - * @param scope The coroutine scope powering internal work (usually `SupervisorJob + Dispatcher`). - * @param connectionIdHolder The connection ID holder. - * @param socketFactory The WebSocket factory. - * @param batcher The WebSocket event batcher. - * @param healthMonitor The health monitor. - * @param networkMonitor Tracks device connectivity and feeds connection recovery. - * @param httpConfig Optional HTTP client customization. - * @param serializationConfig Composite JSON / event serialization configuration. - * @param logProvider The logger provider. + * @param scope Coroutine scope powering internal work. Recommended: + * `CoroutineScope(SupervisorJob() + Dispatchers.Default)`. + * @param context Android application context. + * @param user User identity. + * @param tokenProvider Provides authentication tokens on demand. + * @param products Stream product codes negotiated with the socket (e.g. "chat", "feeds", "video"). + * @param socketConfig WebSocket connection configuration (URL, auth, timing, batching). + * @param serializationConfig JSON and event serialization configuration. + * @param httpConfig Optional HTTP client customization (OkHttp builder, interceptors). + * @param components Optional component overrides for DI. Defaults to + * [StreamComponentProvider()][StreamComponentProvider] (all defaults). */ +@Suppress("LongParameterList", "CyclomaticComplexMethod") @SuppressLint("ExposeAsStateFlow") @StreamInternalApi public fun StreamClient( + scope: CoroutineScope, + context: Context, + user: StreamUser, + tokenProvider: StreamTokenProvider, + products: List, + socketConfig: StreamSocketConfig, + serializationConfig: StreamClientSerializationConfig, + httpConfig: StreamHttpConfig? = null, + components: StreamComponentProvider = StreamComponentProvider(), +): StreamClient { + val logProvider = components.logProvider + val singleFlight = components.singleFlight ?: StreamSingleFlightProcessor(scope) + + return createStreamClientInternal( + scope = scope, + context = context, + user = user, + tokenProvider = tokenProvider, + products = products, + socketConfig = socketConfig, + serializationConfig = serializationConfig, + httpConfig = httpConfig, + androidComponentsProvider = + components.androidComponentsProvider + ?: StreamAndroidComponentsProvider(context.applicationContext), + logProvider = logProvider, + clientSubscriptionManager = + components.clientSubscriptionManager + ?: StreamSubscriptionManager( + logger = logProvider.taggedLogger("SCClientSubscriptions"), + maxStrongSubscriptions = 250, + maxWeakSubscriptions = 250, + ), + singleFlight = singleFlight, + serialQueue = + components.serialQueue + ?: StreamSerialProcessingQueue( + logger = logProvider.taggedLogger("SCSerialProcessing"), + scope = scope, + ), + tokenManager = + components.tokenManager ?: StreamTokenManager(user.id, tokenProvider, singleFlight), + connectionIdHolder = components.connectionIdHolder ?: StreamConnectionIdHolder(), + socketFactory = + components.socketFactory + ?: StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory")), + batcher = + components.batcher + ?: StreamBatcher( + scope = scope, + batchSize = socketConfig.batchSize, + initialDelayMs = socketConfig.batchInitialDelayMs, + maxDelayMs = socketConfig.batchMaxDelayMs, + ), + healthMonitor = + components.healthMonitor + ?: StreamHealthMonitor( + logger = logProvider.taggedLogger("SCHealthMonitor"), + scope = scope, + interval = socketConfig.healthCheckIntervalMs, + livenessThreshold = socketConfig.livenessThresholdMs, + ), + networkMonitor = components.networkMonitor, + lifecycleMonitor = components.lifecycleMonitor, + connectionRecoveryEvaluator = components.connectionRecoveryEvaluator, + ) +} + +/** + * Internal full-parameter factory. Used by the simplified [StreamClient] factory above and + * available for tests requiring full DI control. + */ +@Suppress("LongParameterList", "LongMethod") +@SuppressLint("ExposeAsStateFlow") +internal fun createStreamClientInternal( // Android scope: CoroutineScope, context: Context, // Client config - apiKey: StreamApiKey, user: StreamUser, - wsUrl: StreamWsUrl, - products: List, - clientInfoHeader: StreamHttpClientInfoHeader, tokenProvider: StreamTokenProvider, + products: List, + socketConfig: StreamSocketConfig, serializationConfig: StreamClientSerializationConfig, httpConfig: StreamHttpConfig? = null, @@ -245,9 +313,6 @@ public fun StreamClient( logger = logProvider.taggedLogger("SCSerialProcessing"), scope = scope, ), - retryProcessor: StreamRetryProcessor = - StreamRetryProcessor(logger = logProvider.taggedLogger("SCRetryProcessor")), - // Token tokenManager: StreamTokenManager = StreamTokenManager(user.id, tokenProvider, singleFlight), @@ -256,38 +321,54 @@ public fun StreamClient( socketFactory: StreamWebSocketFactory = StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory")), batcher: StreamBatcher = - StreamBatcher(scope = scope, batchSize = 10, initialDelayMs = 100L, maxDelayMs = 1_000L), + StreamBatcher( + scope = scope, + batchSize = socketConfig.batchSize, + initialDelayMs = socketConfig.batchInitialDelayMs, + maxDelayMs = socketConfig.batchMaxDelayMs, + ), // Monitoring healthMonitor: StreamHealthMonitor = - StreamHealthMonitor(logger = logProvider.taggedLogger("SCHealthMonitor"), scope = scope), - networkMonitor: StreamNetworkMonitor = - StreamNetworkMonitor( - logger = logProvider.taggedLogger("SCNetworkMonitor"), + StreamHealthMonitor( + logger = logProvider.taggedLogger("SCHealthMonitor"), scope = scope, - connectivityManager = androidComponentsProvider.connectivityManager().getOrThrow(), - wifiManager = androidComponentsProvider.wifiManager().getOrThrow(), - telephonyManager = androidComponentsProvider.telephonyManager().getOrThrow(), - subscriptionManager = - StreamSubscriptionManager( - logger = logProvider.taggedLogger("SCNetworkMonitorSubscriptions") - ), - ), - lifecycleMonitor: StreamLifecycleMonitor = - StreamLifecycleMonitor( - logger = logProvider.taggedLogger("SCLifecycleMonitor"), - subscriptionManager = - StreamSubscriptionManager( - logger = logProvider.taggedLogger("SCLifecycleMonitorSubscriptions") - ), - lifecycle = androidComponentsProvider.lifecycle(), - ), - connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator = - StreamConnectionRecoveryEvaluator( - logger = logProvider.taggedLogger("SCConnectionRecoveryEvaluator"), - singleFlightProcessor = singleFlight, + interval = socketConfig.healthCheckIntervalMs, + livenessThreshold = socketConfig.livenessThresholdMs, ), + networkMonitor: StreamNetworkMonitor? = null, + lifecycleMonitor: StreamLifecycleMonitor? = null, + connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator? = null, ): StreamClient { + val resolvedNetworkMonitor = + networkMonitor + ?: StreamNetworkMonitor( + logger = logProvider.taggedLogger("SCNetworkMonitor"), + scope = scope, + connectivityManager = androidComponentsProvider.connectivityManager().getOrThrow(), + wifiManager = androidComponentsProvider.wifiManager().getOrThrow(), + telephonyManager = androidComponentsProvider.telephonyManager().getOrThrow(), + subscriptionManager = + StreamSubscriptionManager( + logger = logProvider.taggedLogger("SCNetworkMonitorSubscriptions") + ), + ) + val resolvedLifecycleMonitor = + lifecycleMonitor + ?: StreamLifecycleMonitor( + logger = logProvider.taggedLogger("SCLifecycleMonitor"), + subscriptionManager = + StreamSubscriptionManager( + logger = logProvider.taggedLogger("SCLifecycleMonitorSubscriptions") + ), + lifecycle = androidComponentsProvider.lifecycle(), + ) + val resolvedRecoveryEvaluator = + connectionRecoveryEvaluator + ?: StreamConnectionRecoveryEvaluator( + logger = logProvider.taggedLogger("SCConnectionRecoveryEvaluator"), + singleFlightProcessor = singleFlight, + ) val clientLogger = logProvider.taggedLogger(tag = "SCClient") val parent = scope.coroutineContext[Job] val supervisorJob = @@ -317,11 +398,15 @@ public fun StreamClient( httpConfig?.apply { if (automaticInterceptors) { httpBuilder.apply { - addInterceptor(StreamOkHttpInterceptors.clientInfo(clientInfoHeader)) - addInterceptor(StreamOkHttpInterceptors.apiKey(apiKey)) + addInterceptor(StreamOkHttpInterceptors.clientInfo(socketConfig.clientInfoHeader)) + addInterceptor(StreamOkHttpInterceptors.apiKey(socketConfig.apiKey)) addInterceptor(StreamOkHttpInterceptors.connectionId(connectionIdHolder)) addInterceptor( - StreamOkHttpInterceptors.auth("jwt", tokenManager, compositeSerialization) + StreamOkHttpInterceptors.auth( + socketConfig.authType, + tokenManager, + compositeSerialization, + ) ) addInterceptor(StreamOkHttpInterceptors.error(compositeSerialization)) } @@ -332,8 +417,8 @@ public fun StreamClient( val networkAndLifeCycleMonitor = StreamNetworkAndLifeCycleMonitor( logger = logProvider.taggedLogger("SCNetworkAndLifecycleMonitor"), - networkMonitor = networkMonitor, - lifecycleMonitor = lifecycleMonitor, + networkMonitor = resolvedNetworkMonitor, + lifecycleMonitor = resolvedLifecycleMonitor, mutableNetworkState = MutableStateFlow(StreamNetworkState.Unknown), mutableLifecycleState = MutableStateFlow(StreamLifecycleState.Unknown), subscriptionManager = @@ -354,17 +439,12 @@ public fun StreamClient( mutableConnectionState = mutableConnectionState, subscriptionManager = clientSubscriptionManager, networkAndLifeCycleMonitor = networkAndLifeCycleMonitor, - connectionRecoveryEvaluator = connectionRecoveryEvaluator, + connectionRecoveryEvaluator = resolvedRecoveryEvaluator, socketSession = StreamSocketSession( logger = logProvider.taggedLogger("SCSocketSession"), products = products, - config = - StreamSocketConfig.jwt( - url = wsUrl.rawValue, - apiKey = apiKey, - clientInfoHeader = clientInfoHeader, - ), + config = socketConfig, jsonSerialization = compositeSerialization, eventParser = StreamCompositeEventSerializationImpl( diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt new file mode 100644 index 0000000..4512789 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.model.config + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.authentication.StreamTokenManager +import io.getstream.android.core.api.components.StreamAndroidComponentsProvider +import io.getstream.android.core.api.log.StreamLoggerProvider +import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor +import io.getstream.android.core.api.observers.network.StreamNetworkMonitor +import io.getstream.android.core.api.processing.StreamBatcher +import io.getstream.android.core.api.processing.StreamSerialProcessingQueue +import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator +import io.getstream.android.core.api.socket.StreamConnectionIdHolder +import io.getstream.android.core.api.socket.StreamWebSocketFactory +import io.getstream.android.core.api.socket.listeners.StreamClientListener +import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager + +/** + * Optional overrides for internal components used by + * [StreamClient][io.getstream.android.core.api.StreamClient]. + * + * All fields default to `null`, meaning the factory creates default instances. Provide a non-null + * value to replace a specific component — useful for sharing instances across product layers or + * injecting custom implementations. + * + * ### Usage + * + * ```kotlin + * // Share a single-flight processor between core and product API layer + * val singleFlight = StreamSingleFlightProcessor(scope) + * + * val client = StreamClient( + * ..., + * components = StreamComponentProvider( + * singleFlight = singleFlight, + * ), + * ) + * + * val productApi = MyProductApi(singleFlight) // same instance + * ``` + * + * @param logProvider Logger provider used to create tagged loggers for internal components. + * Defaults to Android logcat. + * @param singleFlight Request deduplication processor. + * @param serialQueue Serial processing queue for ordered execution. + * @param tokenManager Token lifecycle manager. + * @param connectionIdHolder Connection ID storage. + * @param socketFactory WebSocket factory. + * @param batcher WebSocket message batcher. + * @param healthMonitor Connection health monitor. + * @param networkMonitor Network connectivity monitor. + * @param lifecycleMonitor App lifecycle monitor. + * @param connectionRecoveryEvaluator Reconnection heuristics evaluator. + * @param clientSubscriptionManager Socket-level listener registry. + * @param androidComponentsProvider Android system service provider. + */ +@Suppress("LongParameterList") +@StreamInternalApi +public data class StreamComponentProvider( + val logProvider: StreamLoggerProvider = StreamLoggerProvider.defaultAndroidLogger(), + val singleFlight: StreamSingleFlightProcessor? = null, + val serialQueue: StreamSerialProcessingQueue? = null, + val tokenManager: StreamTokenManager? = null, + val connectionIdHolder: StreamConnectionIdHolder? = null, + val socketFactory: StreamWebSocketFactory? = null, + val batcher: StreamBatcher? = null, + val healthMonitor: StreamHealthMonitor? = null, + val networkMonitor: StreamNetworkMonitor? = null, + val lifecycleMonitor: StreamLifecycleMonitor? = null, + val connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator? = null, + val clientSubscriptionManager: StreamSubscriptionManager? = null, + val androidComponentsProvider: StreamAndroidComponentsProvider? = null, +) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt index f84bfc4..8c32079 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt @@ -19,80 +19,207 @@ package io.getstream.android.core.api.model.config import io.getstream.android.core.annotations.StreamInternalApi import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader +import io.getstream.android.core.api.model.value.StreamWsUrl /** - * Configuration for the Stream socket. + * Configuration for the Stream WebSocket connection. * - * @param url The URL to connect to. - * @param apiKey The API key for authentication. - * @param authType The type of authentication used (e.g., "jwt"). - * @param clientInfoHeader The client info header. + * Holds both **identity** (URL, API key, auth type) and **operational** tunables (health check + * timing, batching, connection timeout). Products pass this to the [StreamClient] factory to + * describe their socket. + * + * ### Usage + * + * ```kotlin + * // Coordinator socket — standard timing + * val coordinatorSocket = StreamSocketConfig.jwt( + * url = StreamWsUrl.fromString("wss://chat.stream-io-api.com/connect"), + * apiKey = apiKey, + * clientInfoHeader = clientInfo, + * ) + * + * // SFU socket — aggressive timing, no batching + * val sfuSocket = StreamSocketConfig.jwt( + * url = StreamWsUrl.fromString("wss://sfu.stream-io-api.com"), + * apiKey = apiKey, + * clientInfoHeader = clientInfo, + * healthCheckIntervalMs = 5_000, + * livenessThresholdMs = 15_000, + * connectionTimeoutMs = 2_000, + * batchSize = 1, + * ) + * ``` + * + * @param url WebSocket endpoint URL. + * @param apiKey Stream API key for authentication. + * @param authType Authentication type (e.g., "jwt", "anonymous"). + * @param clientInfoHeader X-Stream-Client header value. + * @param healthCheckIntervalMs Interval between health check pings in milliseconds. + * @param livenessThresholdMs Time without a health check ack before the connection is considered + * unhealthy in milliseconds. + * @param connectionTimeoutMs WebSocket connection timeout in milliseconds. + * @param batchSize Maximum number of WebSocket messages to batch before flushing. + * @param batchInitialDelayMs Initial debounce window for batching in milliseconds. + * @param batchMaxDelayMs Maximum debounce window for batching in milliseconds. */ +@Suppress("LongParameterList") @StreamInternalApi @ConsistentCopyVisibility public data class StreamSocketConfig private constructor( - val url: String, + val url: StreamWsUrl, val apiKey: StreamApiKey, val authType: String, val clientInfoHeader: StreamHttpClientInfoHeader, + val healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS, + val livenessThresholdMs: Long = DEFAULT_LIVENESS_MS, + val connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS, + val batchSize: Int = DEFAULT_BATCH_SIZE, + val batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS, + val batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS, ) { + /** Default values for [StreamSocketConfig] fields. */ public companion object { private const val JWT_AUTH_TYPE = "jwt" private const val ANONYMOUS_AUTH_TYPE = "anonymous" + /** Default health check ping interval: 25 seconds. */ + public const val DEFAULT_HEALTH_INTERVAL_MS: Long = 25_000L + + /** Default liveness threshold: 60 seconds without ack. */ + public const val DEFAULT_LIVENESS_MS: Long = 60_000L + + /** Default connection timeout: 10 seconds. */ + public const val DEFAULT_CONNECTION_TIMEOUT_MS: Long = 10_000L + + /** Default batch size: 10 messages. */ + public const val DEFAULT_BATCH_SIZE: Int = 10 + + /** Default initial batch delay: 100ms. */ + public const val DEFAULT_BATCH_INIT_DELAY_MS: Long = 100L + + /** Default max batch delay: 1 second. */ + public const val DEFAULT_BATCH_MAX_DELAY_MS: Long = 1_000L + /** * Creates a JWT-based [StreamSocketConfig]. * - * @param url The URL to connect to. - * @param apiKey The API key for authentication. - * @param clientInfoHeader The client info header. + * @param url WebSocket endpoint URL. + * @param apiKey Stream API key for authentication. + * @param clientInfoHeader X-Stream-Client header value. + * @param healthCheckIntervalMs Interval between health check pings in milliseconds. + * @param livenessThresholdMs Liveness threshold in milliseconds. + * @param connectionTimeoutMs WebSocket connection timeout in milliseconds. + * @param batchSize Maximum batch size before flush. + * @param batchInitialDelayMs Initial debounce window in milliseconds. + * @param batchMaxDelayMs Maximum debounce window in milliseconds. * @return A JWT-based [StreamSocketConfig]. */ + @Suppress("LongParameterList") public fun jwt( - url: String, + url: StreamWsUrl, apiKey: StreamApiKey, clientInfoHeader: StreamHttpClientInfoHeader, - ): StreamSocketConfig { - require(url.isNotBlank()) { "URL must not be blank" } - return StreamSocketConfig(url, apiKey, JWT_AUTH_TYPE, clientInfoHeader) - } + healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS, + livenessThresholdMs: Long = DEFAULT_LIVENESS_MS, + connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS, + batchSize: Int = DEFAULT_BATCH_SIZE, + batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS, + batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS, + ): StreamSocketConfig = + StreamSocketConfig( + url = url, + apiKey = apiKey, + authType = JWT_AUTH_TYPE, + clientInfoHeader = clientInfoHeader, + healthCheckIntervalMs = healthCheckIntervalMs, + livenessThresholdMs = livenessThresholdMs, + connectionTimeoutMs = connectionTimeoutMs, + batchSize = batchSize, + batchInitialDelayMs = batchInitialDelayMs, + batchMaxDelayMs = batchMaxDelayMs, + ) /** * Creates an anonymous [StreamSocketConfig]. * - * @param url The URL to connect to. - * @param apiKey The API key for authentication. - * @param clientInfoHeader The client info header. + * @param url WebSocket endpoint URL. + * @param apiKey Stream API key for authentication. + * @param clientInfoHeader X-Stream-Client header value. + * @param healthCheckIntervalMs Interval between health check pings in milliseconds. + * @param livenessThresholdMs Liveness threshold in milliseconds. + * @param connectionTimeoutMs WebSocket connection timeout in milliseconds. + * @param batchSize Maximum batch size before flush. + * @param batchInitialDelayMs Initial debounce window in milliseconds. + * @param batchMaxDelayMs Maximum debounce window in milliseconds. * @return An anonymous [StreamSocketConfig]. */ + @Suppress("LongParameterList") public fun anonymous( - url: String, + url: StreamWsUrl, apiKey: StreamApiKey, clientInfoHeader: StreamHttpClientInfoHeader, - ): StreamSocketConfig { - require(url.isNotBlank()) { "URL must not be blank" } - return StreamSocketConfig(url, apiKey, ANONYMOUS_AUTH_TYPE, clientInfoHeader) - } + healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS, + livenessThresholdMs: Long = DEFAULT_LIVENESS_MS, + connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS, + batchSize: Int = DEFAULT_BATCH_SIZE, + batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS, + batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS, + ): StreamSocketConfig = + StreamSocketConfig( + url = url, + apiKey = apiKey, + authType = ANONYMOUS_AUTH_TYPE, + clientInfoHeader = clientInfoHeader, + healthCheckIntervalMs = healthCheckIntervalMs, + livenessThresholdMs = livenessThresholdMs, + connectionTimeoutMs = connectionTimeoutMs, + batchSize = batchSize, + batchInitialDelayMs = batchInitialDelayMs, + batchMaxDelayMs = batchMaxDelayMs, + ) /** * Creates a custom [StreamSocketConfig]. * - * @param url The URL to connect to. - * @param apiKey The API key for authentication. - * @param authType The type of authentication used (e.g., "jwt"). - * @param clientInfoHeader The client info header. + * @param url WebSocket endpoint URL. + * @param apiKey Stream API key for authentication. + * @param authType Authentication type (e.g., "jwt", "anonymous"). + * @param clientInfoHeader X-Stream-Client header value. + * @param healthCheckIntervalMs Interval between health check pings in milliseconds. + * @param livenessThresholdMs Liveness threshold in milliseconds. + * @param connectionTimeoutMs WebSocket connection timeout in milliseconds. + * @param batchSize Maximum batch size before flush. + * @param batchInitialDelayMs Initial debounce window in milliseconds. + * @param batchMaxDelayMs Maximum debounce window in milliseconds. * @return A custom [StreamSocketConfig]. */ + @Suppress("LongParameterList") public fun custom( - url: String, + url: StreamWsUrl, apiKey: StreamApiKey, authType: String, clientInfoHeader: StreamHttpClientInfoHeader, + healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS, + livenessThresholdMs: Long = DEFAULT_LIVENESS_MS, + connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS, + batchSize: Int = DEFAULT_BATCH_SIZE, + batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS, + batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS, ): StreamSocketConfig { - require(url.isNotBlank()) { "URL must not be blank" } require(authType.isNotBlank()) { "Auth type must not be blank" } - return StreamSocketConfig(url, apiKey, authType, clientInfoHeader) + return StreamSocketConfig( + url = url, + apiKey = apiKey, + authType = authType, + clientInfoHeader = clientInfoHeader, + healthCheckIntervalMs = healthCheckIntervalMs, + livenessThresholdMs = livenessThresholdMs, + connectionTimeoutMs = connectionTimeoutMs, + batchSize = batchSize, + batchInitialDelayMs = batchInitialDelayMs, + batchMaxDelayMs = batchMaxDelayMs, + ) } } } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImpl.kt index afa8b39..a93a676 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImpl.kt @@ -39,7 +39,7 @@ internal class StreamWebSocketFactoryImpl( ): Result = runCatching { logger.v { "[createSocket] config: $streamSocketConfig" } val url = - "${streamSocketConfig.url}?" + + "${streamSocketConfig.url.rawValue}?" + "api_key=${streamSocketConfig.apiKey.rawValue}" + "&stream-auth-type=${streamSocketConfig.authType}" + "&X-Stream-Client=${streamSocketConfig.clientInfoHeader.rawValue}" diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientConfigFactoryTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientConfigFactoryTest.kt new file mode 100644 index 0000000..d4734c9 --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientConfigFactoryTest.kt @@ -0,0 +1,426 @@ +/* + * Copyright (c) 2014-2025 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(StreamInternalApi::class) + +package io.getstream.android.core.api + +import android.net.ConnectivityManager +import android.net.wifi.WifiManager +import android.telephony.TelephonyManager +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.LifecycleObserver +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.authentication.StreamTokenManager +import io.getstream.android.core.api.authentication.StreamTokenProvider +import io.getstream.android.core.api.components.StreamAndroidComponentsProvider +import io.getstream.android.core.api.log.StreamLogger +import io.getstream.android.core.api.log.StreamLoggerProvider +import io.getstream.android.core.api.model.StreamUser +import io.getstream.android.core.api.model.config.StreamClientSerializationConfig +import io.getstream.android.core.api.model.config.StreamComponentProvider +import io.getstream.android.core.api.model.config.StreamSocketConfig +import io.getstream.android.core.api.model.connection.StreamConnectionState +import io.getstream.android.core.api.model.value.StreamApiKey +import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader +import io.getstream.android.core.api.model.value.StreamToken +import io.getstream.android.core.api.model.value.StreamUserId +import io.getstream.android.core.api.model.value.StreamWsUrl +import io.getstream.android.core.api.processing.StreamBatcher +import io.getstream.android.core.api.processing.StreamSerialProcessingQueue +import io.getstream.android.core.api.processing.StreamSingleFlightProcessor +import io.getstream.android.core.api.serialization.StreamEventSerialization +import io.getstream.android.core.api.socket.StreamConnectionIdHolder +import io.getstream.android.core.api.socket.StreamWebSocketFactory +import io.getstream.android.core.api.socket.listeners.StreamClientListener +import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor +import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.internal.client.StreamClientImpl +import io.getstream.android.core.internal.socket.StreamSocketSession +import io.getstream.android.core.testutil.assertFieldEquals +import io.getstream.android.core.testutil.readPrivateField +import io.mockk.mockk +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.TestScope +import org.junit.Test + +internal class StreamClientConfigFactoryTest { + + private val dispatcher = StandardTestDispatcher() + private val testScope = TestScope(dispatcher) + private val productSerializer = mockk>(relaxed = true) + private val serializationConfig = StreamClientSerializationConfig.default(productSerializer) + private val logProvider = + object : StreamLoggerProvider { + override fun taggedLogger(tag: String): StreamLogger = + object : StreamLogger { + override fun log( + level: StreamLogger.LogLevel, + throwable: Throwable?, + message: () -> String, + ) { + // no-op for tests + } + } + } + + private val fakeAndroidComponents = + object : StreamAndroidComponentsProvider { + override fun connectivityManager(): Result = + Result.success(mockk(relaxed = true)) + + override fun wifiManager(): Result = Result.success(mockk(relaxed = true)) + + override fun telephonyManager(): Result = + Result.success(mockk(relaxed = true)) + + override fun lifecycle(): Lifecycle = + object : Lifecycle() { + override fun addObserver(observer: LifecycleObserver) {} + + override fun removeObserver(observer: LifecycleObserver) {} + + override val currentState: State + get() = State.CREATED + } + } + + private val defaultSocketConfig = + StreamSocketConfig.jwt( + url = StreamWsUrl.fromString("wss://test.stream/connect"), + apiKey = StreamApiKey.fromString("key123"), + clientInfoHeader = + StreamHttpClientInfoHeader.create( + product = "android", + productVersion = "1.0", + os = "android", + apiLevel = 33, + deviceModel = "Pixel", + app = "test-app", + appVersion = "1.0.0", + ), + ) + private val user = StreamUser(id = StreamUserId.fromString("user-123")) + + private fun buildClient( + socketConfig: StreamSocketConfig = defaultSocketConfig, + components: StreamComponentProvider = + StreamComponentProvider( + logProvider = logProvider, + androidComponentsProvider = fakeAndroidComponents, + ), + ): StreamClient = + StreamClient( + scope = testScope, + context = mockk(relaxed = true), + user = user, + tokenProvider = + object : StreamTokenProvider { + override suspend fun loadToken(userId: StreamUserId): StreamToken = + StreamToken.fromString("token") + }, + products = listOf("feeds"), + socketConfig = socketConfig, + serializationConfig = serializationConfig, + components = components, + ) + + // ── StreamSocketConfig tunables ───────────────────────────────────────── + + @Test + fun `factory with default config creates client in Idle state`() { + val client = buildClient() + + assertTrue(client is StreamClientImpl<*>) + assertTrue(client.connectionState.value is StreamConnectionState.Idle) + } + + @Test + fun `factory wires socketConfig to socket session`() { + val customConfig = + StreamSocketConfig.jwt( + url = StreamWsUrl.fromString("wss://staging.getstream.io"), + apiKey = StreamApiKey.fromString("staging-key"), + clientInfoHeader = defaultSocketConfig.clientInfoHeader, + ) + val client = buildClient(socketConfig = customConfig) + + val socketSession = + (client as StreamClientImpl<*>).readPrivateField("socketSession") + as StreamSocketSession<*> + socketSession.assertFieldEquals("config", customConfig) + } + + @Test + fun `factory wires custom health check timing from socketConfig`() { + val customConfig = + StreamSocketConfig.jwt( + url = defaultSocketConfig.url, + apiKey = defaultSocketConfig.apiKey, + clientInfoHeader = defaultSocketConfig.clientInfoHeader, + healthCheckIntervalMs = 5_000L, + livenessThresholdMs = 15_000L, + ) + val client = buildClient(socketConfig = customConfig) + + val socketSession = + (client as StreamClientImpl<*>).readPrivateField("socketSession") + as StreamSocketSession<*> + val healthMonitor = socketSession.readPrivateField("healthMonitor") as StreamHealthMonitor + assertNotNull(healthMonitor) + healthMonitor.assertFieldEquals("interval", 5_000L) + healthMonitor.assertFieldEquals("livenessThreshold", 15_000L) + } + + @Test + fun `factory wires custom batch parameters from socketConfig`() { + val customConfig = + StreamSocketConfig.jwt( + url = defaultSocketConfig.url, + apiKey = defaultSocketConfig.apiKey, + clientInfoHeader = defaultSocketConfig.clientInfoHeader, + batchSize = 20, + batchInitialDelayMs = 50L, + batchMaxDelayMs = 500L, + ) + val client = buildClient(socketConfig = customConfig) + + val socketSession = + (client as StreamClientImpl<*>).readPrivateField("socketSession") + as StreamSocketSession<*> + val batcher = socketSession.readPrivateField("batcher") as StreamBatcher<*> + assertNotNull(batcher) + batcher.assertFieldEquals("batchSize", 20) + batcher.assertFieldEquals("initialDelayMs", 50L) + batcher.assertFieldEquals("maxDelayMs", 500L) + } + + // ── StreamComponentProvider overrides ──────────────────────────────────── + + @Test + fun `factory wires injected singleFlight from components`() { + val singleFlight = mockk(relaxed = true) + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + singleFlight = singleFlight, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + (client as StreamClientImpl<*>).assertFieldEquals("singleFlight", singleFlight) + } + + @Test + fun `factory wires injected serialQueue from components`() { + val serialQueue = mockk(relaxed = true) + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + serialQueue = serialQueue, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + (client as StreamClientImpl<*>).assertFieldEquals("serialQueue", serialQueue) + } + + @Test + fun `factory wires injected tokenManager from components`() { + val tokenManager = mockk(relaxed = true) + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + tokenManager = tokenManager, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + (client as StreamClientImpl<*>).assertFieldEquals("tokenManager", tokenManager) + } + + @Test + fun `factory wires injected connectionIdHolder from components`() { + val connectionIdHolder = StreamConnectionIdHolder() + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + connectionIdHolder = connectionIdHolder, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + (client as StreamClientImpl<*>).assertFieldEquals("connectionIdHolder", connectionIdHolder) + } + + @Test + fun `factory wires injected socketFactory from components`() { + val socketFactory = mockk(relaxed = true) + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + socketFactory = socketFactory, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + val socketSession = + (client as StreamClientImpl<*>).readPrivateField("socketSession") + as StreamSocketSession<*> + val internalSocket = socketSession.readPrivateField("internalSocket") + val wiredFactory = internalSocket?.readPrivateField("socketFactory") + assertEquals(socketFactory, wiredFactory) + } + + @Test + fun `factory wires injected healthMonitor from components`() { + val healthMonitor = mockk(relaxed = true) + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + healthMonitor = healthMonitor, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + val socketSession = + (client as StreamClientImpl<*>).readPrivateField("socketSession") + as StreamSocketSession<*> + socketSession.assertFieldEquals("healthMonitor", healthMonitor) + } + + @Test + fun `factory wires injected batcher from components`() { + val batcher = mockk>(relaxed = true) + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + batcher = batcher, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + val socketSession = + (client as StreamClientImpl<*>).readPrivateField("socketSession") + as StreamSocketSession<*> + socketSession.assertFieldEquals("batcher", batcher) + } + + @Test + fun `factory wires injected clientSubscriptionManager from components`() { + val subscriptionManager = + mockk>(relaxed = true) + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + clientSubscriptionManager = subscriptionManager, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + (client as StreamClientImpl<*>).assertFieldEquals( + "subscriptionManager", + subscriptionManager, + ) + } + + @Test + fun `factory creates default components when provider fields are null`() { + val client = + buildClient( + components = + StreamComponentProvider( + logProvider = logProvider, + androidComponentsProvider = fakeAndroidComponents, + ) + ) + + val impl = client as StreamClientImpl<*> + assertNotNull(impl.readPrivateField("singleFlight")) + assertNotNull(impl.readPrivateField("serialQueue")) + assertNotNull(impl.readPrivateField("tokenManager")) + assertNotNull(impl.readPrivateField("connectionIdHolder")) + assertNotNull(impl.readPrivateField("subscriptionManager")) + + val socketSession = impl.readPrivateField("socketSession") as StreamSocketSession<*> + assertNotNull(socketSession.readPrivateField("healthMonitor")) + assertNotNull(socketSession.readPrivateField("batcher")) + assertNotNull(socketSession.readPrivateField("internalSocket")) + } + + // ── SocketConfig + Components combined ────────────────────────────────── + + @Test + fun `factory applies both socketConfig tunables and component overrides`() { + val singleFlight = mockk(relaxed = true) + val healthMonitor = mockk(relaxed = true) + val customSocketConfig = + StreamSocketConfig.jwt( + url = StreamWsUrl.fromString("wss://custom.stream.io"), + apiKey = defaultSocketConfig.apiKey, + clientInfoHeader = defaultSocketConfig.clientInfoHeader, + batchSize = 5, + batchInitialDelayMs = 25L, + batchMaxDelayMs = 250L, + ) + + val client = + buildClient( + socketConfig = customSocketConfig, + components = + StreamComponentProvider( + logProvider = logProvider, + singleFlight = singleFlight, + healthMonitor = healthMonitor, + androidComponentsProvider = fakeAndroidComponents, + ), + ) + + val impl = client as StreamClientImpl<*> + impl.assertFieldEquals("singleFlight", singleFlight) + + val socketSession = impl.readPrivateField("socketSession") as StreamSocketSession<*> + socketSession.assertFieldEquals("config", customSocketConfig) + + // Injected health monitor takes precedence over socketConfig timing + socketSession.assertFieldEquals("healthMonitor", healthMonitor) + + // Batcher still created from socketConfig since not injected + val batcher = socketSession.readPrivateField("batcher") as StreamBatcher<*> + batcher.assertFieldEquals("batchSize", 5) + batcher.assertFieldEquals("initialDelayMs", 25L) + batcher.assertFieldEquals("maxDelayMs", 250L) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt index 7bf8871..52a73c5 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt @@ -38,7 +38,6 @@ import io.getstream.android.core.api.model.value.StreamWsUrl import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor import io.getstream.android.core.api.observers.network.StreamNetworkMonitor import io.getstream.android.core.api.processing.StreamBatcher -import io.getstream.android.core.api.processing.StreamRetryProcessor import io.getstream.android.core.api.processing.StreamSerialProcessingQueue import io.getstream.android.core.api.processing.StreamSingleFlightProcessor import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator @@ -92,16 +91,13 @@ internal class StreamClientFactoryTest { } private data class Dependencies( - val apiKey: StreamApiKey, + val socketConfig: StreamSocketConfig, val user: StreamUser, - val wsUrl: StreamWsUrl, - val clientInfo: StreamHttpClientInfoHeader, val clientSubscriptionManager: StreamSubscriptionManager, val tokenProvider: StreamTokenProvider, val tokenManager: StreamTokenManager, val singleFlight: StreamSingleFlightProcessor, val serialQueue: StreamSerialProcessingQueue, - val retryProcessor: StreamRetryProcessor, val connectionIdHolder: StreamConnectionIdHolder, val socketFactory: StreamWebSocketFactory, val healthMonitor: StreamHealthMonitor, @@ -113,25 +109,27 @@ internal class StreamClientFactoryTest { private fun createDependencies(): Dependencies = Dependencies( - apiKey = StreamApiKey.fromString("key123"), - user = StreamUser(id = StreamUserId.fromString("user-123")), - wsUrl = StreamWsUrl.fromString("wss://test.stream/video"), - clientInfo = - StreamHttpClientInfoHeader.create( - product = "android", - productVersion = "1.0", - os = "android", - apiLevel = 33, - deviceModel = "Pixel", - app = "test-app", - appVersion = "1.0.0", + socketConfig = + StreamSocketConfig.jwt( + url = StreamWsUrl.fromString("wss://test.stream/video"), + apiKey = StreamApiKey.fromString("key123"), + clientInfoHeader = + StreamHttpClientInfoHeader.create( + product = "android", + productVersion = "1.0", + os = "android", + apiLevel = 33, + deviceModel = "Pixel", + app = "test-app", + appVersion = "1.0.0", + ), ), + user = StreamUser(id = StreamUserId.fromString("user-123")), clientSubscriptionManager = mockk(relaxed = true), tokenProvider = mockk(relaxed = true), tokenManager = mockk(relaxed = true), singleFlight = mockk(relaxed = true), serialQueue = mockk(relaxed = true), - retryProcessor = mockk(relaxed = true), connectionIdHolder = mockk(relaxed = true), socketFactory = mockk(relaxed = true), healthMonitor = mockk(relaxed = true), @@ -145,19 +143,16 @@ internal class StreamClientFactoryTest { deps: Dependencies, httpConfig: StreamHttpConfig? = null, ): StreamClient { - return StreamClient( + return createStreamClientInternal( context = mockk(relaxed = true), - apiKey = deps.apiKey, user = deps.user, - wsUrl = deps.wsUrl, products = listOf("feeds"), - clientInfoHeader = deps.clientInfo, + socketConfig = deps.socketConfig, clientSubscriptionManager = deps.clientSubscriptionManager, tokenProvider = deps.tokenProvider, tokenManager = deps.tokenManager, singleFlight = deps.singleFlight, serialQueue = deps.serialQueue, - retryProcessor = deps.retryProcessor, scope = testScope, connectionIdHolder = deps.connectionIdHolder, socketFactory = deps.socketFactory, @@ -204,13 +199,7 @@ internal class StreamClientFactoryTest { // socket session wiring val socketSession = client.readPrivateField("socketSession") as StreamSocketSession<*> - val expectedConfig = - StreamSocketConfig.jwt( - url = deps.wsUrl.rawValue, - apiKey = deps.apiKey, - clientInfoHeader = deps.clientInfo, - ) - socketSession.assertFieldEquals("config", expectedConfig) + socketSession.assertFieldEquals("config", deps.socketConfig) socketSession.assertFieldEquals("healthMonitor", deps.healthMonitor) socketSession.assertFieldEquals("batcher", deps.batcher) socketSession.assertFieldEquals("products", listOf("feeds")) @@ -253,15 +242,15 @@ internal class StreamClientFactoryTest { val clientInfoInterceptor = interceptors[0] as StreamClientInfoInterceptor val storedClientInfo = clientInfoInterceptor.readPrivateField("clientInfo") when (storedClientInfo) { - is String -> assertEquals(deps.clientInfo.rawValue, storedClientInfo) - else -> assertEquals(deps.clientInfo, storedClientInfo) + is String -> assertEquals(deps.socketConfig.clientInfoHeader.rawValue, storedClientInfo) + else -> assertEquals(deps.socketConfig.clientInfoHeader, storedClientInfo) } val apiKeyInterceptor = interceptors[1] as StreamApiKeyInterceptor val storedApiKey = apiKeyInterceptor.readPrivateField("apiKey") when (storedApiKey) { - is String -> assertEquals(deps.apiKey.rawValue, storedApiKey) - else -> assertEquals(deps.apiKey, storedApiKey) + is String -> assertEquals(deps.socketConfig.apiKey.rawValue, storedApiKey) + else -> assertEquals(deps.socketConfig.apiKey, storedApiKey) } val connectionInterceptor = interceptors[2] as StreamConnectionIdInterceptor @@ -338,22 +327,25 @@ internal class StreamClientFactoryTest { customData = mapOf("custom" to "data"), ) val client = - StreamClient( + createStreamClientInternal( scope = testScope, context = context, - apiKey = StreamApiKey.fromString("key123"), user = user, - wsUrl = StreamWsUrl.fromString("wss://test.stream/video"), products = listOf("feeds"), - clientInfoHeader = - StreamHttpClientInfoHeader.create( - product = "android", - productVersion = "1.0", - os = "android", - apiLevel = 33, - deviceModel = "Pixel", - app = "test-app", - appVersion = "1.0.0", + socketConfig = + StreamSocketConfig.jwt( + url = StreamWsUrl.fromString("wss://test.stream/video"), + apiKey = StreamApiKey.fromString("key123"), + clientInfoHeader = + StreamHttpClientInfoHeader.create( + product = "android", + productVersion = "1.0", + os = "android", + apiLevel = 33, + deviceModel = "Pixel", + app = "test-app", + appVersion = "1.0.0", + ), ), tokenProvider = tokenProvider, serializationConfig = serializationConfig, diff --git a/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt index 33092c6..e58b907 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt @@ -18,6 +18,7 @@ package io.getstream.android.core.api.model.config import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader +import io.getstream.android.core.api.model.value.StreamWsUrl import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -25,18 +26,15 @@ import kotlin.test.assertFailsWith class StreamSocketConfigTest { private val apiKey = StreamApiKey.fromString("key") + private val wsUrl = StreamWsUrl.fromString("wss://chat.stream.io") private val header = StreamHttpClientInfoHeader.create("product", "1.0", "android", 34, "pixel") @Test fun `anonymous config uses anonymous auth type`() { val config = - StreamSocketConfig.anonymous( - url = "wss://chat.stream.io", - apiKey = apiKey, - clientInfoHeader = header, - ) + StreamSocketConfig.anonymous(url = wsUrl, apiKey = apiKey, clientInfoHeader = header) - assertEquals("wss://chat.stream.io", config.url) + assertEquals(wsUrl, config.url) assertEquals(apiKey, config.apiKey) assertEquals("anonymous", config.authType) assertEquals(header, config.clientInfoHeader) @@ -44,9 +42,10 @@ class StreamSocketConfigTest { @Test fun `custom config uses provided auth type and validates input`() { + val customUrl = StreamWsUrl.fromString("wss://chat.stream.io/custom") val config = StreamSocketConfig.custom( - url = "wss://chat.stream.io/custom", + url = customUrl, apiKey = apiKey, authType = "token", clientInfoHeader = header, @@ -55,10 +54,43 @@ class StreamSocketConfigTest { assertEquals("token", config.authType) assertFailsWith { - StreamSocketConfig.custom("", apiKey, "jwt", header) - } - assertFailsWith { - StreamSocketConfig.custom("wss://chat.stream.io", apiKey, "", header) + StreamSocketConfig.custom(wsUrl, apiKey, "", header) } } + + @Test + fun `jwt config uses default operational params`() { + val config = StreamSocketConfig.jwt(url = wsUrl, apiKey = apiKey, clientInfoHeader = header) + + assertEquals("jwt", config.authType) + assertEquals(StreamSocketConfig.DEFAULT_HEALTH_INTERVAL_MS, config.healthCheckIntervalMs) + assertEquals(StreamSocketConfig.DEFAULT_LIVENESS_MS, config.livenessThresholdMs) + assertEquals(StreamSocketConfig.DEFAULT_CONNECTION_TIMEOUT_MS, config.connectionTimeoutMs) + assertEquals(StreamSocketConfig.DEFAULT_BATCH_SIZE, config.batchSize) + assertEquals(StreamSocketConfig.DEFAULT_BATCH_INIT_DELAY_MS, config.batchInitialDelayMs) + assertEquals(StreamSocketConfig.DEFAULT_BATCH_MAX_DELAY_MS, config.batchMaxDelayMs) + } + + @Test + fun `jwt config accepts custom operational params`() { + val config = + StreamSocketConfig.jwt( + url = wsUrl, + apiKey = apiKey, + clientInfoHeader = header, + healthCheckIntervalMs = 5_000L, + livenessThresholdMs = 15_000L, + connectionTimeoutMs = 2_000L, + batchSize = 1, + batchInitialDelayMs = 0L, + batchMaxDelayMs = 0L, + ) + + assertEquals(5_000L, config.healthCheckIntervalMs) + assertEquals(15_000L, config.livenessThresholdMs) + assertEquals(2_000L, config.connectionTimeoutMs) + assertEquals(1, config.batchSize) + assertEquals(0L, config.batchInitialDelayMs) + assertEquals(0L, config.batchMaxDelayMs) + } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt index 0456350..5766a55 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt @@ -24,6 +24,7 @@ import io.getstream.android.core.api.model.connection.StreamConnectionState import io.getstream.android.core.api.model.event.StreamClientWsEvent import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData import io.getstream.android.core.api.model.exceptions.StreamEndpointException +import io.getstream.android.core.api.model.value.StreamWsUrl import io.getstream.android.core.api.processing.StreamBatcher import io.getstream.android.core.api.serialization.StreamJsonSerialization import io.getstream.android.core.api.socket.StreamWebSocket @@ -76,7 +77,7 @@ class StreamSocketSessionTest { private val config = StreamSocketConfig.jwt( - url = "wss://example.test/connect", + url = StreamWsUrl.fromString("wss://example.test/connect"), apiKey = mockk(relaxed = true), clientInfoHeader = mockk(relaxed = true), ) diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImplTest.kt index 103289b..b5f12f0 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/socket/factory/StreamWebSocketFactoryImplTest.kt @@ -20,6 +20,7 @@ import io.getstream.android.core.api.log.StreamLogger import io.getstream.android.core.api.model.config.StreamSocketConfig import io.getstream.android.core.api.model.value.StreamApiKey import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader +import io.getstream.android.core.api.model.value.StreamWsUrl import io.mockk.* import okhttp3.OkHttpClient import okhttp3.Request @@ -52,7 +53,7 @@ class StreamWebSocketFactoryImplTest { private val config = StreamSocketConfig.jwt( - url = "wss://example.com/connect", + url = StreamWsUrl.fromString("wss://example.com/connect"), apiKey = StreamApiKey.fromString("test-key"), clientInfoHeader = clientInfoHeader, )