Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 62 additions & 43 deletions src/services/streaming-manager/livekit-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,28 @@ jest.mock('../../api/streams/streamsApiV2', () => ({

jest.mock('../../config/environment', () => ({ didApiUrl: 'http://test-api.com' }));

// Mock createVideoStatsMonitor
// Mock VideoStatsMonitor
const mockVideoStatsMonitor = {
start: jest.fn(),
stop: jest.fn(),
getReport: jest.fn(() => ({})),
_onVideoStateChange: null as any | null,
invokeStateChange(state: StreamingState, report?: unknown) {
this._onVideoStateChange?.(state, report);
},
};

jest.mock('./stats/poll', () => ({
createVideoStatsMonitor: jest.fn(() => mockVideoStatsMonitor),
createVideoStatsMonitor: jest.fn(
(_getStats: unknown, _getIsConnected: unknown, _onConnected: unknown, onVideoStateChange: any) => {
mockVideoStatsMonitor._onVideoStateChange = onVideoStateChange;
return {
start: mockVideoStatsMonitor.start,
stop: mockVideoStatsMonitor.stop,
getReport: mockVideoStatsMonitor.getReport,
};
}
),
}));

const mockLatencyTimestampTrackerUpdate = jest.fn();
Expand All @@ -99,7 +113,6 @@ const TEST_AGENT_ID = 'agent123';
const TEST_TRACK_SID = 'track-sid-123';
const TEST_AUDIO_TRACK_ID = 'audio-track-1';
const TEST_AUDIO_TRACK_ID_2 = 'audio-track-2';
const TEST_SESSION_ID = 'session-123';
const ASYNC_WAIT_TIME = 10;

// Helper functions to create mock objects
Expand Down Expand Up @@ -539,75 +552,81 @@ describe('LiveKit Streaming Manager - Microphone Stream', () => {
expect(mockVideoStatsMonitor.start).not.toHaveBeenCalled();
});

it('should stop video stats monitor when video track is unsubscribed', async () => {
it('should call getReport and onVideoStateChange with Stop and report when video track is unsubscribed', async () => {
const onVideoStateChange = jest.fn();
const report = { duration: 1000 };
options.callbacks.onVideoStateChange = onVideoStateChange;
mockVideoStatsMonitor.getReport.mockReturnValue(report);

await createLiveKitStreamingManager(agentId, sessionOptions, options);
await simulateConnection();

const trackSubscribedHandler = getTrackSubscribedHandler();
const trackUnsubscribedHandler = getTrackUnsubscribedHandler();
const mockVideoTrack = createMockVideoTrack();
const mockParticipant = createMockRemoteParticipant();
getTrackSubscribedHandler()(mockVideoTrack, {}, createMockRemoteParticipant());
mockVideoStatsMonitor.invokeStateChange(StreamingState.Start);
onVideoStateChange.mockClear();
mockVideoStatsMonitor.getReport.mockClear();

trackSubscribedHandler(mockVideoTrack, {}, mockParticipant);
expect(mockVideoStatsMonitor.start).toHaveBeenCalledTimes(1);
trackUnsubscribedHandler(mockVideoTrack, {}, mockParticipant);
getTrackUnsubscribedHandler()(mockVideoTrack, {}, createMockRemoteParticipant());

expect(mockVideoStatsMonitor.getReport).toHaveBeenCalledTimes(1);
expect(mockVideoStatsMonitor.stop).toHaveBeenCalledTimes(1);
expect(onVideoStateChange).toHaveBeenCalledTimes(1);
expect(onVideoStateChange).toHaveBeenCalledWith(StreamingState.Stop, report);
});

it('should get report from video stats monitor when video track is unsubscribed', async () => {
it('should call onVideoStateChange with Start when videoStatsMonitor callback is invoked with Start', async () => {
const onVideoStateChange = jest.fn();
options.callbacks.onVideoStateChange = onVideoStateChange;

await createLiveKitStreamingManager(agentId, sessionOptions, options);
await simulateConnection();
const mockReport = { duration: 1000 };
mockVideoStatsMonitor.getReport.mockReturnValue(mockReport);
getTrackSubscribedHandler()(createMockVideoTrack(), {}, createMockRemoteParticipant());

const trackSubscribedHandler = getTrackSubscribedHandler();
const trackUnsubscribedHandler = getTrackUnsubscribedHandler();
const mockVideoTrack = createMockVideoTrack();
const mockParticipant = createMockRemoteParticipant();
onVideoStateChange.mockClear();
mockVideoStatsMonitor.invokeStateChange(StreamingState.Start);

trackSubscribedHandler(mockVideoTrack, {}, mockParticipant);
trackUnsubscribedHandler(mockVideoTrack, {}, mockParticipant);

expect(mockVideoStatsMonitor.getReport).toHaveBeenCalledTimes(1);
expect(onVideoStateChange).toHaveBeenCalledTimes(1);
expect(onVideoStateChange).toHaveBeenCalledWith(StreamingState.Start);
});

it('should call onVideoStateChange with Start when video track is subscribed', async () => {
const mockOnVideoStateChange = jest.fn();
options.callbacks.onVideoStateChange = mockOnVideoStateChange;
it('should call onVideoStateChange with Stop and report when videoStatsMonitor callback is invoked with Stop', async () => {
const onVideoStateChange = jest.fn();
const report = { duration: 1000 };
options.callbacks.onVideoStateChange = onVideoStateChange;

await createLiveKitStreamingManager(agentId, sessionOptions, options);
await simulateConnection();
getTrackSubscribedHandler()(createMockVideoTrack(), {}, createMockRemoteParticipant());
mockVideoStatsMonitor.invokeStateChange(StreamingState.Start);

const trackSubscribedHandler = getTrackSubscribedHandler();
const mockVideoTrack = createMockVideoTrack();
const mockParticipant = createMockRemoteParticipant();

trackSubscribedHandler(mockVideoTrack, {}, mockParticipant);
onVideoStateChange.mockClear();
mockVideoStatsMonitor.invokeStateChange(StreamingState.Stop, report);

expect(mockOnVideoStateChange).toHaveBeenCalledWith(StreamingState.Start);
expect(mockVideoStatsMonitor.getReport).not.toHaveBeenCalled();
expect(onVideoStateChange).toHaveBeenCalledTimes(1);
expect(onVideoStateChange).toHaveBeenCalledWith(StreamingState.Stop, report);
});

it('should call onVideoStateChange with Stop and report when video track is unsubscribed', async () => {
const mockOnVideoStateChange = jest.fn();
options.callbacks.onVideoStateChange = mockOnVideoStateChange;
const mockReport = { duration: 1000 };
mockVideoStatsMonitor.getReport.mockReturnValue(mockReport);
it('should not call onVideoStateChange(Stop) twice when monitor Stop and track unsubscribed both occur', async () => {
const onVideoStateChange = jest.fn();
const report = { duration: 1000 };
options.callbacks.onVideoStateChange = onVideoStateChange;
mockVideoStatsMonitor.getReport.mockReturnValue(report);

await createLiveKitStreamingManager(agentId, sessionOptions, options);
await simulateConnection();

const trackSubscribedHandler = getTrackSubscribedHandler();
const trackUnsubscribedHandler = getTrackUnsubscribedHandler();
const mockVideoTrack = createMockVideoTrack();
const mockParticipant = createMockRemoteParticipant();

trackSubscribedHandler(mockVideoTrack, {}, mockParticipant);
mockOnVideoStateChange.mockClear();
getTrackSubscribedHandler()(mockVideoTrack, {}, createMockRemoteParticipant());
mockVideoStatsMonitor.invokeStateChange(StreamingState.Start);
onVideoStateChange.mockClear();

trackUnsubscribedHandler(mockVideoTrack, {}, mockParticipant);
mockVideoStatsMonitor.invokeStateChange(StreamingState.Stop, report);
getTrackUnsubscribedHandler()(mockVideoTrack, {}, createMockRemoteParticipant());

expect(mockOnVideoStateChange).toHaveBeenCalledWith(StreamingState.Stop, mockReport);
expect(onVideoStateChange).toHaveBeenCalledTimes(1);
expect(onVideoStateChange).toHaveBeenCalledWith(StreamingState.Stop, report);
});
});

Expand Down
35 changes: 29 additions & 6 deletions src/services/streaming-manager/livekit-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import type {
TranscriptionSegment,
} from 'livekit-client';
import { createVideoStatsMonitor } from './stats/poll';
import { VideoRTCStatsReport } from './stats/report';

async function importLiveKit(): Promise<{
Room: typeof Room;
Expand Down Expand Up @@ -99,6 +100,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
let sharedMediaStream: MediaStream | null = null;
let microphonePublication: LocalTrackPublication | null = null;
let videoStatsMonitor: ReturnType<typeof createVideoStatsMonitor> | null = null;
let videoStreamingState: StreamingState | null = null;
// We defer Connected until video track is subscribed to align with WebRTC behavior
let hasEmittedConnected = false;

Expand Down Expand Up @@ -227,6 +229,26 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
disconnect('livekit:participant-disconnected');
}

function handleVideoStarted() {
if (videoStreamingState === StreamingState.Start) {
return;
}

log('CALLBACK: onVideoStateChange(Start)');
videoStreamingState = StreamingState.Start;
callbacks.onVideoStateChange?.(StreamingState.Start);
}

function handleVideoStopped(report?: VideoRTCStatsReport) {
if (videoStreamingState === StreamingState.Stop) {
return;
}

log('CALLBACK: onVideoStateChange(Stop)');
videoStreamingState = StreamingState.Stop;
callbacks.onVideoStateChange?.(StreamingState.Stop, report);
}

function handleTrackSubscribed(track: RemoteTrack, publication: any, participant: RemoteParticipant): void {
log(`Track subscribed: ${track.kind} from ${participant.identity}`);

Expand Down Expand Up @@ -260,14 +282,17 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
log('CALLBACK: onConnectionStateChange(Connected)');
callbacks.onConnectionStateChange?.(ConnectionState.Connected, 'livekit:track-subscribed');
}
log('CALLBACK: onVideoStateChange(Start)');
callbacks.onVideoStateChange?.(StreamingState.Start);
videoStatsMonitor = createVideoStatsMonitor(
() => track.getRTCStatsReport(),
() => isConnected,
noop,
(state, _report) => {
(state, report) => {
log(`Video state change: ${state}`);
if (state === StreamingState.Start) {
handleVideoStarted();
} else if (state === StreamingState.Stop) {
handleVideoStopped(report);
}
}
);
videoStatsMonitor.start();
Expand All @@ -278,11 +303,9 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
log(`Track unsubscribed: ${track.kind} from ${participant.identity}`);

if (track.kind === 'video') {
const report = videoStatsMonitor?.getReport();
handleVideoStopped(videoStatsMonitor?.getReport());
videoStatsMonitor?.stop();
videoStatsMonitor = null;

callbacks.onVideoStateChange?.(StreamingState.Stop, report);
}
}

Expand Down
Loading