From 1b4435d3d6e6c3a1e998cc2951c0602e382ac7bd Mon Sep 17 00:00:00 2001 From: Ofek Simhi <158498125+osimhi213@users.noreply.github.com> Date: Sun, 25 Jan 2026 14:09:17 +0200 Subject: [PATCH] Merge pull request #286 from de-id/bugfix/latency-metric fix latency metric tracker for expressive --- .../streaming-manager/livekit-manager.test.ts | 98 +++++++++++++++++++ .../streaming-manager/livekit-manager.ts | 10 +- 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts index 7c085541..a9ff9d6f 100644 --- a/src/services/streaming-manager/livekit-manager.test.ts +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -41,6 +41,7 @@ jest.mock('livekit-client', () => ({ MediaDevicesError: 'MediaDevicesError', EncryptionError: 'EncryptionError', TrackSubscriptionFailed: 'TrackSubscriptionFailed', + TranscriptionReceived: 'TranscriptionReceived', }, ConnectionState: { Connecting: 'connecting', @@ -67,6 +68,15 @@ jest.mock('../../api/streams/streamsApiV2', () => ({ jest.mock('../../config/environment', () => ({ didApiUrl: 'http://test-api.com' })); +const mockLatencyTimestampTrackerUpdate = jest.fn(); +jest.mock('../analytics/timestamp-tracker', () => ({ + latencyTimestampTracker: { + reset: jest.fn(), + update: (...args: any[]) => mockLatencyTimestampTrackerUpdate(...args), + get: jest.fn(), + }, +})); + // Test constants const TEST_AGENT_ID = 'agent123'; const TEST_TRACK_SID = 'track-sid-123'; @@ -124,6 +134,11 @@ function getDataReceivedHandler() { return calls.length > 0 ? calls[calls.length - 1][1] : undefined; } +function getTranscriptionReceivedHandler() { + const calls = mockRoom.on.mock.calls.filter((call: any[]) => call[0] === 'TranscriptionReceived'); + return calls.length > 0 ? calls[calls.length - 1][1] : undefined; +} + async function simulateConnection(handlerIndex?: number) { const handler = getConnectionStateHandler(handlerIndex); if (handler) { @@ -139,6 +154,7 @@ describe('LiveKit Streaming Manager - Microphone Stream', () => { beforeEach(() => { jest.clearAllMocks(); + mockLatencyTimestampTrackerUpdate.mockClear(); agentId = TEST_AGENT_ID; sessionOptions = { chat_persist: true, @@ -359,4 +375,86 @@ describe('LiveKit Streaming Manager - Microphone Stream', () => { expect(mockOnAgentActivityStateChange).toHaveBeenNthCalledWith(2, AgentActivityState.Idle); }); }); + + describe('Transcription Interrupt Detection', () => { + let mockOnInterruptDetected: jest.Mock; + let transcriptionHandler: (segments: any[], participant?: any) => void; + let dataHandler: (payload: Uint8Array, participant?: any, kind?: any, topic?: string) => void; + + beforeEach(async () => { + mockOnInterruptDetected = jest.fn(); + options.callbacks.onInterruptDetected = mockOnInterruptDetected; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + transcriptionHandler = getTranscriptionReceivedHandler(); + dataHandler = getDataReceivedHandler(); + }); + + it('should update latency tracker when local participant sends transcription', () => { + const localParticipant = { isLocal: true }; + transcriptionHandler([], localParticipant); + + expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1); + }); + + it('should not update latency tracker when remote participant sends transcription', () => { + const remoteParticipant = { isLocal: false }; + transcriptionHandler([], remoteParticipant); + + expect(mockLatencyTimestampTrackerUpdate).not.toHaveBeenCalled(); + }); + + it('should not update latency tracker when participant is undefined', () => { + transcriptionHandler([]); + + expect(mockLatencyTimestampTrackerUpdate).not.toHaveBeenCalled(); + }); + + it('should detect interrupt and set state to Idle when local participant sends transcription during Talking state', () => { + const localParticipant = { isLocal: true }; + const payload = Buffer.from(JSON.stringify({ subject: StreamEvents.StreamVideoCreated })); + dataHandler(payload); + + transcriptionHandler([], localParticipant); + + expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1); + expect(mockOnInterruptDetected).toHaveBeenCalledTimes(1); + expect(mockOnInterruptDetected).toHaveBeenCalledWith({ type: 'audio' }); + }); + + it('should not detect interrupt when local participant sends transcription during Idle state', () => { + const localParticipant = { isLocal: true }; + transcriptionHandler([], localParticipant); + + expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1); + expect(mockOnInterruptDetected).not.toHaveBeenCalled(); + }); + + it('should not detect interrupt when local participant sends transcription during Loading state', async () => { + const localParticipant = { isLocal: true }; + const chatTranscribedPayload = Buffer.from( + JSON.stringify({ subject: StreamEvents.ChatAudioTranscribed, content: 'test', role: 'user' }) + ); + dataHandler(chatTranscribedPayload); + await new Promise(resolve => setTimeout(resolve, 0)); + + transcriptionHandler([], localParticipant); + + expect(mockLatencyTimestampTrackerUpdate).toHaveBeenCalledTimes(1); + expect(mockOnInterruptDetected).not.toHaveBeenCalled(); + }); + + it('should update latency tracker but not detect interrupt when remote participant sends transcription during Talking state', () => { + const remoteParticipant = { isLocal: false }; + const payload = Buffer.from(JSON.stringify({ subject: StreamEvents.StreamVideoCreated })); + dataHandler(payload); + + transcriptionHandler([], remoteParticipant); + + expect(mockLatencyTimestampTrackerUpdate).not.toHaveBeenCalled(); + expect(mockOnInterruptDetected).not.toHaveBeenCalled(); + }); + }); }); diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index b744dc85..15b4f9b2 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -14,6 +14,7 @@ import { import { ChatProgress } from '@sdk/types/entities/agents/manager'; import { createStreamApiV2 } from '../../api/streams/streamsApiV2'; import { didApiUrl } from '../../config/environment'; +import { latencyTimestampTracker } from '../analytics/timestamp-tracker'; import { createStreamingLogger, StreamingManager } from './common'; import type { @@ -150,9 +151,12 @@ export async function createLiveKitStreamingManager