Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions ui/src/stores/streamStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ describe('streamStore', () => {
pipelineOutputsVideo: true,
errorMessage: '',
configLoaded: false,
connectAbort: null,
connectingStep: '',
activeSessionId: null,
activeSessionName: null,
activePipelineName: null,
Expand Down Expand Up @@ -149,6 +151,12 @@ describe('streamStore', () => {
expect(state.activePipelineName).toBeNull();
});

it('should have null connectAbort and empty connectingStep initially', () => {
const state = useStreamStore.getState();
expect(state.connectAbort).toBeNull();
expect(state.connectingStep).toBe('');
});

it('should have null MoQ references initially', () => {
const state = useStreamStore.getState();
expect(state.publish).toBeNull();
Expand Down Expand Up @@ -497,6 +505,34 @@ describe('streamStore', () => {

expect(useStreamStore.getState().status).toBe('disconnected');
});

it('should abort in-flight connect attempt on disconnect', () => {
const abort = new AbortController();
const abortSpy = vi.spyOn(abort, 'abort');
useStreamStore.setState({
status: 'connecting',
connectAbort: abort,
});

useStreamStore.getState().disconnect();

expect(abortSpy).toHaveBeenCalled();
const state = useStreamStore.getState();
expect(state.status).toBe('disconnected');
expect(state.connectAbort).toBeNull();
expect(state.connectingStep).toBe('');
});

it('should clear connectingStep on disconnect', () => {
useStreamStore.setState({
status: 'connecting',
connectingStep: 'devices',
});

useStreamStore.getState().disconnect();

expect(useStreamStore.getState().connectingStep).toBe('');
});
});

describe('connection state machine', () => {
Expand Down
23 changes: 22 additions & 1 deletion ui/src/stores/streamStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ interface StreamState {
activeSessionName: string | null;
activePipelineName: string | null;

// Connect lifecycle — abort controller for the in-flight performConnect call
connectAbort: AbortController | null;
/** Human-readable label for the current phase of a connect attempt. */
connectingStep: string;

// MoQ references (stored but not serialized)
publish: Publish.Broadcast | null;
watch: Watch.Broadcast | null;
Expand Down Expand Up @@ -148,6 +153,8 @@ export const useStreamStore = create<StreamState>((set, get) => ({
isExternalRelay: false,
errorMessage: '',
configLoaded: false,
connectAbort: null,
connectingStep: '',

// Active session state
activeSessionId: null,
Expand Down Expand Up @@ -235,25 +242,39 @@ export const useStreamStore = create<StreamState>((set, get) => ({
return false;
}

// Abort any in-flight connect attempt so it doesn't later overwrite
// our state. This is defensive — normally disconnect() already aborted,
// but a rapid connect→connect without disconnect can still race.
state.connectAbort?.abort();

const abort = new AbortController();
set({
connectAbort: abort,
connectingStep: '',
status: 'connecting',
errorMessage: '',
watchStatus: decision.shouldWatch ? 'loading' : 'disabled',
micStatus: decision.shouldPublish && state.pipelineNeedsAudio ? 'requesting' : 'disabled',
cameraStatus: decision.shouldPublish && state.pipelineNeedsVideo ? 'requesting' : 'disabled',
});

return performConnect(state, decision, get, set);
return performConnect(state, decision, get, set, abort.signal);
},

disconnect: () => {
const state = get();

// Cancel any in-flight connect attempt so its catch/success path
// doesn't later overwrite the freshly-reset store.
state.connectAbort?.abort();

// Reuse the same teardown logic used when a connect attempt fails.
cleanupConnectAttempt(state);

set({
status: 'disconnected',
connectAbort: null,
connectingStep: '',
isMicEnabled: false,
micStatus: 'disabled',
isCameraEnabled: false,
Expand Down
47 changes: 47 additions & 0 deletions ui/src/stores/streamStoreHelpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,53 @@ describe('waitForSignalValue', () => {

vi.useRealTimers();
});

it('should reject immediately when abortSignal is already aborted', async () => {
const signal = createMockSignal(0);
const abort = new AbortController();
abort.abort();

await expect(
waitForSignalValue(signal, (v) => v > 0, 5_000, 'timeout', abort.signal)
).rejects.toThrow('Aborted');
});

it('should reject with AbortError when abortSignal fires during wait', async () => {
const signal = createMockSignal(0);
const abort = new AbortController();

const promise = waitForSignalValue(signal, (v) => v > 0, 5_000, 'timeout', abort.signal);

// Abort before the signal value changes
abort.abort();

await expect(promise).rejects.toThrow('Aborted');
});

it('should not reject on abort if predicate already matched', async () => {
const signal = createMockSignal(42);
const abort = new AbortController();

// Predicate matches initial value — resolves synchronously before abort
const value = await waitForSignalValue(signal, (v) => v === 42, 5_000, 'timeout', abort.signal);
expect(value).toBe(42);

// Aborting after resolution should be harmless
abort.abort();
});

it('should clean up subscription when abortSignal fires', async () => {
const signal = createMockSignal<number>(0);
const abort = new AbortController();

const promise = waitForSignalValue(signal, (v) => v > 100, 5_000, 'timeout', abort.signal);
abort.abort();

await expect(promise).rejects.toThrow('Aborted');

// After abort, emitting new values should be harmless (no dangling listeners)
expect(() => signal.set(200)).not.toThrow();
});
});

// ---------------------------------------------------------------------------
Expand Down
Loading