From c601b2c52b46283dc4c02b36086fca861064b8dc Mon Sep 17 00:00:00 2001 From: dariusz-did Date: Thu, 19 Mar 2026 13:42:34 +0100 Subject: [PATCH 1/3] =?UTF-8?q?feature:=20SDK=20=E2=80=94=20ToolActive=20s?= =?UTF-8?q?tate=20+=20interruptible=20handling=20(#340)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature: SDK — ToolActive state + interruptible handling * restored onInterruptibleChange callback * "success" removed * added guard for isInterruptible --- src/services/agent-manager/index.ts | 2 + src/services/streaming-manager/common.ts | 5 + .../data-channel-handlers.ts | 7 + .../streaming-manager/livekit-manager.test.ts | 333 ++++++++++++++++++ .../streaming-manager/livekit-manager.ts | 134 ++++--- .../streaming-manager/webrtc-manager.ts | 1 + .../factories/streaming-manager.factory.ts | 1 + src/types/entities/agents/manager.ts | 16 + src/types/stream/stream.ts | 24 ++ 9 files changed, 476 insertions(+), 47 deletions(-) create mode 100644 src/services/streaming-manager/data-channel-handlers.ts diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index 33dd565e..7fdd0fe0 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -103,6 +103,8 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt }; const interrupt = ({ type }: Interrupt) => { + if (!items.streamingManager?.isInterruptible) return; + const lastMessage = items.messages[items.messages.length - 1]; analytics.track('agent-video-interrupt', { diff --git a/src/services/streaming-manager/common.ts b/src/services/streaming-manager/common.ts index ce6d7401..63295562 100644 --- a/src/services/streaming-manager/common.ts +++ b/src/services/streaming-manager/common.ts @@ -81,6 +81,11 @@ export type StreamingManager = { + [StreamEvents.ChatAnswer]: ChatProgress.Answer, + [StreamEvents.ChatPartial]: ChatProgress.Partial, +}; diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts index 96c9a69c..88abeb1d 100644 --- a/src/services/streaming-manager/livekit-manager.test.ts +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -194,6 +194,9 @@ function getConnectionStateHandler(index?: number) { return calls.length > 0 ? calls[calls.length - 1][1] : undefined; } +function createDataChannelPayload(data: any): Uint8Array { + return Buffer.from(JSON.stringify(data)); +} function getDataReceivedHandler() { const calls = mockRoom.on.mock.calls.filter((call: any[]) => call[0] === 'DataReceived'); return calls.length > 0 ? calls[calls.length - 1][1] : undefined; @@ -1102,3 +1105,333 @@ describe('LiveKit Streaming Manager - Disconnect Behavior', () => { expect(mockUnpublishTrack).not.toHaveBeenCalled(); }); }); + +describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { + let agentId: string; + let sessionOptions: CreateSessionV2Options; + let options: StreamingManagerOptions; + + beforeEach(() => { + jest.clearAllMocks(); + mockRoom.connect.mockResolvedValue(undefined); + mockRoom.prepareConnection.mockResolvedValue(undefined); + mockRoom.disconnect.mockResolvedValue(undefined); + mockRoom.on.mockReturnThis(); + mockLocalParticipant.audioTrackPublications = new Map(); + mockLocalParticipant.videoTrackPublications = new Map(); + agentId = TEST_AGENT_ID; + sessionOptions = { + chat_persist: true, + transport_provider: 'livekit' as any, + }; + options = StreamingManagerOptionsFactory.build(); + }); + + describe('Enum values', () => { + it('should have correct StreamEvents enum values for tool events', () => { + // ASSERT: + expect(StreamEvents.ToolCalling).toBe('tool/calling'); + expect(StreamEvents.ToolResult).toBe('tool/result'); + }); + + it('should have correct AgentActivityState enum value for ToolActive', () => { + // ASSERT: + expect(AgentActivityState.ToolActive).toBe('tool_active'); + }); + }); + + describe('handleDataReceived - tool/calling', () => { + it('should transition to ToolActive and call onToolEvent on tool/calling', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + const onToolEvent = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + options.callbacks.onToolEvent = onToolEvent; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const dataHandler = getDataReceivedHandler(); + const payload = createDataChannelPayload({ + subject: StreamEvents.ToolCalling, + execution_id: 'exec-123', + tool_name: 'get_weather', + arguments: { location: 'Tel Aviv' }, + created_at: new Date().toISOString(), + }); + + // ACT: + dataHandler(payload); + + // ASSERT: + expect(onAgentActivityStateChange).toHaveBeenCalledWith(AgentActivityState.ToolActive); + expect(onToolEvent).toHaveBeenCalledWith( + StreamEvents.ToolCalling, + expect.objectContaining({ + execution_id: 'exec-123', + tool_name: 'get_weather', + }) + ); + }); + }); + + describe('handleDataReceived - tool/result', () => { + it('should call onToolEvent but not change state on tool/result', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + const onToolEvent = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + options.callbacks.onToolEvent = onToolEvent; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const dataHandler = getDataReceivedHandler(); + + // First trigger tool/calling to set ToolActive + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolCalling, + execution_id: 'exec-123', + tool_name: 'get_weather', + arguments: {}, + created_at: new Date().toISOString(), + }) + ); + onAgentActivityStateChange.mockClear(); + + const toolResultPayload = createDataChannelPayload({ + subject: StreamEvents.ToolResult, + execution_id: 'exec-123', + tool_name: 'get_weather', + success: true, + duration_ms: 500, + error_message: null, + created_at: new Date().toISOString(), + }); + + // ACT: + dataHandler(toolResultPayload); + + // ASSERT: + expect(onAgentActivityStateChange).not.toHaveBeenCalled(); + expect(onToolEvent).toHaveBeenCalledWith( + StreamEvents.ToolResult, + expect.objectContaining({ + execution_id: 'exec-123', + success: true, + }) + ); + }); + }); + + describe('handleDataReceived - stream-video/done with interruptible', () => { + it('should transition to Idle on stream-video/done when interruptible is true', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const dataHandler = getDataReceivedHandler(); + + // Set ToolActive state first + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolCalling, + execution_id: 'exec-123', + tool_name: 'test', + arguments: {}, + created_at: new Date().toISOString(), + }) + ); + onAgentActivityStateChange.mockClear(); + + const streamVideoDonePayload = createDataChannelPayload({ + subject: StreamEvents.StreamVideoDone, + metadata: { interruptible: true }, + }); + + // ACT: + dataHandler(streamVideoDonePayload); + + // ASSERT: + expect(onAgentActivityStateChange).toHaveBeenCalledWith(AgentActivityState.Idle); + }); + + it('should transition to Idle on stream-video/done when interruptible is absent (default true)', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const dataHandler = getDataReceivedHandler(); + + // Set ToolActive state first + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolCalling, + execution_id: 'exec-123', + tool_name: 'test', + arguments: {}, + created_at: new Date().toISOString(), + }) + ); + onAgentActivityStateChange.mockClear(); + + const streamVideoDonePayload = createDataChannelPayload({ + subject: StreamEvents.StreamVideoDone, + }); + + // ACT: + dataHandler(streamVideoDonePayload); + + // ASSERT: + expect(onAgentActivityStateChange).toHaveBeenCalledWith(AgentActivityState.Idle); + }); + + it('should stay in ToolActive on stream-video/done when interruptible is false', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const dataHandler = getDataReceivedHandler(); + + // Set ToolActive state first + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolCalling, + execution_id: 'exec-123', + tool_name: 'test', + arguments: {}, + created_at: new Date().toISOString(), + }) + ); + onAgentActivityStateChange.mockClear(); + + const streamVideoDonePayload = createDataChannelPayload({ + subject: StreamEvents.StreamVideoDone, + metadata: { interruptible: false }, + }); + + // ACT: + dataHandler(streamVideoDonePayload); + + // ASSERT: + expect(onAgentActivityStateChange).not.toHaveBeenCalled(); + }); + }); + + describe('Chained tools', () => { + it('should stay ToolActive across multiple tool calls until final stream-video/done', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + const onToolEvent = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + options.callbacks.onToolEvent = onToolEvent; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + const dataHandler = getDataReceivedHandler(); + + // ACT: + // First tool cycle + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolCalling, + execution_id: 'exec-1', + tool_name: 'tool1', + arguments: {}, + created_at: new Date().toISOString(), + }) + ); + + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolResult, + execution_id: 'exec-1', + tool_name: 'tool1', + success: true, + duration_ms: 100, + created_at: new Date().toISOString(), + }) + ); + + // interruptible: false = more tools coming + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.StreamVideoDone, + metadata: { interruptible: false }, + }) + ); + + // Second tool cycle + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolCalling, + execution_id: 'exec-2', + tool_name: 'tool2', + arguments: {}, + created_at: new Date().toISOString(), + }) + ); + + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolResult, + execution_id: 'exec-2', + tool_name: 'tool2', + success: true, + duration_ms: 200, + created_at: new Date().toISOString(), + }) + ); + + // interruptible: true = tool chain complete + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.StreamVideoDone, + metadata: { interruptible: true }, + }) + ); + + // ASSERT: + expect(onAgentActivityStateChange).toHaveBeenCalledWith(AgentActivityState.ToolActive); + expect(onAgentActivityStateChange).toHaveBeenLastCalledWith(AgentActivityState.Idle); + expect(onToolEvent).toHaveBeenCalledTimes(4); + }); + }); + + describe('No regression - sessions without tools', () => { + it('should handle stream-video/done without metadata (backwards compatible)', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + const onMessage = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + options.callbacks.onMessage = onMessage; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const dataHandler = getDataReceivedHandler(); + + // Regular session without any tool events + const payload = createDataChannelPayload({ + subject: StreamEvents.StreamVideoDone, + }); + + // ACT: + dataHandler(payload); + + // ASSERT: + expect(onAgentActivityStateChange).toHaveBeenCalledWith(AgentActivityState.Idle); + expect(onMessage).toHaveBeenCalled(); + }); + }); +}); diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index 3de5e23a..468c50b9 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -10,6 +10,8 @@ import { StreamingManagerOptions, StreamingState, StreamType, + ToolCallingPayload, + ToolResultPayload, TransportProvider, } from '@sdk/types'; import { ChatProgress } from '@sdk/types/entities/agents/manager'; @@ -18,6 +20,7 @@ import { createStreamApiV2 } from '../../api/streams/streamsApiV2'; import { didApiUrl } from '../../config/environment'; import { latencyTimestampTracker } from '../analytics/timestamp-tracker'; import { createStreamingLogger, StreamingManager } from './common'; +import { chatEventMap } from './data-channel-handlers'; import type { ConnectionQuality, @@ -119,6 +122,7 @@ export async function createLiveKitStreamingManager | null = null; let currentActivityState: AgentActivityState = AgentActivityState.Idle; + let currentInterruptible = true; const streamApi = createStreamApiV2(auth, baseURL || didApiUrl, agentId, callbacks.onError); let sessionId: string | undefined; @@ -319,6 +323,84 @@ export async function createLiveKitStreamingManager sets ToolActive + * - stream-video/done with interruptible: true -> sets Idle + * - stream-video/done with interruptible: false -> stays ToolActive (more tools coming) + */ + function handleToolEvents(subject: string, data: any): void { + if (subject === StreamEvents.ToolCalling) { + currentActivityState = AgentActivityState.ToolActive; + callbacks.onAgentActivityStateChange?.(AgentActivityState.ToolActive); + callbacks.onToolEvent?.(StreamEvents.ToolCalling, data as ToolCallingPayload); + return; + } + + if (subject === StreamEvents.ToolResult) { + callbacks.onToolEvent?.(StreamEvents.ToolResult, data as ToolResultPayload); + } + } + + function handleVideoActivityState(subject: string, data: any): void { + currentInterruptible = data.metadata?.interruptible !== false; + callbacks.onInterruptibleChange?.(currentInterruptible); + + if (subject === StreamEvents.StreamVideoCreated) { + currentActivityState = AgentActivityState.Talking; + callbacks.onAgentActivityStateChange?.(AgentActivityState.Talking); + return; + } + + if (currentInterruptible) { + currentActivityState = AgentActivityState.Idle; + callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle); + } + } + + function handleVideoEvents(subject: string, data: any): void { + const rtt = videoStatsMonitor?.getReport()?.webRTCStats?.avgRtt ?? 0; + const downstreamNetworkLatency = rtt > 0 ? Math.round((rtt / 2) * 1000) : 0; + const messageData: VideoMessageData = { ...data, downstreamNetworkLatency }; + + if (options.debug && data?.metadata?.sentiment) { + messageData.sentiment = { + id: data.metadata.sentiment.id, + name: data.metadata.sentiment.sentiment, + }; + } + + callbacks.onMessage?.(subject as StreamEvents, messageData); + handleVideoActivityState(subject, data); + } + + function handleTranscriptionEvents(_: string, data: any): void { + callbacks.onMessage?.(ChatProgress.Transcribe, { event: ChatProgress.Transcribe, ...data }); + queueMicrotask(() => { + callbacks.onAgentActivityStateChange?.(AgentActivityState.Loading); + }); + } + + type DataChannelHandler = (subject: string, data: any) => void; + const dataChannelHandlers: Record = { + [StreamEvents.ChatAnswer]: handleChatEvents, + [StreamEvents.ChatPartial]: handleChatEvents, + [StreamEvents.ToolCalling]: handleToolEvents, + [StreamEvents.ToolResult]: handleToolEvents, + [StreamEvents.StreamVideoCreated]: handleVideoEvents, + [StreamEvents.StreamVideoDone]: handleVideoEvents, + [StreamEvents.StreamVideoError]: handleVideoEvents, + [StreamEvents.StreamVideoRejected]: handleVideoEvents, + [StreamEvents.ChatAudioTranscribed]: handleTranscriptionEvents, + }; + function handleDataReceived( payload: Uint8Array, participant?: RemoteParticipant, @@ -330,56 +412,13 @@ export async function createLiveKitStreamingManager 0 ? Math.round((rtt / 2) * 1000) : 0; - const messageData: VideoMessageData = { ...data, downstreamNetworkLatency }; - - if (options.debug && data?.metadata?.sentiment) { - messageData.sentiment = { - id: data.metadata.sentiment.id, - name: data.metadata.sentiment.sentiment, - }; - } + if (!subject) return; - callbacks.onMessage?.(subject, messageData); - } else if (subject === StreamEvents.ChatAudioTranscribed) { - const eventName = ChatProgress.Transcribe; - callbacks.onMessage?.(eventName, { - event: eventName, - ...data, - }); - // Set loading state after transcribed message is processed (similar to v1) - // Use queueMicrotask to ensure message is added before setting loading state - queueMicrotask(() => { - callbacks.onAgentActivityStateChange?.(AgentActivityState.Loading); - }); - } + const handler = dataChannelHandlers[subject]; + handler?.(subject, data); } catch (e) { log('Failed to parse data channel message:', e); } @@ -654,6 +693,7 @@ export async function createLiveKitStreamingManager jest.fn().mockResolvedValue({ status: 'success', duration: 5000, video_id: 'video-123' }), disconnect: () => jest.fn().mockResolvedValue(undefined), sendDataChannelMessage: () => jest.fn(), diff --git a/src/types/entities/agents/manager.ts b/src/types/entities/agents/manager.ts index d00916c8..23c8675a 100644 --- a/src/types/entities/agents/manager.ts +++ b/src/types/entities/agents/manager.ts @@ -9,6 +9,8 @@ import { StreamEvents, StreamType, StreamingState, + ToolCallingPayload, + ToolResultPayload, } from '@sdk/types/stream'; import { SupportedStreamScript } from '@sdk/types/stream-script'; import type { ManagerCallbacks as StreamManagerCallbacks } from '../../stream/stream'; @@ -103,6 +105,20 @@ interface ManagerCallbacks { * @param stream - object containing stream_id, session_id and agent_id */ onStreamCreated?: StreamManagerCallbacks['onStreamCreated']; + /** + * Optional callback function that will be triggered when tool events occur during the call + * @param event - The tool event type (tool/calling or tool/result) + * @param data - The tool event payload + */ + onToolEvent?: ( + event: StreamEvents.ToolCalling | StreamEvents.ToolResult, + data: ToolCallingPayload | ToolResultPayload + ) => void; + /** + * Optional callback function that will be triggered when the interruptible state changes + * @param interruptible - Whether the agent can be interrupted by the user + */ + onInterruptibleChange?: StreamManagerCallbacks['onInterruptibleChange']; } interface StreamOptions { diff --git a/src/types/stream/stream.ts b/src/types/stream/stream.ts index 6ae1bc18..c7f8176e 100644 --- a/src/types/stream/stream.ts +++ b/src/types/stream/stream.ts @@ -23,6 +23,7 @@ export enum AgentActivityState { Idle = 'IDLE', Loading = 'LOADING', Talking = 'TALKING', + ToolActive = 'tool_active', } export enum StreamEvents { @@ -39,6 +40,8 @@ export enum StreamEvents { StreamVideoDone = 'stream-video/done', StreamVideoError = 'stream-video/error', StreamVideoRejected = 'stream-video/rejected', + ToolCalling = 'tool/calling', + ToolResult = 'tool/result', } export enum ConnectionState { @@ -69,6 +72,11 @@ export interface ManagerCallbacks { onStreamCreated?: (stream: { stream_id: string; session_id: string; agent_id: string }) => void; onStreamReady?: () => void; onInterruptDetected?: (interrupt: Interrupt) => void; + onToolEvent?: ( + event: StreamEvents.ToolCalling | StreamEvents.ToolResult, + data: ToolCallingPayload | ToolResultPayload + ) => void; + onInterruptibleChange?: (interruptible: boolean) => void; } export type ManagerCallbackKeys = keyof ManagerCallbacks; @@ -179,3 +187,19 @@ export interface StreamInterruptPayload { videoId: string; timestamp: number; } + +export interface ToolCallingPayload { + execution_id: string; + tool_name: string; + arguments: Record; + created_at: string; +} + +export interface ToolResultPayload { + execution_id: string; + tool_name: string; + duration_ms: number; + result?: unknown; + error_message?: string | null; + created_at: string; +} From 4397133c3725604e0283bc3e32546d11272a46cc Mon Sep 17 00:00:00 2001 From: dariusz-did Date: Mon, 23 Mar 2026 13:22:22 +0100 Subject: [PATCH 2/3] enum case (#342) --- src/services/streaming-manager/livekit-manager.test.ts | 2 +- src/types/stream/stream.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts index 88abeb1d..e9b8b930 100644 --- a/src/services/streaming-manager/livekit-manager.test.ts +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -1136,7 +1136,7 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { it('should have correct AgentActivityState enum value for ToolActive', () => { // ASSERT: - expect(AgentActivityState.ToolActive).toBe('tool_active'); + expect(AgentActivityState.ToolActive).toBe('TOOL_ACTIVE'); }); }); diff --git a/src/types/stream/stream.ts b/src/types/stream/stream.ts index c7f8176e..404e055c 100644 --- a/src/types/stream/stream.ts +++ b/src/types/stream/stream.ts @@ -23,7 +23,7 @@ export enum AgentActivityState { Idle = 'IDLE', Loading = 'LOADING', Talking = 'TALKING', - ToolActive = 'tool_active', + ToolActive = 'TOOL_ACTIVE', } export enum StreamEvents { From e91dcdd2b46b509d86122f3b780c86f5d22de318 Mon Sep 17 00:00:00 2001 From: omrizitrin-did Date: Mon, 23 Mar 2026 15:51:17 +0200 Subject: [PATCH 3/3] feature/improve e2e latency measurment (#341) --- .../agent-manager/connect-to-manager.test.ts | 4 + .../agent-manager/connect-to-manager.ts | 50 ++++--- src/services/analytics/timestamp-tracker.ts | 13 ++ .../streaming-manager/livekit-manager.test.ts | 4 + .../streaming-manager/livekit-manager.ts | 32 ++++- src/services/streaming-manager/stats/poll.ts | 123 +++++++++++++++--- src/types/stream/stream.ts | 1 + 7 files changed, 188 insertions(+), 39 deletions(-) diff --git a/src/services/agent-manager/connect-to-manager.test.ts b/src/services/agent-manager/connect-to-manager.test.ts index 8e626a2d..e8f653c8 100644 --- a/src/services/agent-manager/connect-to-manager.test.ts +++ b/src/services/agent-manager/connect-to-manager.test.ts @@ -237,6 +237,7 @@ describe('connect-to-manager', () => { let onConnectionStateChange: (state: ConnectionState) => void; let onVideoStateChange: (state: StreamingState, statsReport?: any) => void; let onAgentActivityStateChange: (state: AgentActivityState) => void; + let onFirstAudioDetected: ((latency?: number) => void) | undefined; let onStreamReady: (() => void) | undefined; beforeEach(async () => { @@ -249,6 +250,7 @@ describe('connect-to-manager', () => { onConnectionStateChange = options.callbacks.onConnectionStateChange; onVideoStateChange = options.callbacks.onVideoStateChange; onAgentActivityStateChange = options.callbacks.onAgentActivityStateChange; + onFirstAudioDetected = options.callbacks.onFirstAudioDetected; onStreamReady = options.callbacks.onStreamReady; return new Promise(resolve => { @@ -386,6 +388,7 @@ describe('connect-to-manager', () => { describe('onAgentActivityStateChange', () => { it('should handle agent talking state', () => { onAgentActivityStateChange(AgentActivityState.Talking); + onFirstAudioDetected?.(1000); expect(mockOptions.callbacks.onAgentActivityStateChange).toHaveBeenCalledWith( AgentActivityState.Talking @@ -417,6 +420,7 @@ describe('connect-to-manager', () => { it('should handle agent talking state with analytics', () => { onAgentActivityStateChange(AgentActivityState.Talking); + onFirstAudioDetected?.(1000); expect(mockOptions.callbacks.onAgentActivityStateChange).toHaveBeenCalledWith( AgentActivityState.Talking diff --git a/src/services/agent-manager/connect-to-manager.ts b/src/services/agent-manager/connect-to-manager.ts index d6be0744..53f1924b 100644 --- a/src/services/agent-manager/connect-to-manager.ts +++ b/src/services/agent-manager/connect-to-manager.ts @@ -107,17 +107,13 @@ function trackAgentActivityAnalytics( state: StreamingState, agent: Agent, analytics: Analytics, - streamType: StreamType + streamType: StreamType, + latency?: number ) { - if (latencyTimestampTracker.get() <= 0) return; - if (state === StreamingState.Start) { - analytics.linkTrack( - 'agent-video', - { event: 'start', latency: latencyTimestampTracker.get(true), 'stream-type': streamType }, - 'start', - [StreamEvents.StreamVideoCreated] - ); + analytics.linkTrack('agent-video', { event: 'start', latency, 'stream-type': streamType }, 'start', [ + StreamEvents.StreamVideoCreated, + ]); } else if (state === StreamingState.Stop) { analytics.linkTrack( 'agent-video', @@ -140,8 +136,6 @@ function trackLegacyVideoAnalytics( analytics: Analytics, streamType: StreamType ) { - if (latencyTimestampTracker.get() <= 0) return; - if (state === StreamingState.Start) { analytics.linkTrack( 'agent-video', @@ -172,6 +166,7 @@ type ConnectToManagerOptions = AgentManagerOptions & { onMessage?: ChatProgressCallback; /** Internal callback for when interrupt is detected by streaming manager */ onInterruptDetected?: (interrupt: Interrupt) => void; + onFirstAudioDetected?: (latency?: number) => void; }; chatId?: string; }; @@ -195,6 +190,9 @@ function connectToManager( 'stream-version': streamOptions.version.toString(), }); + let pendingStartTrack: ((latency?: number) => void) | null = null; + const isExpressive = agent.presenter.type === 'expressive'; + streamingManager = await createStreamingManager( agent, streamOptions, @@ -234,16 +232,32 @@ function connectToManager( if (state === AgentActivityState.Talking) { interruptTimestampTracker.update(); + pendingStartTrack = latency => { + trackAgentActivityAnalytics( + StreamingState.Start, + agent, + analytics, + streamingManager.streamType, + latency + ); + pendingStartTrack = null; + }; + if (!isExpressive) { + pendingStartTrack(latencyTimestampTracker.get(true)); + } } else { interruptTimestampTracker.reset(); + pendingStartTrack = null; + trackAgentActivityAnalytics( + StreamingState.Stop, + agent, + analytics, + streamingManager.streamType + ); } - - trackAgentActivityAnalytics( - state === AgentActivityState.Talking ? StreamingState.Start : StreamingState.Stop, - agent, - analytics, - streamingManager.streamType - ); + }, + onFirstAudioDetected: latency => { + pendingStartTrack?.(latency); }, onStreamReady: () => { const readyLatency = streamReadyTimestampTracker.get(true); diff --git a/src/services/analytics/timestamp-tracker.ts b/src/services/analytics/timestamp-tracker.ts index 381a89c0..40299098 100644 --- a/src/services/analytics/timestamp-tracker.ts +++ b/src/services/analytics/timestamp-tracker.ts @@ -11,3 +11,16 @@ function createTimestampTracker() { export const latencyTimestampTracker = createTimestampTracker(); export const interruptTimestampTracker = createTimestampTracker(); export const streamReadyTimestampTracker = createTimestampTracker(); + +export const sttLatencyStore = (() => { + let value: number | undefined; + return { + set: (newValue: number | undefined) => { + value = newValue; + }, + get: () => value, + reset: () => { + value = undefined; + }, + }; +})(); diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts index e9b8b930..1f508632 100644 --- a/src/services/streaming-manager/livekit-manager.test.ts +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -99,6 +99,10 @@ jest.mock('./stats/poll', () => ({ }; } ), + createAudioStatsDetector: jest.fn(() => ({ + arm: jest.fn(), + destroy: jest.fn(), + })), })); const mockLatencyTimestampTrackerUpdate = jest.fn(); diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index 468c50b9..9dc8caf4 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -18,7 +18,7 @@ import { ChatProgress } from '@sdk/types/entities/agents/manager'; import { noop } from '@sdk/utils'; import { createStreamApiV2 } from '../../api/streams/streamsApiV2'; import { didApiUrl } from '../../config/environment'; -import { latencyTimestampTracker } from '../analytics/timestamp-tracker'; +import { latencyTimestampTracker, sttLatencyStore } from '../analytics/timestamp-tracker'; import { createStreamingLogger, StreamingManager } from './common'; import { chatEventMap } from './data-channel-handlers'; @@ -35,7 +35,7 @@ import type { Track, TranscriptionSegment, } from 'livekit-client'; -import { createVideoStatsMonitor } from './stats/poll'; +import { createAudioStatsDetector, createVideoStatsMonitor } from './stats/poll'; import { VideoRTCStatsReport } from './stats/report'; const TRACK_SUBSCRIPTION_TIMEOUT_MS = 20000; @@ -111,6 +111,7 @@ export async function createLiveKitStreamingManager | null = null; + let audioStatsDetector: ReturnType | null = null; let videoStreamingState: StreamingState | null = null; // We defer Connected until video track is subscribed to align with WebRTC behavior let hasEmittedConnected = false; @@ -282,6 +283,23 @@ export async function createLiveKitStreamingManager track.getRTCStatsReport(), + () => { + const clientLatency = latencyTimestampTracker.get(true); + const sttLatency = sttLatencyStore.get(); + let networkLatency = 0; + if (sttLatency) { + const rtt = videoStatsMonitor?.getReport()?.webRTCStats?.avgRtt ?? 0; + networkLatency = rtt > 0 ? Math.round(rtt * 1000) : 0; + } + const latency = clientLatency > 0 ? clientLatency + (sttLatency ?? 0) + networkLatency : undefined; + callbacks.onFirstAudioDetected?.(latency); + } + ); + } + if (track.kind === 'video') { callbacks.onStreamReady?.(); log('CALLBACK: onSrcObjectReady'); @@ -316,6 +334,11 @@ export async function createLiveKitStreamingManager Promise, + onFirstAudioDetected: () => void +) { + let armed = false; + let baselined = false; + let timerId: ReturnType | null = null; + let armTime = 0; + let prevTotalAudioEnergy = 0; + let prevTotalSamplesReceived = 0; + + async function poll() { + if (!armed) return; + + try { + const stats = await getStats(); + if (!stats) { + timerId = setTimeout(poll, AUDIO_STATS_POLL_INTERVAL_MS); + return; + } + + const report = findInboundRtpReport(stats, 'audio'); + if (!report) { + timerId = setTimeout(poll, AUDIO_STATS_POLL_INTERVAL_MS); + return; + } + + const totalAudioEnergy: number = report.totalAudioEnergy ?? 0; + const totalSamplesReceived: number = report.totalSamplesReceived ?? 0; + + if (!baselined) { + prevTotalAudioEnergy = totalAudioEnergy; + prevTotalSamplesReceived = totalSamplesReceived; + baselined = true; + timerId = setTimeout(poll, AUDIO_STATS_POLL_INTERVAL_MS); + return; + } + + const energyDelta = totalAudioEnergy - prevTotalAudioEnergy; + const samplesDelta = totalSamplesReceived - prevTotalSamplesReceived; + prevTotalAudioEnergy = totalAudioEnergy; + prevTotalSamplesReceived = totalSamplesReceived; + + if (samplesDelta > 0 && energyDelta > 0) { + armed = false; + onFirstAudioDetected(); + return; + } + } catch { + // stats not available yet + } + + if (armed) { + timerId = setTimeout(poll, AUDIO_STATS_POLL_INTERVAL_MS); + } + } + + return { + arm() { + armed = true; + baselined = false; + armTime = performance.now(); + if (timerId !== null) clearTimeout(timerId); + timerId = setTimeout(poll, AUDIO_STATS_POLL_INTERVAL_MS); + }, + destroy() { + armed = false; + if (timerId !== null) { + clearTimeout(timerId); + timerId = null; + } + }, + }; +} + const interval = 100; const notReceivingIntervalsThreshold = Math.max(Math.ceil(400 / interval), 1); const LOW_JITTER_TRESHOLD = 0.25; @@ -12,29 +98,28 @@ function createVideoStatsAnalyzer() { let prevCount: any; let avgJitterDelayInInterval = 0; return (stats: RTCStatsReport) => { - for (const report of stats.values()) { - if (report && report.type === 'inbound-rtp' && report.kind === 'video') { - const delay = report.jitterBufferDelay; - const count = report.jitterBufferEmittedCount; - - if (prevCount && count > prevCount) { - const deltaDelay = delay - prevDelay; - const deltaCount = count - prevCount; - avgJitterDelayInInterval = deltaDelay / deltaCount; - } - - prevDelay = delay; - prevCount = count; + const report = findInboundRtpReport(stats, 'video'); + if (!report) { + return { isReceiving: false, avgJitterDelayInInterval }; + } - const currFramesReceived = report.framesDecoded; - const isReceiving = currFramesReceived - lastFramesReceived > 0; - lastFramesReceived = currFramesReceived; + const delay = report.jitterBufferDelay; + const count = report.jitterBufferEmittedCount; - return { isReceiving, avgJitterDelayInInterval, freezeCount: report.freezeCount }; - } + if (prevCount && count > prevCount) { + const deltaDelay = delay - prevDelay; + const deltaCount = count - prevCount; + avgJitterDelayInInterval = deltaDelay / deltaCount; } - return { isReceiving: false, avgJitterDelayInInterval }; + prevDelay = delay; + prevCount = count; + + const currFramesReceived = report.framesDecoded; + const isReceiving = currFramesReceived - lastFramesReceived > 0; + lastFramesReceived = currFramesReceived; + + return { isReceiving, avgJitterDelayInInterval, freezeCount: report.freezeCount }; }; } diff --git a/src/types/stream/stream.ts b/src/types/stream/stream.ts index 404e055c..f6d98693 100644 --- a/src/types/stream/stream.ts +++ b/src/types/stream/stream.ts @@ -77,6 +77,7 @@ export interface ManagerCallbacks { data: ToolCallingPayload | ToolResultPayload ) => void; onInterruptibleChange?: (interruptible: boolean) => void; + onFirstAudioDetected?: (latency?: number) => void; } export type ManagerCallbackKeys = keyof ManagerCallbacks;