Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions src/services/streaming-manager/livekit-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jest.mock('livekit-client', () => ({
MediaDevicesError: 'MediaDevicesError',
EncryptionError: 'EncryptionError',
TrackSubscriptionFailed: 'TrackSubscriptionFailed',
TranscriptionReceived: 'TranscriptionReceived',
},
ConnectionState: {
Connecting: 'connecting',
Expand All @@ -67,6 +68,15 @@ jest.mock('../../api/streams/streamsApiV2', () => ({

// Replace the environment config module so it exposes a fixed didApiUrl for these tests.
jest.mock('../../config/environment', () => ({ didApiUrl: 'http://test-api.com' }));

// Module-scope spy that stands in for latencyTimestampTracker.update, so tests can
// assert on how often the code under test refreshes the latency timestamp.
const mockLatencyTimestampTrackerUpdate = jest.fn();
// Replace the timestamp-tracker module with a stub exposing reset/update/get.
jest.mock('../analytics/timestamp-tracker', () => ({
latencyTimestampTracker: {
reset: jest.fn(),
// Forward through an arrow function so the spy is looked up when update() is
// called, not when this factory runs.
update: (...args: any[]) => mockLatencyTimestampTrackerUpdate(...args),
get: jest.fn(),
},
}));

// Test constants (TEST_AGENT_ID seeds agentId in the suite's beforeEach)
const TEST_AGENT_ID = 'agent123';
const TEST_TRACK_SID = 'track-sid-123';
Expand Down Expand Up @@ -124,6 +134,11 @@ function getDataReceivedHandler() {
return calls.length > 0 ? calls[calls.length - 1][1] : undefined;
}

/**
 * Looks up the listener most recently registered on the mocked room for the
 * 'TranscriptionReceived' event.
 *
 * @returns The second argument of the last matching `mockRoom.on` call, or
 * `undefined` when no such registration has been recorded.
 */
function getTranscriptionReceivedHandler() {
  let latestHandler: any;
  // Walk every recorded registration; later matches overwrite earlier ones,
  // so the value left at the end belongs to the last matching call.
  for (const registration of mockRoom.on.mock.calls) {
    if (registration[0] === 'TranscriptionReceived') {
      latestHandler = registration[1];
    }
  }
  return latestHandler;
}

async function simulateConnection(handlerIndex?: number) {
const handler = getConnectionStateHandler(handlerIndex);
if (handler) {
Expand All @@ -139,6 +154,7 @@ describe('LiveKit Streaming Manager - Microphone Stream', () => {

beforeEach(() => {
jest.clearAllMocks();
mockLatencyTimestampTrackerUpdate.mockClear();
agentId = TEST_AGENT_ID;
sessionOptions = {
chat_persist: true,
Expand Down Expand Up @@ -359,4 +375,86 @@ describe('LiveKit Streaming Manager - Microphone Stream', () => {
expect(mockOnAgentActivityStateChange).toHaveBeenNthCalledWith(2, AgentActivityState.Idle);
});
});

// Exercises the room's TranscriptionReceived handler: only transcriptions from the
// local participant refresh the latency tracker, and only while the agent is in the
// Talking state do they raise an audio interrupt.
describe('Transcription Interrupt Detection', () => {
  const localParticipant = { isLocal: true };
  const remoteParticipant = { isLocal: false };

  let mockOnInterruptDetected: jest.Mock;
  let transcriptionHandler: (segments: any[], participant?: any) => void;
  let dataHandler: (payload: Uint8Array, participant?: any, kind?: any, topic?: string) => void;

  // Serializes a data-channel message the same way for every test.
  const encodeMessage = (message: Record<string, unknown>) => Buffer.from(JSON.stringify(message));

  beforeEach(async () => {
    mockOnInterruptDetected = jest.fn();
    options.callbacks.onInterruptDetected = mockOnInterruptDetected;

    await createLiveKitStreamingManager(agentId, sessionOptions, options);
    await simulateConnection();

    transcriptionHandler = getTranscriptionReceivedHandler();
    dataHandler = getDataReceivedHandler();

    // Fail here with a readable message if the manager never registered the
    // listeners, instead of a "not a function" TypeError inside each test.
    expect(transcriptionHandler).toBeDefined();
    expect(dataHandler).toBeDefined();
  });

  it('should update latency tracker when local participant sends transcription', () => {
    transcriptionHandler([], localParticipant);

    expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1);
  });

  it('should not update latency tracker when remote participant sends transcription', () => {
    transcriptionHandler([], remoteParticipant);

    expect(mockLatencyTimestampTrackerUpdate).not.toHaveBeenCalled();
  });

  it('should not update latency tracker when participant is undefined', () => {
    transcriptionHandler([]);

    expect(mockLatencyTimestampTrackerUpdate).not.toHaveBeenCalled();
  });

  it('should detect interrupt and set state to Idle when local participant sends transcription during Talking state', () => {
    // A stream-video-created message is what puts the agent into the Talking state.
    dataHandler(encodeMessage({ subject: StreamEvents.StreamVideoCreated }));

    transcriptionHandler([], localParticipant);

    expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1);
    expect(mockOnInterruptDetected).toHaveBeenCalledTimes(1);
    expect(mockOnInterruptDetected).toHaveBeenCalledWith({ type: 'audio' });

    // The interrupt resets the tracked state to Idle, so a further local
    // transcription still refreshes the tracker but raises no second interrupt.
    transcriptionHandler([], localParticipant);

    expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(2);
    expect(mockOnInterruptDetected).toHaveBeenCalledTimes(1);
  });

  it('should not detect interrupt when local participant sends transcription during Idle state', () => {
    transcriptionHandler([], localParticipant);

    expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1);
    expect(mockOnInterruptDetected).not.toHaveBeenCalled();
  });

  it('should not detect interrupt when local participant sends transcription during Loading state', async () => {
    dataHandler(encodeMessage({ subject: StreamEvents.ChatAudioTranscribed, content: 'test', role: 'user' }));
    // Yield one macrotask so any asynchronous handling of the message settles first.
    await new Promise(resolve => setTimeout(resolve, 0));

    transcriptionHandler([], localParticipant);

    expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1);
    expect(mockOnInterruptDetected).not.toHaveBeenCalled();
  });

  it('should not update latency tracker or detect interrupt when remote participant sends transcription during Talking state', () => {
    dataHandler(encodeMessage({ subject: StreamEvents.StreamVideoCreated }));

    transcriptionHandler([], remoteParticipant);

    expect(mockLatencyTimestampTrackerUpdate).not.toHaveBeenCalled();
    expect(mockOnInterruptDetected).not.toHaveBeenCalled();
  });
});
});
10 changes: 7 additions & 3 deletions src/services/streaming-manager/livekit-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
import { ChatProgress } from '@sdk/types/entities/agents/manager';
import { createStreamApiV2 } from '../../api/streams/streamsApiV2';
import { didApiUrl } from '../../config/environment';
import { latencyTimestampTracker } from '../analytics/timestamp-tracker';
import { createStreamingLogger, StreamingManager } from './common';

import type {
Expand Down Expand Up @@ -150,9 +151,12 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
callbacks.onConnectionStateChange?.(ConnectionState.New);

/**
 * Handles a transcription event emitted by the room.
 *
 * Events that are not attributed to the local participant are ignored. For a
 * local transcription the latency timestamp tracker is refreshed; if the agent
 * is currently in the Talking state, the interrupt callback is also invoked
 * with `{ type: 'audio' }` and the tracked activity state is reset to Idle, so
 * later local transcriptions do not fire the callback again until the state
 * returns to Talking.
 *
 * @param _segments - Transcription segments delivered with the event; unused.
 * @param participant - Participant the transcription belongs to. When it is
 * missing or not local, the event is ignored.
 */
function handleTranscriptionReceived(_segments: TranscriptionSegment[], participant?: Participant): void {
  if (!participant?.isLocal) {
    return;
  }

  // Every local transcription refreshes the latency timestamp, regardless of agent state.
  latencyTimestampTracker.update();

  if (currentActivityState === AgentActivityState.Talking) {
    callbacks.onInterruptDetected?.({ type: 'audio' });
    currentActivityState = AgentActivityState.Idle;
  }
}
try {
Expand Down