Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/late-lamps-greet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": minor
---

Add support for generic RequestResponse
15 changes: 15 additions & 0 deletions .changeset/polite-goats-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"client-sdk-android": minor
---

Add the following suspend methods to LocalParticipant:

- `setName`
- `setMetadata`
- `setAttributes`

These replace the following deprecated methods:

- `updateName`
- `updateMetadata`
- `updateAttributes`
1 change: 1 addition & 0 deletions livekit-android-sdk/detekt-baseline-release.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<ManuallySuppressedIssues/>
<CurrentIssues>
<ID>ComplexCondition:LocalParticipant.kt$LocalParticipant$(originalEncoding == null &amp;&amp; !simulcast) || width == 0 || height == 0</ID>
<ID>ComplexCondition:LocalParticipant.kt$LocalParticipant$attributes != null &amp;&amp; !attributes.all { (key, value) -> val current = this.attributes[key] current == value || (value.isEmpty() &amp;&amp; current.isNullOrEmpty()) }</ID>
<ID>ComplexCondition:PeerConnectionTransport.kt$PeerConnectionTransport$sd.type == SessionDescription.Type.ANSWER &amp;&amp; currentOfferId > 0 &amp;&amp; offerId > 0 &amp;&amp; currentOfferId > offerId</ID>
<ID>ComplexCondition:PreconnectAudioBuffer.kt$sentIdentities.contains(identity) || kind != Participant.Kind.AGENT || state != Participant.State.ACTIVE || identity == null</ID>
<ID>ComplexCondition:RTCEngine.kt$RTCEngine$(connectionState == ConnectionState.CONNECTED || connectionState == ConnectionState.RESUMING) &amp;&amp; subscriberConnected &amp;&amp; publisherConnected</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ data class ConnectOptions(
/**
* the protocol version to use with the server.
*/
val protocolVersion: ProtocolVersion = ProtocolVersion.v13,
val protocolVersion: ProtocolVersion = ProtocolVersion.v15,

/**
* The client protocol version to advertise to other participants in the room
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ internal constructor(
fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse)
fun onTranscriptionReceived(transcription: LivekitModels.Transcription)
fun onLocalTrackSubscribed(trackSubscribed: LivekitRtc.TrackSubscribed)
fun onRequestResponse(response: LivekitRtc.RequestResponse)
fun onRpcPacketReceived(dp: LivekitModels.DataPacket)
fun onDataStreamPacket(dp: LivekitModels.DataPacket, encryptionType: LivekitModels.Encryption.Type)
}
Expand Down Expand Up @@ -1280,6 +1281,10 @@ internal constructor(
listener?.onLocalTrackUnpublished(trackUnpublished)
}

override fun onRequestResponse(response: LivekitRtc.RequestResponse) {
listener?.onRequestResponse(response)
}

// --------------------------------- DataChannel.Observer ------------------------------------//

fun onBufferedAmountChange(dataChannel: DataChannel, previousAmount: Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,13 @@ constructor(
localParticipant.handleSubscribedQualityUpdate(subscribedQualityUpdate)
}

/**
* @suppress
*/
override fun onRequestResponse(response: LivekitRtc.RequestResponse) {
localParticipant.handleRequestResponse(response)
}

/**
* @suppress
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import okhttp3.WebSocketListener
import okio.ByteString
import okio.ByteString.Companion.toByteString
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
Expand Down Expand Up @@ -97,6 +98,7 @@ constructor(
private var lastUrl: String? = null
internal var lastOptions: ConnectOptions? = null
private var lastRoomOptions: RoomOptions? = null
private val nextRequestId = AtomicInteger(0)

// join will always return a JoinResponse.
// reconnect will return a ReconnectResponse or a Unit if a different response was received.
Expand Down Expand Up @@ -563,8 +565,16 @@ constructor(
sendRequest(request)
}

fun sendUpdateLocalMetadata(metadata: String?, name: String?, attributes: Map<String, String>? = emptyMap()) {
internal fun allocateRequestId(): Int = nextRequestId.incrementAndGet()

fun sendUpdateLocalMetadata(
metadata: String?,
name: String?,
attributes: Map<String, String> = emptyMap(),
requestId: Int = allocateRequestId(),
): Int {
val update = LivekitRtc.UpdateParticipantMetadata.newBuilder()
.setRequestId(requestId)
.setMetadata(metadata ?: "")
.setName(name ?: "")
.putAllAttributes(attributes)
Expand All @@ -574,6 +584,7 @@ constructor(
.build()

sendRequest(request)
return requestId
}

fun sendSyncState(syncState: LivekitRtc.SyncState) {
Expand Down Expand Up @@ -837,7 +848,7 @@ constructor(
}

LivekitRtc.SignalResponse.MessageCase.REQUEST_RESPONSE -> {
// TODO
listener?.onRequestResponse(response.requestResponse)
}

LivekitRtc.SignalResponse.MessageCase.ROOM_MOVED -> {
Expand Down Expand Up @@ -958,6 +969,7 @@ constructor(
fun onRefreshToken(token: String)
fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse)
fun onLocalTrackSubscribed(trackSubscribed: LivekitRtc.TrackSubscribed)
fun onRequestResponse(response: LivekitRtc.RequestResponse)
}

companion object {
Expand Down Expand Up @@ -1025,6 +1037,9 @@ enum class ProtocolVersion(val value: Int) {

// new leave request handling
v13(13),

// signal request response handling
v15(15),
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import io.livekit.android.room.isSVCCodec
import io.livekit.android.room.rpc.RpcClientManager
import io.livekit.android.room.rpc.RpcManager
import io.livekit.android.room.rpc.RpcServerManager
import io.livekit.android.room.signal.SignalRequestException
import io.livekit.android.room.track.DataPublishReliability
import io.livekit.android.room.track.LocalAudioTrack
import io.livekit.android.room.track.LocalAudioTrackOptions
Expand All @@ -59,13 +60,19 @@ import io.livekit.android.room.track.screencapture.ScreenCaptureParams
import io.livekit.android.room.util.EncodingUtils
import io.livekit.android.rpc.RpcError
import io.livekit.android.util.LKLog
import io.livekit.android.util.TimeoutException
import io.livekit.android.util.flow
import io.livekit.android.util.rethrowIfCancellationSignal
import io.livekit.android.util.withDeadline
import io.livekit.android.webrtc.sortVideoCodecPreferences
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand All @@ -90,6 +97,7 @@ import javax.inject.Named
import kotlin.math.max
import kotlin.math.min
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

class LocalParticipant
@AssistedInject
Expand Down Expand Up @@ -130,6 +138,8 @@ internal constructor(

private val jobs = mutableMapOf<LocalTrackPublication, Job>()

private val pendingSignalRequests = Collections.synchronizedMap(mutableMapOf<Int, CompletableDeferred<Unit>>())

// For ensuring that only one caller can execute setTrackEnabled at a time.
// Without it, there's a potential to create multiple of the same source,
// Camera has deadlock issues with multiple CameraCapturers trying to activate/stop.
Expand Down Expand Up @@ -1109,12 +1119,59 @@ internal constructor(
}
}

/**
* Sets and updates the metadata of the local participant.
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
*
* @param metadata the metadata to set
* @return a [Result] that succeeds when the server confirms the update, or fails with
* [SignalRequestException] if the server rejects the request or
* [TimeoutException] if it times out
*/
@CheckResult
suspend fun setMetadata(metadata: String): Result<Unit> {
return requestMetadataUpdate(metadata = metadata)
}

/**
* Sets and updates the name of the local participant.
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
*
* @param name the name to set
* @return a [Result] that succeeds when the server confirms the update, or fails with
* [SignalRequestException] if the server rejects the request or
* [TimeoutException] if it times out
*/
@CheckResult
suspend fun setName(name: String): Result<Unit> {
return requestMetadataUpdate(name = name)
}

/**
* Set or update participant attributes. It will make updates only to keys that
* are present in [attributes], and will not override others.
*
* To delete a value, set the value to an empty string.
*
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
*
* @param attributes attributes to update
* @return a [Result] that succeeds when the server confirms the update, or fails with
* [SignalRequestException] if the server rejects the request or
* [TimeoutException] if it times out
*/
@CheckResult
suspend fun setAttributes(attributes: Map<String, String>): Result<Unit> {
return requestMetadataUpdate(attributes = attributes)
}

/**
* Updates the metadata of the local participant. Changes will not be reflected until the
* server responds confirming the update.
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
* @param metadata
*/
@Deprecated(message = "Use the suspend function setMetadata instead.")
fun updateMetadata(metadata: String) {
this.engine.client.sendUpdateLocalMetadata(metadata, name)
}
Expand All @@ -1125,6 +1182,7 @@ internal constructor(
* Note: this requires `CanUpdateOwnMetadata` permission encoded in the token.
* @param name
*/
@Deprecated(message = "Use the suspend function setName instead.")
fun updateName(name: String) {
this.engine.client.sendUpdateLocalMetadata(metadata, name)
}
Expand All @@ -1138,6 +1196,7 @@ internal constructor(
* Note: this requires `canUpdateOwnMetadata` permission.
* @param attributes attributes to update
*/
@Deprecated(message = "Use the suspend function setAttributes instead.")
fun updateAttributes(attributes: Map<String, String>) {
this.engine.client.sendUpdateLocalMetadata(metadata, name, attributes)
}
Expand All @@ -1147,6 +1206,88 @@ internal constructor(
pub?.muted = muted
}

internal fun handleRequestResponse(response: LivekitRtc.RequestResponse) {
val deferred = pendingSignalRequests[response.requestId] ?: return
if (response.reason != LivekitRtc.RequestResponse.Reason.OK) {
pendingSignalRequests.remove(response.requestId)
deferred.completeExceptionally(SignalRequestException.fromResponse(response))
return
}
pendingSignalRequests.remove(response.requestId)
}

private suspend fun requestMetadataUpdate(
metadata: String? = null,
name: String? = null,
attributes: Map<String, String>? = null,
): Result<Unit> {
val requestId = engine.client.allocateRequestId()
val deferred = CompletableDeferred<Unit>()
pendingSignalRequests[requestId] = deferred

return try {
engine.client.sendUpdateLocalMetadata(
metadata = metadata ?: this.metadata,
name = name ?: this.name,
attributes = attributes ?: emptyMap(),
requestId = requestId,
)
withDeadline(METADATA_UPDATE_TIMEOUT) {
coroutineScope {
val confirmationJob = launch {
combine(
::name.flow,
::metadata.flow,
::attributes.flow,
) { _, _, _ -> }
.first { isMetadataUpdateConfirmed(metadata, name, attributes) }
if (!deferred.isCompleted) {
deferred.complete(Unit)
}
}
try {
deferred.await()
} finally {
confirmationJob.cancel()
}
}
}
Result.success(Unit)
} catch (e: TimeoutException) {
deferred.completeExceptionally(e)
Result.failure(e)
} catch (e: CancellationException) {
deferred.cancel()
throw e
} catch (e: Exception) {
Result.failure(e)
} finally {
pendingSignalRequests.remove(requestId)
}
}

private fun isMetadataUpdateConfirmed(
metadata: String?,
name: String?,
attributes: Map<String, String>?,
): Boolean {
if (name != null && this.name != name) {
return false
}
if (metadata != null && this.metadata != metadata) {
return false
}
if (attributes != null &&
!attributes.all { (key, value) ->
val current = this.attributes[key]
current == value || (value.isEmpty() && current.isNullOrEmpty())
}
) {
return false
}
return true
}

internal fun handleSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) {
if (!dynacast) {
return
Expand Down Expand Up @@ -1319,6 +1460,9 @@ internal constructor(
* @suppress
*/
fun cleanup() {
pendingSignalRequests.values.forEach { it.cancel() }
pendingSignalRequests.clear()

for (pub in trackPublications.values) {
val track = pub.track

Expand Down Expand Up @@ -1394,6 +1538,10 @@ internal constructor(
interface Factory {
fun create(dynacast: Boolean): LocalParticipant
}

companion object {
private val METADATA_UPDATE_TIMEOUT = 5_000.milliseconds
}
}

internal fun LocalParticipant.publishTracksInfo(): List<LivekitRtc.TrackPublishedResponse> {
Expand Down
Loading