diff --git a/README.md b/README.md index b72f1dde..8af61e9d 100644 --- a/README.md +++ b/README.md @@ -313,6 +313,70 @@ Debug.Log("Connected to " + room.Name); +## Asynchronous programming: coroutines, async/await, and UniTask + +The SDK exposes three interchangeable styles for awaiting asynchronous operations. Coroutines, async/await and UniTask. + +**1. Coroutines (default, no dependency)** — shown throughout this README. + +**2. async/await (no dependency)** — every operation returns an awaitable instruction (`ConnectInstruction`, `PublishTrackInstruction`, `PerformRpcInstruction`, the stream read instructions, …), so you can `await` it directly. As with coroutines, you inspect success/failure on the instruction (`IsError`) — `await` does not throw. Continuations resume on Unity's main thread. + +```cs +async void Start() +{ + var room = new Room(); + var connect = room.Connect("ws://localhost:7880", "", new RoomOptions()); + await connect; + if (!connect.IsError) + Debug.Log("Connected to " + room.Name); +} +``` + +> Use `async void` only for top-level event handlers (e.g. button callbacks); its exceptions surface to Unity's log rather than to a caller. Prefer `async Task`/`async UniTaskVoid` elsewhere. + +**3. UniTask (optional)** — install [UniTask](https://github.com/Cysharp/UniTask) (`com.cysharp.unitask`). The SDK auto-detects it via the `LIVEKIT_UNITASK` scripting define and enables the `LiveKit.UniTask` assembly, which adds `CancellationToken` support, composition, and async streams. + +Cancellation (abandon-awaiter semantics — the underlying request is not cancelled on the wire): + +```cs +await room.Connect("ws://localhost:7880", "", new RoomOptions()) + .AsUniTask(cancellationToken); +``` + +Run operations in parallel. `AsUniTask` does not throw on failure (matching the +coroutine path), so keep the instructions and check `IsError` on each after the +`await` — otherwise a failed operation passes silently: + +```cs +var publishCamera = room.LocalParticipant.PublishTrack(cameraTrack, cameraOptions); +var publishMicrophone = room.LocalParticipant.PublishTrack(microphoneTrack, microphoneOptions); + +await UniTask.WhenAll(publishCamera.AsUniTask(ct), publishMicrophone.AsUniTask(ct)); + +if (publishCamera.IsError || publishMicrophone.IsError) + Debug.LogError("Failed to publish one or more tracks"); +``` + +Consume an incremental stream with `await foreach`. The sequence ends at end-of-stream; if the stream ends with an error it throws a `StreamError`: + +```cs +try +{ + await foreach (var chunk in reader.ReadIncremental().AsAsyncEnumerable(ct)) + Process(chunk); +} +catch (StreamError e) +{ + Debug.LogError(e.Message); +} +``` + +> Error-handling differs by API: awaiting an instruction (and `AsUniTask`) never throws on a +> failed operation — you inspect `IsError` after the `await`. The stream enumerable is the +> exception: `await foreach` has no post-loop point to check `IsError`, so a mid-stream failure +> surfaces by throwing `StreamError`. + + ### Publishing microphone diff --git a/Runtime/Scripts/AssemblyInfo.cs b/Runtime/Scripts/AssemblyInfo.cs index e667b32e..148f70a0 100644 --- a/Runtime/Scripts/AssemblyInfo.cs +++ b/Runtime/Scripts/AssemblyInfo.cs @@ -2,3 +2,5 @@ [assembly: InternalsVisibleTo("EditModeTests")] [assembly: InternalsVisibleTo("PlayModeTests")] +[assembly: InternalsVisibleTo("PlayModeTests.UniTask")] +[assembly: InternalsVisibleTo("LiveKit.UniTask")] diff --git a/Runtime/Scripts/DataStream.cs b/Runtime/Scripts/DataStream.cs index 68b18b5a..4a0339e8 100644 --- a/Runtime/Scripts/DataStream.cs +++ b/Runtime/Scripts/DataStream.cs @@ -95,7 +95,14 @@ public abstract class ReadIncrementalInstructionBase : StreamYieldInst /// public bool IsError => Error != null; - protected TContent LatestChunk + /// + /// The chunk from the most recent completed read. Throws the captured + /// if the last read errored. Internal so the optional + /// UniTask async-enumerable adapter (which has InternalsVisibleTo access) can read + /// it generically; the typed Bytes/Text accessors on the concrete + /// readers delegate here. + /// + internal TContent LatestChunk { get { @@ -153,11 +160,15 @@ protected void OnEos(Proto.StreamError protoError) { lock (_gate) { - IsEos = true; + // Assign Error before flipping IsEos. The IsEos setter fires the awaiter + // continuation, which inspects IsError/Error on resume; when completion runs + // inline on the main thread, setting IsEos first would let the continuation + // observe IsError == false and silently swallow the stream error. if (protoError != null) { Error = new StreamError(protoError); } + IsEos = true; } } } diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index 748e884d..f3419001 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -1,8 +1,30 @@ using System; +using System.Runtime.CompilerServices; +using System.Threading; +using LiveKit.Internal; using UnityEngine; namespace LiveKit { + // Resumes awaiter continuations on Unity's main thread. Completion may be signalled on the + // FFI callback thread (operations registered dispatchToMainThread:false, and data-stream + // chunk events), but a custom awaiter otherwise resumes inline on the completing thread — + // leaving callers unable to touch Unity APIs after an await. Posting through the captured + // main-thread SynchronizationContext keeps the await path's threading identical to the + // coroutine path. When already on the main thread (e.g. Connect, which completes there) the + // continuation runs inline to avoid an extra frame of latency. + internal static class AwaiterScheduler + { + internal static void Resume(Action continuation) + { + var context = FfiClient.Instance._context; + if (context == null || SynchronizationContext.Current == context) + continuation(); + else + context.Post(static state => ((Action)state)(), continuation); + } + } + public class YieldInstruction : CustomYieldInstruction { // Backing fields are volatile because completion may run on the FFI callback @@ -13,10 +35,84 @@ public class YieldInstruction : CustomYieldInstruction private volatile bool _isDone; private volatile bool _isError; - public bool IsDone { get => _isDone; protected set => _isDone = value; } + // Sentinel published once completion has fired so any continuation registered + // afterwards runs inline instead of being silently dropped. + private static readonly Action s_completedSentinel = () => { }; + private Action? _continuation; + + public bool IsDone + { + get => _isDone; + protected set + { + _isDone = value; + if (value) InvokeContinuation(); + } + } public bool IsError { get => _isError; protected set => _isError = value; } public override bool keepWaiting => !_isDone; + + /// + /// Returns an awaiter so callers can await this instruction directly. + /// + /// + /// The awaiter completes when becomes true. As with the + /// coroutine path, success vs. failure is inspected on the instruction itself + /// ( and any subclass-specific result fields); GetResult + /// does not throw. + /// + public YieldInstructionAwaiter GetAwaiter() => new YieldInstructionAwaiter(this); + + internal void RegisterContinuation(Action continuation) + { + // Race between completion-side (FFI thread writes sentinel) and await-side + // (registers continuation): CompareExchange decides who wrote first. + // null -> we won, completion will invoke our continuation later + // sentinel -> completion already fired; invoke inline + // other -> a second awaiter beat us here, which we don't support + var prev = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (prev == null) return; + if (ReferenceEquals(prev, s_completedSentinel)) + { + AwaiterScheduler.Resume(continuation); + return; + } + throw new InvalidOperationException( + "YieldInstruction does not support multiple awaiters; await it only once."); + } + + private void InvokeContinuation() + { + var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel); + if (prev != null && !ReferenceEquals(prev, s_completedSentinel)) + { + AwaiterScheduler.Resume(prev); + } + } + } + + public readonly struct YieldInstructionAwaiter : ICriticalNotifyCompletion + { + private readonly YieldInstruction _instruction; + + internal YieldInstructionAwaiter(YieldInstruction instruction) + { + _instruction = instruction; + } + + public bool IsCompleted => _instruction.IsDone; + + public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + // ICriticalNotifyCompletion lets the async state machine skip ExecutionContext capture + // on the hot path. We don't depend on the flowed context (AwaiterScheduler marshals to + // the main thread on its own), so this is safe and avoids a per-await allocation. + public void UnsafeOnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + // Intentionally a no-op. Parity with the coroutine path: callers inspect IsError + // and subclass-specific result fields on the instruction itself. + public void GetResult() { } } public class StreamYieldInstruction : CustomYieldInstruction @@ -28,12 +124,37 @@ public class StreamYieldInstruction : CustomYieldInstruction private volatile bool _isEos; private volatile bool _isCurrentReadDone; + private static readonly Action s_completedSentinel = () => { }; + private Action? _continuation; + /// /// True if the stream has reached the end. /// - public bool IsEos { get => _isEos; protected set => _isEos = value; } + public bool IsEos + { + get => _isEos; + protected set + { + _isEos = value; + if (value) InvokeContinuation(); + } + } - internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; } + /// + /// True once a chunk is ready for the current read (before is + /// called for the next one). Public getter mirrors the sibling + /// DataTrack.ReadFrameInstruction.IsCurrentReadDone; the setter stays internal + /// because only the SDK's stream readers advance this state. + /// + public bool IsCurrentReadDone + { + get => _isCurrentReadDone; + internal set + { + _isCurrentReadDone = value; + if (value) InvokeContinuation(); + } + } public override bool keepWaiting => !_isCurrentReadDone && !_isEos; @@ -50,6 +171,58 @@ public override void Reset() throw new InvalidOperationException("Cannot reset after end of stream"); } _isCurrentReadDone = false; + // Drop the sentinel published by the previous completion so the next awaiter + // can install a fresh continuation. Safe because Reset is only called after the + // previous read's await has already resumed. + Volatile.Write(ref _continuation, null); + } + + /// + /// Returns an awaiter that completes when the next chunk is ready or the stream ends. + /// Call between iterations to await the following chunk. + /// + public StreamYieldInstructionAwaiter GetAwaiter() => new StreamYieldInstructionAwaiter(this); + + internal void RegisterContinuation(Action continuation) + { + var prev = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (prev == null) return; + if (ReferenceEquals(prev, s_completedSentinel)) + { + AwaiterScheduler.Resume(continuation); + return; + } + throw new InvalidOperationException( + "StreamYieldInstruction does not support multiple concurrent awaiters; await it once per chunk."); } + + private void InvokeContinuation() + { + var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel); + if (prev != null && !ReferenceEquals(prev, s_completedSentinel)) + { + AwaiterScheduler.Resume(prev); + } + } + } + + public readonly struct StreamYieldInstructionAwaiter : ICriticalNotifyCompletion + { + private readonly StreamYieldInstruction _instruction; + + internal StreamYieldInstructionAwaiter(StreamYieldInstruction instruction) + { + _instruction = instruction; + } + + public bool IsCompleted => _instruction.IsCurrentReadDone || _instruction.IsEos; + + public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + // See YieldInstructionAwaiter.UnsafeOnCompleted — skips ExecutionContext capture; the + // continuation is marshalled to the main thread by AwaiterScheduler regardless. + public void UnsafeOnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + public void GetResult() { } } } diff --git a/Runtime/Scripts/UniTask.meta b/Runtime/Scripts/UniTask.meta new file mode 100644 index 00000000..45727607 --- /dev/null +++ b/Runtime/Scripts/UniTask.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 7f5f50e598f7646458d6958db6c7246a +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs new file mode 100644 index 00000000..45ea1284 --- /dev/null +++ b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs @@ -0,0 +1,78 @@ +#if LIVEKIT_UNITASK +using System.Threading; +using Cysharp.Threading.Tasks; +using Cysharp.Threading.Tasks.Linq; + +namespace LiveKit +{ + /// + /// Exposes the SDK's incremental stream readers as + /// so chunks can be consumed with await foreach. Available only when the + /// com.cysharp.unitask package is installed (gated by LIVEKIT_UNITASK). + /// + public static class StreamReaderUniTaskExtensions + { + /// + /// Adapts an incremental stream read into an async sequence of chunks. Works for both + /// (byte[]) and + /// (string). + /// + /// + /// Iteration ends when the stream reaches end-of-stream. If the stream ends with an + /// error, the enumerable throws that (idiomatic for + /// await foreach; this is the one place the UniTask surface throws rather than + /// exposing IsError). Cancellation (via the token or the enumerator) surfaces as + /// with abandon-awaiter semantics — the + /// underlying FFI read is not cancelled on the wire. + /// + /// The current chunk is delivered on the iteration where end-of-stream is also observed, + /// then iteration stops. Chunks buffered beyond the current one when + /// end-of-stream arrives are not drainable, because the reader disallows Reset() + /// past end-of-stream. + /// + public static IUniTaskAsyncEnumerable AsAsyncEnumerable( + this ReadIncrementalInstructionBase instruction, + CancellationToken cancellationToken = default) + { + if (instruction == null) throw new System.ArgumentNullException(nameof(instruction)); + + return UniTaskAsyncEnumerable.Create(async (writer, token) => + { + // The enumerator hands us its own token; honor both it and the caller's. + using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, cancellationToken); + var ct = linked.Token; + + while (true) + { + // Completes when a chunk is ready (IsCurrentReadDone) or the stream ends (IsEos). + await instruction.AsUniTask(ct); + + if (instruction.IsCurrentReadDone) + { + var chunk = instruction.LatestChunk; + await writer.YieldAsync(chunk); + + // Re-check IsEos AFTER yielding: end-of-stream may have arrived while + // the consumer was suspended. Reset() throws once IsEos is set, so this + // re-check (not a value captured before the yield) is what keeps the + // loop safe — mirroring the coroutine consumer's "if (IsEos) break; + // else Reset()" ordering. + if (instruction.IsEos) + { + if (instruction.IsError) throw instruction.Error; + return; + } + + instruction.Reset(); + continue; + } + + // Not IsCurrentReadDone => end-of-stream with nothing left to read. + if (instruction.IsError) throw instruction.Error; + return; + } + }); + } + } +} +#endif diff --git a/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta new file mode 100644 index 00000000..883d5c63 --- /dev/null +++ b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: ba02c0c61aa014db28635be5e1cf6e64 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs new file mode 100644 index 00000000..9ede7016 --- /dev/null +++ b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs @@ -0,0 +1,101 @@ +#if LIVEKIT_UNITASK +using System.Threading; +using Cysharp.Threading.Tasks; + +namespace LiveKit +{ + /// + /// Bridges the SDK's / + /// surface to UniTask, adding support. Available only when + /// the com.cysharp.unitask package is installed; the assembly is otherwise excluded + /// via a defineConstraint on LIVEKIT_UNITASK. + /// + public static class YieldInstructionUniTaskExtensions + { + /// + /// Wraps the instruction as a . The task completes when the + /// instruction's transitions to true, or + /// faults with if the token fires + /// first. + /// + /// + /// Cancellation has "abandon awaiter" semantics: the underlying FFI request keeps + /// running and any result is discarded. Wire-level cancellation is not yet + /// supported. Error inspection stays on the instruction itself — the awaiter does + /// not throw on , matching the existing + /// yield return / await behavior. + /// + public static UniTask AsUniTask(this YieldInstruction instruction, CancellationToken cancellationToken = default) + { + if (instruction == null) throw new System.ArgumentNullException(nameof(instruction)); + if (instruction.IsDone) return UniTask.CompletedTask; + if (cancellationToken.IsCancellationRequested) return UniTask.FromCanceled(cancellationToken); + + var source = new UniTaskCompletionSource(); + CancellationTokenRegistration registration = default; + + if (cancellationToken.CanBeCanceled) + { + // Dispose the registration on the cancel path too, so a cancelled-but-never- + // completed awaiter doesn't keep its registration (and this closure) alive on a + // long-lived CancellationTokenSource until the instruction eventually completes. + // Safe: the early IsCancellationRequested check above means the token isn't + // already cancelled, so Register won't invoke this synchronously before + // 'registration' is assigned; disposing from within the callback does not block. + registration = cancellationToken.Register(() => + { + source.TrySetCanceled(); + registration.Dispose(); + }); + } + + // YieldInstruction.RegisterContinuation fires the callback exactly once and is + // race-safe between FFI-thread completion and main-thread registration. Either + // TrySetResult or TrySetCanceled wins; the loser is a no-op. + instruction.GetAwaiter().OnCompleted(() => + { + registration.Dispose(); + source.TrySetResult(); + }); + + return source.Task; + } + + /// + /// UniTask-bridged equivalent of awaiting a once. + /// Call between chunks; each + /// AsUniTask call awaits the next chunk or end-of-stream. + /// + public static UniTask AsUniTask(this StreamYieldInstruction instruction, CancellationToken cancellationToken = default) + { + if (instruction == null) throw new System.ArgumentNullException(nameof(instruction)); + // GetAwaiter().IsCompleted folds together IsCurrentReadDone || IsEos and is + // the only public way to check the combined state from outside the LiveKit asm. + if (instruction.GetAwaiter().IsCompleted) return UniTask.CompletedTask; + if (cancellationToken.IsCancellationRequested) return UniTask.FromCanceled(cancellationToken); + + var source = new UniTaskCompletionSource(); + CancellationTokenRegistration registration = default; + + if (cancellationToken.CanBeCanceled) + { + // See the YieldInstruction overload: dispose on cancel so the registration isn't + // pinned to a long-lived CancellationTokenSource when the read never completes. + registration = cancellationToken.Register(() => + { + source.TrySetCanceled(); + registration.Dispose(); + }); + } + + instruction.GetAwaiter().OnCompleted(() => + { + registration.Dispose(); + source.TrySetResult(); + }); + + return source.Task; + } + } +} +#endif diff --git a/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs.meta b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs.meta new file mode 100644 index 00000000..42453b89 --- /dev/null +++ b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 1264e1f4f8a8d4ad9ab94cdc2909a3a1 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef new file mode 100644 index 00000000..c9a8f374 --- /dev/null +++ b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef @@ -0,0 +1,26 @@ +{ + "name": "LiveKit.UniTask", + "rootNamespace": "LiveKit", + "references": [ + "LiveKit", + "UniTask", + "UniTask.Linq" + ], + "includePlatforms": [], + "excludePlatforms": [], + "allowUnsafeCode": false, + "overrideReferences": false, + "precompiledReferences": [], + "autoReferenced": true, + "defineConstraints": [ + "LIVEKIT_UNITASK" + ], + "versionDefines": [ + { + "name": "com.cysharp.unitask", + "expression": "2.0.0", + "define": "LIVEKIT_UNITASK" + } + ], + "noEngineReferences": false +} diff --git a/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef.meta b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef.meta new file mode 100644 index 00000000..3a7e76a5 --- /dev/null +++ b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: a7fbb1537932e48f4a28030ab7a3ac51 +AssemblyDefinitionImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Samples~/Meet/Packages/manifest.json b/Samples~/Meet/Packages/manifest.json index 9c61b3b4..60663d1e 100644 --- a/Samples~/Meet/Packages/manifest.json +++ b/Samples~/Meet/Packages/manifest.json @@ -1,5 +1,6 @@ { "dependencies": { + "com.cysharp.unitask": "https://github.com/Cysharp/UniTask.git?path=src/UniTask/Assets/Plugins/UniTask#2.5.11", "com.unity.collab-proxy": "2.7.1", "com.unity.feature.2d": "2.0.1", "com.unity.ide.rider": "3.0.36", diff --git a/Samples~/Meet/Packages/packages-lock.json b/Samples~/Meet/Packages/packages-lock.json index 26ac6ead..99c8a782 100644 --- a/Samples~/Meet/Packages/packages-lock.json +++ b/Samples~/Meet/Packages/packages-lock.json @@ -1,11 +1,18 @@ { "dependencies": { + "com.cysharp.unitask": { + "version": "https://github.com/Cysharp/UniTask.git?path=src/UniTask/Assets/Plugins/UniTask#2.5.11", + "depth": 0, + "source": "git", + "dependencies": {}, + "hash": "2e993ff18f28c931602a07292df0b0804eebef99" + }, "com.unity.2d.animation": { - "version": "9.2.0", + "version": "9.1.0", "depth": 1, "source": "registry", "dependencies": { - "com.unity.2d.common": "8.1.0", + "com.unity.2d.common": "8.0.2", "com.unity.2d.sprite": "1.0.0", "com.unity.collections": "1.1.0", "com.unity.modules.animation": "1.0.0", @@ -14,7 +21,7 @@ "url": "https://packages.unity.com" }, "com.unity.2d.aseprite": { - "version": "1.1.9", + "version": "1.1.1", "depth": 1, "source": "registry", "dependencies": { @@ -26,7 +33,7 @@ "url": "https://packages.unity.com" }, "com.unity.2d.common": { - "version": "8.1.0", + "version": "8.0.2", "depth": 2, "source": "registry", "dependencies": { @@ -39,22 +46,20 @@ "url": "https://packages.unity.com" }, "com.unity.2d.pixel-perfect": { - "version": "5.1.0", + "version": "5.0.3", "depth": 1, "source": "registry", - "dependencies": { - "com.unity.modules.imgui": "1.0.0" - }, + "dependencies": {}, "url": "https://packages.unity.com" }, "com.unity.2d.psdimporter": { - "version": "8.1.0", + "version": "8.0.4", "depth": 1, "source": "registry", "dependencies": { - "com.unity.2d.common": "8.1.0", + "com.unity.2d.common": "8.0.2", "com.unity.2d.sprite": "1.0.0", - "com.unity.2d.animation": "9.2.0" + "com.unity.2d.animation": "9.1.0" }, "url": "https://packages.unity.com" }, @@ -65,11 +70,11 @@ "dependencies": {} }, "com.unity.2d.spriteshape": { - "version": "9.1.0", + "version": "9.0.2", "depth": 1, "source": "registry", "dependencies": { - "com.unity.2d.common": "8.1.0", + "com.unity.2d.common": "8.0.1", "com.unity.mathematics": "1.1.0", "com.unity.modules.physics2d": "1.0.0" }, @@ -85,7 +90,7 @@ } }, "com.unity.2d.tilemap.extras": { - "version": "3.1.3", + "version": "3.1.2", "depth": 1, "source": "registry", "dependencies": { @@ -97,7 +102,7 @@ "url": "https://packages.unity.com" }, "com.unity.burst": { - "version": "1.8.21", + "version": "1.8.12", "depth": 3, "source": "registry", "dependencies": { @@ -135,14 +140,14 @@ "depth": 0, "source": "builtin", "dependencies": { - "com.unity.2d.animation": "9.2.0", - "com.unity.2d.pixel-perfect": "5.1.0", - "com.unity.2d.psdimporter": "8.1.0", + "com.unity.2d.animation": "9.1.0", + "com.unity.2d.pixel-perfect": "5.0.3", + "com.unity.2d.psdimporter": "8.0.4", "com.unity.2d.sprite": "1.0.0", - "com.unity.2d.spriteshape": "9.1.0", + "com.unity.2d.spriteshape": "9.0.2", "com.unity.2d.tilemap": "1.0.0", - "com.unity.2d.tilemap.extras": "3.1.3", - "com.unity.2d.aseprite": "1.1.9" + "com.unity.2d.tilemap.extras": "3.1.2", + "com.unity.2d.aseprite": "1.1.1" } }, "com.unity.ide.rider": { diff --git a/Tests/PlayMode/RoomTests.cs b/Tests/PlayMode/RoomTests.cs index 1a871e5c..ebb303ea 100644 --- a/Tests/PlayMode/RoomTests.cs +++ b/Tests/PlayMode/RoomTests.cs @@ -1,5 +1,8 @@ using System.Collections; +using System.Threading; +using System.Threading.Tasks; using NUnit.Framework; +using UnityEngine; using UnityEngine.TestTools; using LiveKit.Proto; using LiveKit.PlayModeTests.Utils; @@ -26,6 +29,84 @@ public IEnumerator Connect_FailsWithInvalidUrl() Assert.IsNotNull(context.ConnectionError, "Expected connection to fail"); } + // Deterministic coverage of the awaiter using a synthetic instruction, so the logic is + // exercised without the FFI (no dev server needed — hence not [Category("E2E")]). A live + // connect would be non-deterministic here: the FFI emits its error log asynchronously and + // would race LogAssert. The connect-fail path itself is covered by Connect_FailsWithInvalidUrl. + private sealed class TestYieldInstruction : YieldInstruction + { + public void Complete() => IsDone = true; + public void CompleteWithError() { IsError = true; IsDone = true; } + } + + // OnCompleted path: await registers a continuation while the instruction is still + // pending, then completion fires it and IsError is visible on resume. + [UnityTest] + public IEnumerator GetAwaiter_ResumesOnCompletion_AndSurfacesIsError() + { + var instruction = new TestYieldInstruction(); + var awaitTask = AwaitInstruction(instruction); + Assert.IsFalse(awaitTask.IsCompleted, "Awaiter must not resume before IsDone"); + + instruction.CompleteWithError(); + yield return new WaitUntil(() => awaitTask.IsCompleted); + + Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString()); + Assert.IsTrue(instruction.IsDone, "Awaiter resumed, so IsDone must be observable"); + Assert.IsTrue(instruction.IsError, "IsError must be visible on resume"); + } + + // IsCompleted fast path: instruction is already done before it is awaited, so the + // awaiter completes without ever registering a continuation. + [UnityTest] + public IEnumerator GetAwaiter_CompletesImmediately_WhenAlreadyDone() + { + var instruction = new TestYieldInstruction(); + instruction.Complete(); + + var awaitTask = AwaitInstruction(instruction); + yield return new WaitUntil(() => awaitTask.IsCompleted); + + Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString()); + Assert.IsTrue(instruction.IsDone); + Assert.IsFalse(instruction.IsError); + } + + // An await continuation must resume on the Unity main thread even when the instruction + // completes on a background thread — which is what the FFI callback thread does for + // operations registered dispatchToMainThread:false (SetMetadata, stream writes, …). + // Coroutines always resume on the main thread; the await path must match so callers can + // safely touch Unity APIs after the await. RED until the awaiter marshals continuations + // to the main SynchronizationContext. + [UnityTest] + public IEnumerator GetAwaiter_ResumesOnMainThread_WhenCompletedOffThread() + { + var mainThreadId = Thread.CurrentThread.ManagedThreadId; + var instruction = new TestYieldInstruction(); + + var awaitTask = AwaitAndGetResumeThread(instruction); + Assert.IsFalse(awaitTask.IsCompleted, "Awaiter must not resume before IsDone"); + + // Complete from a thread-pool thread, mimicking the FFI callback thread. + Task.Run(() => instruction.Complete()); + + yield return new WaitUntil(() => awaitTask.IsCompleted); + + Assert.AreEqual(mainThreadId, awaitTask.Result, + "await must resume on the Unity main thread, not the thread that completed the instruction"); + } + + private static async Task AwaitInstruction(YieldInstruction instruction) + { + await instruction; + } + + private static async Task AwaitAndGetResumeThread(YieldInstruction instruction) + { + await instruction; + return Thread.CurrentThread.ManagedThreadId; + } + [UnityTest, Category("E2E")] public IEnumerator RoomName_MatchesProvided() { diff --git a/Tests/PlayMode/UniTask.meta b/Tests/PlayMode/UniTask.meta new file mode 100644 index 00000000..bc43a857 --- /dev/null +++ b/Tests/PlayMode/UniTask.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 1ef4cd187b61c4a388e674497c3ac63d +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef new file mode 100644 index 00000000..40855316 --- /dev/null +++ b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef @@ -0,0 +1,32 @@ +{ + "name": "PlayModeTests.UniTask", + "rootNamespace": "LiveKit.PlayModeTests.UniTaskBridge", + "references": [ + "UnityEngine.TestRunner", + "UnityEditor.TestRunner", + "LiveKit", + "LiveKit.UniTask", + "UniTask", + "UniTask.Linq" + ], + "includePlatforms": [], + "excludePlatforms": [], + "allowUnsafeCode": false, + "overrideReferences": true, + "precompiledReferences": [ + "nunit.framework.dll" + ], + "autoReferenced": false, + "defineConstraints": [ + "UNITY_INCLUDE_TESTS", + "LIVEKIT_UNITASK" + ], + "versionDefines": [ + { + "name": "com.cysharp.unitask", + "expression": "2.0.0", + "define": "LIVEKIT_UNITASK" + } + ], + "noEngineReferences": false +} diff --git a/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef.meta b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef.meta new file mode 100644 index 00000000..ba465dbf --- /dev/null +++ b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: 693ef0d1937d94c97aa2969770e3b59c +AssemblyDefinitionImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/PlayMode/UniTask/RoomUniTaskTests.cs b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs new file mode 100644 index 00000000..18bf3c60 --- /dev/null +++ b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs @@ -0,0 +1,70 @@ +#if LIVEKIT_UNITASK +using System.Threading; +using Cysharp.Threading.Tasks; +using NUnit.Framework; +using UnityEngine.TestTools; + +namespace LiveKit.PlayModeTests.UniTaskBridge +{ + public class RoomUniTaskTests + { + // Synthetic instruction used by the unit tests below — they verify the + // AsUniTask extension's behavior directly against the public setter contract + // (IsError then IsDone, mirroring the production completion order in + // Room.cs / Participant.cs / Track.cs) without needing the FFI. + private sealed class TestInstruction : YieldInstruction + { + public void Complete() => IsDone = true; + public void CompleteWithError() { IsError = true; IsDone = true; } + } + + // AsUniTask completes when IsDone transitions to true, with the instruction's IsError + // visible on resume — parity with awaiting the instruction directly. + [UnityTest] + public System.Collections.IEnumerator AsUniTask_CompletesOnIsDone() => UniTask.ToCoroutine(async () => + { + var instruction = new TestInstruction(); + var task = instruction.AsUniTask(); + Assert.IsFalse(instruction.IsDone, "Sanity: instruction must not be done before Complete()"); + + instruction.CompleteWithError(); + await task; + + Assert.IsTrue(instruction.IsDone, "UniTask should not resume before IsDone"); + Assert.IsTrue(instruction.IsError, "Error state must be visible on resume"); + }); + + // Cancellation has abandon-awaiter semantics: the UniTask faults with + // OperationCanceledException, but the underlying request is not aborted. + // The synthetic instruction is never completed — only the token fires. + [UnityTest] + public System.Collections.IEnumerator AsUniTask_Cancellation_ThrowsOperationCanceled() => UniTask.ToCoroutine(async () => + { + var instruction = new TestInstruction(); + using var cts = new CancellationTokenSource(); + + var task = instruction.AsUniTask(cts.Token); + cts.Cancel(); + + bool threw = false; + try + { + await task; + } + catch (System.OperationCanceledException) + { + threw = true; + } + + Assert.IsTrue(threw, "Expected OperationCanceledException when token was cancelled"); + Assert.IsFalse(instruction.IsDone, "Abandon-awaiter semantics: underlying instruction is untouched"); + }); + + // End-to-end coverage of the FFI path is handled by the migrated Meet sample + // (Samples~/Meet/Assets/Runtime/MeetManager.cs). An additional E2E test here + // was tried and removed: FFI error logs arrive asynchronously and their delivery + // window races UniTask's synchronous resume, so the LogAssert tracking was + // brittle across test order. The unit tests above cover the extension's logic. + } +} +#endif diff --git a/Tests/PlayMode/UniTask/RoomUniTaskTests.cs.meta b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs.meta new file mode 100644 index 00000000..588a85ab --- /dev/null +++ b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 8fbde2a93f51d461ba697e4688c30e13 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/PlayMode/UniTask/StreamUniTaskTests.cs b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs new file mode 100644 index 00000000..56b4c4d7 --- /dev/null +++ b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs @@ -0,0 +1,142 @@ +#if LIVEKIT_UNITASK +using System; +using System.Collections.Generic; +using System.Threading; +using Cysharp.Threading.Tasks; +using LiveKit.Internal; +using NUnit.Framework; +using UnityEngine.TestTools; + +namespace LiveKit.PlayModeTests.UniTaskBridge +{ + public class StreamUniTaskTests + { + // Synthetic incremental reader that drives the base chunk/EoS machinery directly, + // with no FFI — the same seam used by the EditMode DataStreamIncrementalReadTests. + // FfiHandle is public; new FfiHandle(IntPtr.Zero) is a valid dummy handle. + private sealed class TestIncrementalReader : ReadIncrementalInstructionBase + { + public TestIncrementalReader(FfiHandle h) : base(h) { } + public void PushChunk(string content) => OnChunk(content); + public void PushEos(LiveKit.Proto.StreamError error = null) => OnEos(error); + } + + // Chunks pushed and consumed one at a time arrive in order; the sequence ends when + // EoS is observed. Manual enumeration interleaves push/pull so EoS only follows a + // fully drained queue (matching how chunks arrive over time in production). + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_DeliversChunksInOrder_ThenStops() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var e = reader.AsAsyncEnumerable().GetAsyncEnumerator(); + try + { + reader.PushChunk("A"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk A"); + Assert.AreEqual("A", e.Current); + + reader.PushChunk("B"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk B"); + Assert.AreEqual("B", e.Current); + + reader.PushChunk("C"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk C"); + Assert.AreEqual("C", e.Current); + + reader.PushEos(); + Assert.IsFalse(await e.MoveNextAsync(), "Enumeration must end at EoS"); + } + finally + { + await e.DisposeAsync(); + } + }); + + // The current chunk is delivered even when EoS is already set at the time it is read, + // then the sequence ends. (Chunks buffered beyond the current one when EoS arrives are + // not drainable, because the reader disallows Reset() past EoS.) + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_DeliversFinalChunkThenEos() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + reader.PushChunk("only"); + reader.PushEos(); + + var observed = new List(); + await foreach (var chunk in reader.AsAsyncEnumerable()) + observed.Add(chunk); + + CollectionAssert.AreEqual(new[] { "only" }, observed); + }); + + // A chunk delivered before the stream errors is observed; the subsequent error EoS + // then surfaces as a thrown StreamError. Manual enumeration models the real timeline + // (chunk arrives, is consumed, THEN the error ends the stream) — note that once the + // error is set, LatestChunk itself throws, so the error must follow chunk delivery. + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_ThrowsStreamError_AfterDeliveringChunk() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var e = reader.AsAsyncEnumerable().GetAsyncEnumerator(); + try + { + reader.PushChunk("partial"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected the pre-error chunk"); + Assert.AreEqual("partial", e.Current); + + reader.PushEos(new LiveKit.Proto.StreamError { Description = "boom" }); + + StreamError caught = null; + try + { + await e.MoveNextAsync(); + } + catch (StreamError ex) + { + caught = ex; + } + + Assert.IsNotNull(caught, "Expected the error EoS to throw a StreamError"); + Assert.AreEqual("boom", caught.Message); + } + finally + { + await e.DisposeAsync(); + } + }); + + // A cancelled token surfaces as OperationCanceledException with abandon-awaiter + // semantics: nothing is observed and the underlying reader is untouched. + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_Cancellation_ThrowsOperationCanceled() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var observed = new List(); + bool threw = false; + try + { + await foreach (var chunk in reader.AsAsyncEnumerable(cts.Token)) + observed.Add(chunk); + } + catch (OperationCanceledException) + { + threw = true; + } + + Assert.IsTrue(threw, "Expected OperationCanceledException for a cancelled token"); + CollectionAssert.IsEmpty(observed); + Assert.IsFalse(reader.IsEos, "Abandon-awaiter semantics: reader state is untouched"); + }); + } +} +#endif diff --git a/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta new file mode 100644 index 00000000..a10435c8 --- /dev/null +++ b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: a2e6312b068f8432fa2b267f28d3e10b +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: