diff --git a/ui/src/stores/streamStore.test.ts b/ui/src/stores/streamStore.test.ts index 4a40674c..3ea26272 100644 --- a/ui/src/stores/streamStore.test.ts +++ b/ui/src/stores/streamStore.test.ts @@ -98,6 +98,8 @@ describe('streamStore', () => { pipelineOutputsVideo: true, errorMessage: '', configLoaded: false, + connectAbort: null, + connectingStep: '', activeSessionId: null, activeSessionName: null, activePipelineName: null, @@ -149,6 +151,12 @@ describe('streamStore', () => { expect(state.activePipelineName).toBeNull(); }); + it('should have null connectAbort and empty connectingStep initially', () => { + const state = useStreamStore.getState(); + expect(state.connectAbort).toBeNull(); + expect(state.connectingStep).toBe(''); + }); + it('should have null MoQ references initially', () => { const state = useStreamStore.getState(); expect(state.publish).toBeNull(); @@ -497,6 +505,34 @@ describe('streamStore', () => { expect(useStreamStore.getState().status).toBe('disconnected'); }); + + it('should abort in-flight connect attempt on disconnect', () => { + const abort = new AbortController(); + const abortSpy = vi.spyOn(abort, 'abort'); + useStreamStore.setState({ + status: 'connecting', + connectAbort: abort, + }); + + useStreamStore.getState().disconnect(); + + expect(abortSpy).toHaveBeenCalled(); + const state = useStreamStore.getState(); + expect(state.status).toBe('disconnected'); + expect(state.connectAbort).toBeNull(); + expect(state.connectingStep).toBe(''); + }); + + it('should clear connectingStep on disconnect', () => { + useStreamStore.setState({ + status: 'connecting', + connectingStep: 'devices', + }); + + useStreamStore.getState().disconnect(); + + expect(useStreamStore.getState().connectingStep).toBe(''); + }); }); describe('connection state machine', () => { diff --git a/ui/src/stores/streamStore.ts b/ui/src/stores/streamStore.ts index 082268d7..c9cbc011 100644 --- a/ui/src/stores/streamStore.ts +++ b/ui/src/stores/streamStore.ts @@ -69,6 +69,11 @@ interface StreamState { activeSessionName: string | null; activePipelineName: string | null; + // Connect lifecycle — abort controller for the in-flight performConnect call + connectAbort: AbortController | null; + /** Human-readable label for the current phase of a connect attempt. */ + connectingStep: string; + // MoQ references (stored but not serialized) publish: Publish.Broadcast | null; watch: Watch.Broadcast | null; @@ -148,6 +153,8 @@ export const useStreamStore = create((set, get) => ({ isExternalRelay: false, errorMessage: '', configLoaded: false, + connectAbort: null, + connectingStep: '', // Active session state activeSessionId: null, @@ -235,7 +242,15 @@ export const useStreamStore = create((set, get) => ({ return false; } + // Abort any in-flight connect attempt so it doesn't later overwrite + // our state. This is defensive — normally disconnect() already aborted, + // but a rapid connect→connect without disconnect can still race. + state.connectAbort?.abort(); + + const abort = new AbortController(); set({ + connectAbort: abort, + connectingStep: '', status: 'connecting', errorMessage: '', watchStatus: decision.shouldWatch ? 'loading' : 'disabled', @@ -243,17 +258,23 @@ export const useStreamStore = create((set, get) => ({ cameraStatus: decision.shouldPublish && state.pipelineNeedsVideo ? 'requesting' : 'disabled', }); - return performConnect(state, decision, get, set); + return performConnect(state, decision, get, set, abort.signal); }, disconnect: () => { const state = get(); + // Cancel any in-flight connect attempt so its catch/success path + // doesn't later overwrite the freshly-reset store. + state.connectAbort?.abort(); + // Reuse the same teardown logic used when a connect attempt fails. cleanupConnectAttempt(state); set({ status: 'disconnected', + connectAbort: null, + connectingStep: '', isMicEnabled: false, micStatus: 'disabled', isCameraEnabled: false, diff --git a/ui/src/stores/streamStoreHelpers.test.ts b/ui/src/stores/streamStoreHelpers.test.ts index d3ca93a2..45f0e73b 100644 --- a/ui/src/stores/streamStoreHelpers.test.ts +++ b/ui/src/stores/streamStoreHelpers.test.ts @@ -299,6 +299,53 @@ describe('waitForSignalValue', () => { vi.useRealTimers(); }); + + it('should reject immediately when abortSignal is already aborted', async () => { + const signal = createMockSignal(0); + const abort = new AbortController(); + abort.abort(); + + await expect( + waitForSignalValue(signal, (v) => v > 0, 5_000, 'timeout', abort.signal) + ).rejects.toThrow('Aborted'); + }); + + it('should reject with AbortError when abortSignal fires during wait', async () => { + const signal = createMockSignal(0); + const abort = new AbortController(); + + const promise = waitForSignalValue(signal, (v) => v > 0, 5_000, 'timeout', abort.signal); + + // Abort before the signal value changes + abort.abort(); + + await expect(promise).rejects.toThrow('Aborted'); + }); + + it('should not reject on abort if predicate already matched', async () => { + const signal = createMockSignal(42); + const abort = new AbortController(); + + // Predicate matches initial value — resolves synchronously before abort + const value = await waitForSignalValue(signal, (v) => v === 42, 5_000, 'timeout', abort.signal); + expect(value).toBe(42); + + // Aborting after resolution should be harmless + abort.abort(); + }); + + it('should clean up subscription when abortSignal fires', async () => { + const signal = createMockSignal(0); + const abort = new AbortController(); + + const promise = waitForSignalValue(signal, (v) => v > 100, 5_000, 'timeout', abort.signal); + abort.abort(); + + await expect(promise).rejects.toThrow('Aborted'); + + // After abort, emitting new values should be harmless (no dangling listeners) + expect(() => signal.set(200)).not.toThrow(); + }); }); // --------------------------------------------------------------------------- diff --git a/ui/src/stores/streamStoreHelpers.ts b/ui/src/stores/streamStoreHelpers.ts index b80e7e6f..97244030 100644 --- a/ui/src/stores/streamStoreHelpers.ts +++ b/ui/src/stores/streamStoreHelpers.ts @@ -64,6 +64,9 @@ export interface ConnectableState { micStatus: MicStatus; cameraStatus: CameraStatus; watchStatus: WatchStatus; + /** Human-readable label for the current phase of a connect attempt + * (e.g. 'devices', 'relay', 'pipeline'). Empty when idle. */ + connectingStep: string; } type StateSetter = (partial: Partial) => void; @@ -89,8 +92,13 @@ export function waitForSignalValue( signal: Getter, predicate: (value: T) => boolean, timeoutMs: number, - timeoutMessage: string + timeoutMessage: string, + abortSignal?: AbortSignal ): Promise { + if (abortSignal?.aborted) { + return Promise.reject(new DOMException('Aborted', 'AbortError')); + } + const initial = signal.peek(); if (predicate(initial)) { return Promise.resolve(initial); @@ -98,15 +106,31 @@ export function waitForSignalValue( return new Promise((resolve, reject) => { let dispose: () => void = () => {}; - const timeoutId = setTimeout(() => { + + const cleanup = () => { + clearTimeout(timeoutId); dispose(); + }; + + const timeoutId = setTimeout(() => { + cleanup(); reject(new Error(timeoutMessage)); }, timeoutMs); + if (abortSignal) { + abortSignal.addEventListener( + 'abort', + () => { + cleanup(); + reject(new DOMException('Aborted', 'AbortError')); + }, + { once: true } + ); + } + dispose = signal.subscribe((value) => { if (predicate(value)) { - clearTimeout(timeoutId); - dispose(); + cleanup(); resolve(value); } }); @@ -122,20 +146,39 @@ export function waitForSignalValue( async function waitForBroadcastAnnouncement( connection: Hang.Moq.Connection.Reload, broadcastName: string, - timeoutMs = 15_000 + timeoutMs = 15_000, + abortSignal?: AbortSignal ): Promise { + if (abortSignal?.aborted) throw new DOMException('Aborted', 'AbortError'); const conn = connection.established.peek(); if (!conn) return; logger.info(`Waiting for broadcast '${broadcastName}' announcement...`); const announcements = conn.announced(); const deadline = Date.now() + timeoutMs; try { + // Build a promise that rejects when the abort signal fires so the + // Promise.race below reacts immediately instead of polling. + const abortPromise = abortSignal + ? new Promise((_, reject) => { + abortSignal.addEventListener( + 'abort', + () => reject(new DOMException('Aborted', 'AbortError')), + { once: true } + ); + }) + : null; + while (Date.now() < deadline) { const remaining = deadline - Date.now(); - const entry = await Promise.race([ + const racers: Promise[] = [ announcements.next(), new Promise((r) => setTimeout(() => r(null), remaining)), - ]); + ]; + if (abortPromise) racers.push(abortPromise); + + const entry = (await Promise.race(racers)) as Awaited< + ReturnType + > | null; if (!entry) break; if (entry.active && entry.path.toString() === broadcastName) { logger.info(`Broadcast '${broadcastName}' announced`); @@ -320,7 +363,8 @@ async function setupMediaSources( healthEffect: Effect, needsAudio: boolean, needsVideo: boolean, - set: StateSetter + set: StateSetter, + abortSignal?: AbortSignal ): Promise<{ microphone: Publish.Source.Microphone | null; camera: Publish.Source.Camera | null; @@ -346,7 +390,8 @@ async function setupMediaSources( camera.source, (v) => v !== undefined, 15_000, - 'Camera not available' + 'Camera not available', + abortSignal ); } catch (e) { shutdownMediaSource(camera); @@ -364,13 +409,20 @@ async function setupPublishPath( inputBroadcast: string, needsAudio: boolean, needsVideo: boolean, - set: StateSetter + set: StateSetter, + abortSignal?: AbortSignal ): Promise<{ microphone: Publish.Source.Microphone | null; camera: Publish.Source.Camera | null; publish: Publish.Broadcast; }> { - const { microphone, camera } = await setupMediaSources(healthEffect, needsAudio, needsVideo, set); + const { microphone, camera } = await setupMediaSources( + healthEffect, + needsAudio, + needsVideo, + set, + abortSignal + ); logger.info('Step 5: Creating publish broadcast'); const broadcastConfig: ConstructorParameters[0] = { @@ -398,7 +450,8 @@ async function setupPublishPath( publish.video.catalog, (v) => v !== undefined, 10_000, - 'Video encoder failed to initialize' + 'Video encoder failed to initialize', + abortSignal ); } catch (e) { publish.close(); @@ -499,32 +552,102 @@ function applyPublishResult( attempt.publish = result.publish; } +/** Create the MoQ connection and wire up the health-status sync effect. */ +function createConnectionAndHealth( + serverUrl: string, + moqToken: string, + get: () => ConnectableState, + set: StateSetter +): { connection: Hang.Moq.Connection.Reload; healthEffect: Effect } { + logger.info('Step 1: Creating connection to relay server'); + const url = new URL(serverUrl); + const jwt = moqToken.trim(); + if (jwt) { + url.searchParams.set('jwt', jwt); + } + + const connection = new Hang.Moq.Connection.Reload({ url, enabled: true }); + const healthEffect = new Effect(); + setupConnectionStatusSync(healthEffect, connection, get, set); + return { connection, healthEffect }; +} + +/** Wait for relay connection, optionally wait for broadcast announcement, then set up watch. */ +async function connectWatchPath( + attempt: ConnectAttempt, + state: ConnectableState, + decision: Extract, + set: StateSetter, + abortSignal: AbortSignal +): Promise { + set({ connectingStep: 'relay' }); + await waitForSignalValue( + attempt.connection!.established, + (value) => value !== undefined, + 12_000, + 'Timed out connecting to MoQ gateway.', + abortSignal + ); + + if (decision.shouldWatch) { + // When publishing to an external relay, the skit pipeline needs time to + // discover input tracks, build the graph, and start publishing output. + // Wait for the output broadcast to be announced on the relay before + // subscribing, otherwise the catalog subscribe gets RESET_STREAM. + // In gateway mode the skit server manages the peer connection directly, + // so no announcement polling is needed. + if (decision.shouldPublish && state.isExternalRelay) { + set({ connectingStep: 'pipeline' }); + await waitForBroadcastAnnouncement( + attempt.connection!, + state.outputBroadcast, + 15_000, + abortSignal + ); + } + + applyWatchResult( + attempt, + setupWatchPath( + attempt.healthEffect!, + attempt.connection!, + state.outputBroadcast, + state.pipelineOutputsAudio, + state.pipelineOutputsVideo, + set + ) + ); + } +} + /** Core connection logic extracted from the store for reduced complexity. */ export async function performConnect( state: ConnectableState, decision: Extract, get: () => ConnectableState & { outputBroadcast: string }, - set: StateSetter + set: StateSetter, + abortSignal: AbortSignal ): Promise { const attempt: ConnectAttempt = { ...NULL_MOQ_REFS }; try { - logger.info('Step 1: Creating connection to relay server'); - const url = new URL(decision.trimmedServerUrl); - const jwt = state.moqToken.trim(); - if (jwt) { - url.searchParams.set('jwt', jwt); - } + if (abortSignal.aborted) throw new DOMException('Aborted', 'AbortError'); - attempt.connection = new Hang.Moq.Connection.Reload({ url, enabled: true }); - attempt.healthEffect = new Effect(); - setupConnectionStatusSync(attempt.healthEffect, attempt.connection, get, set); + const { connection, healthEffect } = createConnectionAndHealth( + decision.trimmedServerUrl, + state.moqToken, + get, + set + ); + attempt.connection = connection; + attempt.healthEffect = healthEffect; // Set up publish BEFORE watch. For external relay pipelines (pub/sub), // the skit pipeline needs input data before it can publish output. // If we watch first, the subscribe to output/catalog.json fails with // RESET_STREAM because skit hasn't started publishing yet. if (decision.shouldPublish) { + set({ connectingStep: 'devices' }); applyPublishResult( attempt, await setupPublishPath( @@ -533,40 +656,19 @@ export async function performConnect( state.inputBroadcast, state.pipelineNeedsAudio, state.pipelineNeedsVideo, - set + set, + abortSignal ) ); } - await waitForSignalValue( - attempt.connection.established, - (value) => value !== undefined, - 12_000, - 'Timed out connecting to MoQ gateway.' - ); + await connectWatchPath(attempt, state, decision, set, abortSignal); - if (decision.shouldWatch) { - // When publishing to an external relay, the skit pipeline needs time to - // discover input tracks, build the graph, and start publishing output. - // Wait for the output broadcast to be announced on the relay before - // subscribing, otherwise the catalog subscribe gets RESET_STREAM. - // In gateway mode the skit server manages the peer connection directly, - // so no announcement polling is needed. - if (decision.shouldPublish && state.isExternalRelay) { - await waitForBroadcastAnnouncement(attempt.connection, state.outputBroadcast); - } - - applyWatchResult( - attempt, - setupWatchPath( - attempt.healthEffect, - attempt.connection, - state.outputBroadcast, - state.pipelineOutputsAudio, - state.pipelineOutputsVideo, - set - ) - ); + // If aborted between the last await and now, discard this attempt + // so we don't overwrite a newer connect's state. + if (abortSignal.aborted) { + cleanupConnectAttempt(attempt); + return false; } schedulePostConnectWarnings(decision, attempt, get, set); @@ -574,6 +676,7 @@ export async function performConnect( set({ ...attempt, status: 'connected', + connectingStep: '', isMicEnabled: decision.shouldPublish && state.pipelineNeedsAudio, isCameraEnabled: decision.shouldPublish && state.pipelineNeedsVideo, }); @@ -584,11 +687,18 @@ export async function performConnect( logger.info(`Connection established: ${modes.join(' and ')}`); return true; } catch (error) { - logger.error('Connection failed:', error); cleanupConnectAttempt(attempt); + // If this attempt was aborted (superseded by disconnect or a newer + // connect), silently discard — don't overwrite the store. + if (abortSignal.aborted) { + return false; + } + + logger.error('Connection failed:', error); set({ status: 'disconnected', + connectingStep: '', watchStatus: 'disabled', micStatus: 'disabled', cameraStatus: 'disabled', diff --git a/ui/src/views/StreamView.tsx b/ui/src/views/StreamView.tsx index dc10c804..f7a47106 100644 --- a/ui/src/views/StreamView.tsx +++ b/ui/src/views/StreamView.tsx @@ -269,6 +269,7 @@ const StreamView: React.FC = () => { watchStatus, pipelineNeedsAudio, pipelineNeedsVideo, + connectingStep, errorMessage, configLoaded, activeSessionId, @@ -309,6 +310,7 @@ const StreamView: React.FC = () => { watchStatus: s.watchStatus, pipelineNeedsAudio: s.pipelineNeedsAudio, pipelineNeedsVideo: s.pipelineNeedsVideo, + connectingStep: s.connectingStep, errorMessage: s.errorMessage, configLoaded: s.configLoaded, activeSessionId: s.activeSessionId, @@ -548,9 +550,15 @@ const StreamView: React.FC = () => { if (status === 'disconnected' && serverUrl.trim()) { void (async () => { try { - await connect(); + const ok = await connect(); + if (!ok) { + logger.warn('Auto-connect after session creation did not succeed'); + } } catch (error) { logger.error('MoQ connection attempt after session creation failed:', error); + viewState.setSessionCreationError( + error instanceof Error ? error.message : 'Connection failed after session creation' + ); } })(); } @@ -638,6 +646,12 @@ const StreamView: React.FC = () => { live: 'Watch: live', }; + const connectingStepText: Record = { + devices: 'Requesting device access', + relay: 'Connecting to relay', + pipeline: 'Waiting for pipeline', + }; + return ( { {(status === 'connecting' || status === 'connected') && (
- {status === 'connected' ? 'Relay: connected' : 'Relay: connecting…'} •{' '} - {watchStatusText[watchStatus]} + {status === 'connected' + ? 'Relay: connected' + : connectingStep + ? 'Connecting — ' + (connectingStepText[connectingStep] ?? connectingStep) + : 'Connecting…'}{' '} + • {watchStatusText[watchStatus]} {pipelineNeedsAudio && <> • {micStatusText[micStatus]}} {pipelineNeedsVideo && <> • {cameraStatusText[cameraStatus]}}