From 3a859998eac8e42e966b2cc3af1c724c2b90614a Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sun, 22 Mar 2026 20:14:27 +0000 Subject: [PATCH 1/3] fix: cancel in-flight connect on disconnect to prevent second-attempt bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The core issue: disconnect() during an in-flight performConnect() could not cancel it. The local 'attempt' objects (Camera, Connection, etc.) lived in performConnect's stack frame and were never cleaned up by disconnect()'s cleanupConnectAttempt() call (which only saw the store's null refs). When the user started a new pipeline, performConnect₁ would eventually complete and overwrite the store — clobbering performConnect₂'s state, leaking resources, or flipping the UI back to 'disconnected'. Changes: - Add AbortController to stream store; abort on disconnect() and connect() - Thread AbortSignal through waitForSignalValue, waitForBroadcastAnnouncement, setupMediaSources, setupPublishPath, and performConnect - Aborted performConnect silently discards its attempt (no store writes) - Add connectingStep state for granular UX feedback during connect phases (devices → relay → pipeline) - Surface auto-connect errors in StreamView instead of silently logging - Extract createConnectionAndHealth() and connectWatchPath() helpers to keep performConnect under the max-statements lint threshold - Add tests for abort signal cancellation in waitForSignalValue and disconnect-during-connect in streamStore Co-Authored-By: Claudio Costa --- ui/src/stores/streamStore.test.ts | 34 ++++ ui/src/stores/streamStore.ts | 23 ++- ui/src/stores/streamStoreHelpers.test.ts | 47 ++++++ ui/src/stores/streamStoreHelpers.ts | 194 +++++++++++++++++------ ui/src/views/StreamView.tsx | 22 ++- 5 files changed, 266 insertions(+), 54 deletions(-) diff --git a/ui/src/stores/streamStore.test.ts b/ui/src/stores/streamStore.test.ts index 4a40674c..8059b1e4 100644 --- a/ui/src/stores/streamStore.test.ts +++ b/ui/src/stores/streamStore.test.ts @@ -149,6 +149,12 @@ describe('streamStore', () => { expect(state.activePipelineName).toBeNull(); }); + it('should have null connectAbort and empty connectingStep initially', () => { + const state = useStreamStore.getState(); + expect(state.connectAbort).toBeNull(); + expect(state.connectingStep).toBe(''); + }); + it('should have null MoQ references initially', () => { const state = useStreamStore.getState(); expect(state.publish).toBeNull(); @@ -497,6 +503,34 @@ describe('streamStore', () => { expect(useStreamStore.getState().status).toBe('disconnected'); }); + + it('should abort in-flight connect attempt on disconnect', () => { + const abort = new AbortController(); + const abortSpy = vi.spyOn(abort, 'abort'); + useStreamStore.setState({ + status: 'connecting', + connectAbort: abort, + }); + + useStreamStore.getState().disconnect(); + + expect(abortSpy).toHaveBeenCalled(); + const state = useStreamStore.getState(); + expect(state.status).toBe('disconnected'); + expect(state.connectAbort).toBeNull(); + expect(state.connectingStep).toBe(''); + }); + + it('should clear connectingStep on disconnect', () => { + useStreamStore.setState({ + status: 'connecting', + connectingStep: 'devices', + }); + + useStreamStore.getState().disconnect(); + + expect(useStreamStore.getState().connectingStep).toBe(''); + }); }); describe('connection state machine', () => { diff --git a/ui/src/stores/streamStore.ts b/ui/src/stores/streamStore.ts index 082268d7..c9cbc011 100644 --- a/ui/src/stores/streamStore.ts +++ b/ui/src/stores/streamStore.ts @@ -69,6 +69,11 @@ interface StreamState { activeSessionName: string | null; activePipelineName: string | null; + // Connect lifecycle — abort controller for the in-flight performConnect call + connectAbort: AbortController | null; + /** Human-readable label for the current phase of a connect attempt. */ + connectingStep: string; + // MoQ references (stored but not serialized) publish: Publish.Broadcast | null; watch: Watch.Broadcast | null; @@ -148,6 +153,8 @@ export const useStreamStore = create((set, get) => ({ isExternalRelay: false, errorMessage: '', configLoaded: false, + connectAbort: null, + connectingStep: '', // Active session state activeSessionId: null, @@ -235,7 +242,15 @@ export const useStreamStore = create((set, get) => ({ return false; } + // Abort any in-flight connect attempt so it doesn't later overwrite + // our state. This is defensive — normally disconnect() already aborted, + // but a rapid connect→connect without disconnect can still race. + state.connectAbort?.abort(); + + const abort = new AbortController(); set({ + connectAbort: abort, + connectingStep: '', status: 'connecting', errorMessage: '', watchStatus: decision.shouldWatch ? 'loading' : 'disabled', @@ -243,17 +258,23 @@ export const useStreamStore = create((set, get) => ({ cameraStatus: decision.shouldPublish && state.pipelineNeedsVideo ? 'requesting' : 'disabled', }); - return performConnect(state, decision, get, set); + return performConnect(state, decision, get, set, abort.signal); }, disconnect: () => { const state = get(); + // Cancel any in-flight connect attempt so its catch/success path + // doesn't later overwrite the freshly-reset store. + state.connectAbort?.abort(); + // Reuse the same teardown logic used when a connect attempt fails. cleanupConnectAttempt(state); set({ status: 'disconnected', + connectAbort: null, + connectingStep: '', isMicEnabled: false, micStatus: 'disabled', isCameraEnabled: false, diff --git a/ui/src/stores/streamStoreHelpers.test.ts b/ui/src/stores/streamStoreHelpers.test.ts index d3ca93a2..45f0e73b 100644 --- a/ui/src/stores/streamStoreHelpers.test.ts +++ b/ui/src/stores/streamStoreHelpers.test.ts @@ -299,6 +299,53 @@ describe('waitForSignalValue', () => { vi.useRealTimers(); }); + + it('should reject immediately when abortSignal is already aborted', async () => { + const signal = createMockSignal(0); + const abort = new AbortController(); + abort.abort(); + + await expect( + waitForSignalValue(signal, (v) => v > 0, 5_000, 'timeout', abort.signal) + ).rejects.toThrow('Aborted'); + }); + + it('should reject with AbortError when abortSignal fires during wait', async () => { + const signal = createMockSignal(0); + const abort = new AbortController(); + + const promise = waitForSignalValue(signal, (v) => v > 0, 5_000, 'timeout', abort.signal); + + // Abort before the signal value changes + abort.abort(); + + await expect(promise).rejects.toThrow('Aborted'); + }); + + it('should not reject on abort if predicate already matched', async () => { + const signal = createMockSignal(42); + const abort = new AbortController(); + + // Predicate matches initial value — resolves synchronously before abort + const value = await waitForSignalValue(signal, (v) => v === 42, 5_000, 'timeout', abort.signal); + expect(value).toBe(42); + + // Aborting after resolution should be harmless + abort.abort(); + }); + + it('should clean up subscription when abortSignal fires', async () => { + const signal = createMockSignal(0); + const abort = new AbortController(); + + const promise = waitForSignalValue(signal, (v) => v > 100, 5_000, 'timeout', abort.signal); + abort.abort(); + + await expect(promise).rejects.toThrow('Aborted'); + + // After abort, emitting new values should be harmless (no dangling listeners) + expect(() => signal.set(200)).not.toThrow(); + }); }); // --------------------------------------------------------------------------- diff --git a/ui/src/stores/streamStoreHelpers.ts b/ui/src/stores/streamStoreHelpers.ts index b80e7e6f..c1956144 100644 --- a/ui/src/stores/streamStoreHelpers.ts +++ b/ui/src/stores/streamStoreHelpers.ts @@ -64,6 +64,9 @@ export interface ConnectableState { micStatus: MicStatus; cameraStatus: CameraStatus; watchStatus: WatchStatus; + /** Human-readable label for the current phase of a connect attempt + * (e.g. 'devices', 'relay', 'pipeline'). Empty when idle. */ + connectingStep: string; } type StateSetter = (partial: Partial) => void; @@ -89,8 +92,13 @@ export function waitForSignalValue( signal: Getter, predicate: (value: T) => boolean, timeoutMs: number, - timeoutMessage: string + timeoutMessage: string, + abortSignal?: AbortSignal ): Promise { + if (abortSignal?.aborted) { + return Promise.reject(new DOMException('Aborted', 'AbortError')); + } + const initial = signal.peek(); if (predicate(initial)) { return Promise.resolve(initial); @@ -98,15 +106,31 @@ export function waitForSignalValue( return new Promise((resolve, reject) => { let dispose: () => void = () => {}; - const timeoutId = setTimeout(() => { + + const cleanup = () => { + clearTimeout(timeoutId); dispose(); + }; + + const timeoutId = setTimeout(() => { + cleanup(); reject(new Error(timeoutMessage)); }, timeoutMs); + if (abortSignal) { + abortSignal.addEventListener( + 'abort', + () => { + cleanup(); + reject(new DOMException('Aborted', 'AbortError')); + }, + { once: true } + ); + } + dispose = signal.subscribe((value) => { if (predicate(value)) { - clearTimeout(timeoutId); - dispose(); + cleanup(); resolve(value); } }); @@ -122,8 +146,10 @@ export function waitForSignalValue( async function waitForBroadcastAnnouncement( connection: Hang.Moq.Connection.Reload, broadcastName: string, - timeoutMs = 15_000 + timeoutMs = 15_000, + abortSignal?: AbortSignal ): Promise { + if (abortSignal?.aborted) throw new DOMException('Aborted', 'AbortError'); const conn = connection.established.peek(); if (!conn) return; logger.info(`Waiting for broadcast '${broadcastName}' announcement...`); @@ -131,6 +157,7 @@ async function waitForBroadcastAnnouncement( const deadline = Date.now() + timeoutMs; try { while (Date.now() < deadline) { + if (abortSignal?.aborted) throw new DOMException('Aborted', 'AbortError'); const remaining = deadline - Date.now(); const entry = await Promise.race([ announcements.next(), @@ -320,7 +347,8 @@ async function setupMediaSources( healthEffect: Effect, needsAudio: boolean, needsVideo: boolean, - set: StateSetter + set: StateSetter, + abortSignal?: AbortSignal ): Promise<{ microphone: Publish.Source.Microphone | null; camera: Publish.Source.Camera | null; @@ -346,7 +374,8 @@ async function setupMediaSources( camera.source, (v) => v !== undefined, 15_000, - 'Camera not available' + 'Camera not available', + abortSignal ); } catch (e) { shutdownMediaSource(camera); @@ -364,13 +393,20 @@ async function setupPublishPath( inputBroadcast: string, needsAudio: boolean, needsVideo: boolean, - set: StateSetter + set: StateSetter, + abortSignal?: AbortSignal ): Promise<{ microphone: Publish.Source.Microphone | null; camera: Publish.Source.Camera | null; publish: Publish.Broadcast; }> { - const { microphone, camera } = await setupMediaSources(healthEffect, needsAudio, needsVideo, set); + const { microphone, camera } = await setupMediaSources( + healthEffect, + needsAudio, + needsVideo, + set, + abortSignal + ); logger.info('Step 5: Creating publish broadcast'); const broadcastConfig: ConstructorParameters[0] = { @@ -398,7 +434,8 @@ async function setupPublishPath( publish.video.catalog, (v) => v !== undefined, 10_000, - 'Video encoder failed to initialize' + 'Video encoder failed to initialize', + abortSignal ); } catch (e) { publish.close(); @@ -499,32 +536,102 @@ function applyPublishResult( attempt.publish = result.publish; } +/** Create the MoQ connection and wire up the health-status sync effect. */ +function createConnectionAndHealth( + serverUrl: string, + moqToken: string, + get: () => ConnectableState, + set: StateSetter +): { connection: Hang.Moq.Connection.Reload; healthEffect: Effect } { + logger.info('Step 1: Creating connection to relay server'); + const url = new URL(serverUrl); + const jwt = moqToken.trim(); + if (jwt) { + url.searchParams.set('jwt', jwt); + } + + const connection = new Hang.Moq.Connection.Reload({ url, enabled: true }); + const healthEffect = new Effect(); + setupConnectionStatusSync(healthEffect, connection, get, set); + return { connection, healthEffect }; +} + +/** Wait for relay connection, optionally wait for broadcast announcement, then set up watch. */ +async function connectWatchPath( + attempt: ConnectAttempt, + state: ConnectableState, + decision: Extract, + set: StateSetter, + abortSignal: AbortSignal +): Promise { + set({ connectingStep: 'relay' }); + await waitForSignalValue( + attempt.connection!.established, + (value) => value !== undefined, + 12_000, + 'Timed out connecting to MoQ gateway.', + abortSignal + ); + + if (decision.shouldWatch) { + // When publishing to an external relay, the skit pipeline needs time to + // discover input tracks, build the graph, and start publishing output. + // Wait for the output broadcast to be announced on the relay before + // subscribing, otherwise the catalog subscribe gets RESET_STREAM. + // In gateway mode the skit server manages the peer connection directly, + // so no announcement polling is needed. + if (decision.shouldPublish && state.isExternalRelay) { + set({ connectingStep: 'pipeline' }); + await waitForBroadcastAnnouncement( + attempt.connection!, + state.outputBroadcast, + 15_000, + abortSignal + ); + } + + applyWatchResult( + attempt, + setupWatchPath( + attempt.healthEffect!, + attempt.connection!, + state.outputBroadcast, + state.pipelineOutputsAudio, + state.pipelineOutputsVideo, + set + ) + ); + } +} + /** Core connection logic extracted from the store for reduced complexity. */ export async function performConnect( state: ConnectableState, decision: Extract, get: () => ConnectableState & { outputBroadcast: string }, - set: StateSetter + set: StateSetter, + abortSignal: AbortSignal ): Promise { const attempt: ConnectAttempt = { ...NULL_MOQ_REFS }; try { - logger.info('Step 1: Creating connection to relay server'); - const url = new URL(decision.trimmedServerUrl); - const jwt = state.moqToken.trim(); - if (jwt) { - url.searchParams.set('jwt', jwt); - } + if (abortSignal.aborted) throw new DOMException('Aborted', 'AbortError'); - attempt.connection = new Hang.Moq.Connection.Reload({ url, enabled: true }); - attempt.healthEffect = new Effect(); - setupConnectionStatusSync(attempt.healthEffect, attempt.connection, get, set); + const { connection, healthEffect } = createConnectionAndHealth( + decision.trimmedServerUrl, + state.moqToken, + get, + set + ); + attempt.connection = connection; + attempt.healthEffect = healthEffect; // Set up publish BEFORE watch. For external relay pipelines (pub/sub), // the skit pipeline needs input data before it can publish output. // If we watch first, the subscribe to output/catalog.json fails with // RESET_STREAM because skit hasn't started publishing yet. if (decision.shouldPublish) { + set({ connectingStep: 'devices' }); applyPublishResult( attempt, await setupPublishPath( @@ -533,40 +640,19 @@ export async function performConnect( state.inputBroadcast, state.pipelineNeedsAudio, state.pipelineNeedsVideo, - set + set, + abortSignal ) ); } - await waitForSignalValue( - attempt.connection.established, - (value) => value !== undefined, - 12_000, - 'Timed out connecting to MoQ gateway.' - ); - - if (decision.shouldWatch) { - // When publishing to an external relay, the skit pipeline needs time to - // discover input tracks, build the graph, and start publishing output. - // Wait for the output broadcast to be announced on the relay before - // subscribing, otherwise the catalog subscribe gets RESET_STREAM. - // In gateway mode the skit server manages the peer connection directly, - // so no announcement polling is needed. - if (decision.shouldPublish && state.isExternalRelay) { - await waitForBroadcastAnnouncement(attempt.connection, state.outputBroadcast); - } + await connectWatchPath(attempt, state, decision, set, abortSignal); - applyWatchResult( - attempt, - setupWatchPath( - attempt.healthEffect, - attempt.connection, - state.outputBroadcast, - state.pipelineOutputsAudio, - state.pipelineOutputsVideo, - set - ) - ); + // If aborted between the last await and now, discard this attempt + // so we don't overwrite a newer connect's state. + if (abortSignal.aborted) { + cleanupConnectAttempt(attempt); + return false; } schedulePostConnectWarnings(decision, attempt, get, set); @@ -574,6 +660,7 @@ export async function performConnect( set({ ...attempt, status: 'connected', + connectingStep: '', isMicEnabled: decision.shouldPublish && state.pipelineNeedsAudio, isCameraEnabled: decision.shouldPublish && state.pipelineNeedsVideo, }); @@ -584,11 +671,18 @@ export async function performConnect( logger.info(`Connection established: ${modes.join(' and ')}`); return true; } catch (error) { - logger.error('Connection failed:', error); cleanupConnectAttempt(attempt); + // If this attempt was aborted (superseded by disconnect or a newer + // connect), silently discard — don't overwrite the store. + if (abortSignal.aborted) { + return false; + } + + logger.error('Connection failed:', error); set({ status: 'disconnected', + connectingStep: '', watchStatus: 'disabled', micStatus: 'disabled', cameraStatus: 'disabled', diff --git a/ui/src/views/StreamView.tsx b/ui/src/views/StreamView.tsx index dc10c804..1459d6ce 100644 --- a/ui/src/views/StreamView.tsx +++ b/ui/src/views/StreamView.tsx @@ -269,6 +269,7 @@ const StreamView: React.FC = () => { watchStatus, pipelineNeedsAudio, pipelineNeedsVideo, + connectingStep, errorMessage, configLoaded, activeSessionId, @@ -309,6 +310,7 @@ const StreamView: React.FC = () => { watchStatus: s.watchStatus, pipelineNeedsAudio: s.pipelineNeedsAudio, pipelineNeedsVideo: s.pipelineNeedsVideo, + connectingStep: s.connectingStep, errorMessage: s.errorMessage, configLoaded: s.configLoaded, activeSessionId: s.activeSessionId, @@ -548,9 +550,15 @@ const StreamView: React.FC = () => { if (status === 'disconnected' && serverUrl.trim()) { void (async () => { try { - await connect(); + const ok = await connect(); + if (!ok) { + logger.warn('Auto-connect after session creation did not succeed'); + } } catch (error) { logger.error('MoQ connection attempt after session creation failed:', error); + viewState.setSessionCreationError( + error instanceof Error ? error.message : 'Connection failed after session creation' + ); } })(); } @@ -638,6 +646,12 @@ const StreamView: React.FC = () => { live: 'Watch: live', }; + const connectingStepText: Record = { + devices: 'Requesting device access', + relay: 'Connecting to relay', + pipeline: 'Waiting for pipeline', + }; + return ( { {(status === 'connecting' || status === 'connected') && (
- {status === 'connected' ? 'Relay: connected' : 'Relay: connecting…'} •{' '} - {watchStatusText[watchStatus]} + {status === 'connected' + ? 'Relay: connected' + : connectingStep ? 'Connecting — ' + (connectingStepText[connectingStep] ?? connectingStep) : 'Connecting…'}{' '} + • {watchStatusText[watchStatus]} {pipelineNeedsAudio && <> • {micStatusText[micStatus]}} {pipelineNeedsVideo && <> • {cameraStatusText[cameraStatus]}}
From b7cadaab306c8511f7259815c3b39279babe4a33 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sun, 22 Mar 2026 20:14:43 +0000 Subject: [PATCH 2/3] style: format StreamView.tsx with prettier Co-Authored-By: Claudio Costa --- ui/src/views/StreamView.tsx | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ui/src/views/StreamView.tsx b/ui/src/views/StreamView.tsx index 1459d6ce..f7a47106 100644 --- a/ui/src/views/StreamView.tsx +++ b/ui/src/views/StreamView.tsx @@ -860,7 +860,9 @@ const StreamView: React.FC = () => {
{status === 'connected' ? 'Relay: connected' - : connectingStep ? 'Connecting — ' + (connectingStepText[connectingStep] ?? connectingStep) : 'Connecting…'}{' '} + : connectingStep + ? 'Connecting — ' + (connectingStepText[connectingStep] ?? connectingStep) + : 'Connecting…'}{' '} • {watchStatusText[watchStatus]} {pipelineNeedsAudio && <> • {micStatusText[micStatus]}} {pipelineNeedsVideo && <> • {cameraStatusText[cameraStatus]}} From 94828870ff63b9f4109b53e89d08e6f82bebb9db Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sun, 22 Mar 2026 20:22:13 +0000 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20reset?= =?UTF-8?q?=20new=20fields=20in=20beforeEach,=20event-driven=20abort=20in?= =?UTF-8?q?=20waitForBroadcastAnnouncement?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add connectAbort and connectingStep to the beforeEach setState reset to prevent test pollution across runs. - Replace polling abort check in waitForBroadcastAnnouncement with an event-driven abortPromise in Promise.race so abort fires immediately even when announcements.next() is blocked. Co-Authored-By: Claudio Costa --- ui/src/stores/streamStore.test.ts | 2 ++ ui/src/stores/streamStoreHelpers.ts | 22 +++++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/ui/src/stores/streamStore.test.ts b/ui/src/stores/streamStore.test.ts index 8059b1e4..3ea26272 100644 --- a/ui/src/stores/streamStore.test.ts +++ b/ui/src/stores/streamStore.test.ts @@ -98,6 +98,8 @@ describe('streamStore', () => { pipelineOutputsVideo: true, errorMessage: '', configLoaded: false, + connectAbort: null, + connectingStep: '', activeSessionId: null, activeSessionName: null, activePipelineName: null, diff --git a/ui/src/stores/streamStoreHelpers.ts b/ui/src/stores/streamStoreHelpers.ts index c1956144..97244030 100644 --- a/ui/src/stores/streamStoreHelpers.ts +++ b/ui/src/stores/streamStoreHelpers.ts @@ -156,13 +156,29 @@ async function waitForBroadcastAnnouncement( const announcements = conn.announced(); const deadline = Date.now() + timeoutMs; try { + // Build a promise that rejects when the abort signal fires so the + // Promise.race below reacts immediately instead of polling. + const abortPromise = abortSignal + ? new Promise((_, reject) => { + abortSignal.addEventListener( + 'abort', + () => reject(new DOMException('Aborted', 'AbortError')), + { once: true } + ); + }) + : null; + while (Date.now() < deadline) { - if (abortSignal?.aborted) throw new DOMException('Aborted', 'AbortError'); const remaining = deadline - Date.now(); - const entry = await Promise.race([ + const racers: Promise[] = [ announcements.next(), new Promise((r) => setTimeout(() => r(null), remaining)), - ]); + ]; + if (abortPromise) racers.push(abortPromise); + + const entry = (await Promise.race(racers)) as Awaited< + ReturnType + > | null; if (!entry) break; if (entry.active && entry.path.toString() === broadcastName) { logger.info(`Broadcast '${broadcastName}' announced`);