From 103c9481fb9a1bbabb8223b9b34f5eb053789abb Mon Sep 17 00:00:00 2001 From: Dariusz Date: Tue, 28 Apr 2026 13:58:55 +0200 Subject: [PATCH 1/5] feat: track tool-call/* events to Mixpanel Wires the new tool-call/started, tool-call/done, and tool-call/error data channel events into the existing Mixpanel pipeline as a single agent-tool-call event with an event discriminator prop. started ships call_id and name. done and error additionally ship duration_ms and extra_keys (a count, not the content) so dashboards can compute latency and error rate per tool name without leaking input/output payloads. Tracking is hooked at the same callbacks-spread layer in connect-to-manager where connection / video / activity analytics already live, keeping the streaming-manager unaware of analytics. --- .../agent-manager/connect-to-manager.ts | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/services/agent-manager/connect-to-manager.ts b/src/services/agent-manager/connect-to-manager.ts index 28b84e4b..a65d6ef7 100644 --- a/src/services/agent-manager/connect-to-manager.ts +++ b/src/services/agent-manager/connect-to-manager.ts @@ -20,6 +20,9 @@ import { StreamEvents, StreamType, StreamingState, + ToolCallDonePayload, + ToolCallErrorPayload, + ToolEventPayload, TransportProvider, } from '@sdk/types'; import { isStreamsV2Agent } from '@sdk/utils/agent'; @@ -161,6 +164,30 @@ function trackLegacyVideoAnalytics( } } +function trackToolEventAnalytics( + event: StreamEvents.ToolCallStarted | StreamEvents.ToolCallDone | StreamEvents.ToolCallError, + payload: ToolEventPayload, + analytics: Analytics +) { + const baseProps: Record = { + call_id: payload.call_id, + name: payload.name, + }; + + if (event === StreamEvents.ToolCallStarted) { + analytics.track('agent-tool-call', { ...baseProps, event: 'started' }); + return; + } + + const finishedPayload = payload as ToolCallDonePayload | ToolCallErrorPayload; + analytics.track('agent-tool-call', { + ...baseProps, + event: event === StreamEvents.ToolCallDone ? 'done' : 'error', + duration_ms: finishedPayload.duration_ms, + extra_keys: finishedPayload.extra ? Object.keys(finishedPayload.extra).length : 0, + }); +} + type ConnectToManagerOptions = AgentManagerOptions & { callbacks: AgentManagerOptions['callbacks'] & { onVideoIdChange?: (videoId: string | null) => void; @@ -263,6 +290,10 @@ function connectToManager( const readyLatency = streamReadyTimestampTracker.get(true); analytics.track('agent-chat', { event: 'ready', latency: readyLatency }); }, + onToolEvent: ((event: any, data: any) => { + options.callbacks.onToolEvent?.(event, data); + trackToolEventAnalytics(event, data, analytics); + }) as typeof options.callbacks.onToolEvent, }, }, signal From c18807eebc7b86c2ecc75cbccaf5c600d6f353a8 Mon Sep 17 00:00:00 2001 From: Dariusz Date: Tue, 28 Apr 2026 15:34:40 +0200 Subject: [PATCH 2/5] test: cover trackToolEventAnalytics callback wiring Adds an onToolEvent describe block exercising the started, done, and error branches plus the missing-extra fallback. Restores Lines coverage above the 87% global threshold (was 86.94%). --- .../agent-manager/connect-to-manager.test.ts | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/services/agent-manager/connect-to-manager.test.ts b/src/services/agent-manager/connect-to-manager.test.ts index 6e80a36e..99f42a35 100644 --- a/src/services/agent-manager/connect-to-manager.test.ts +++ b/src/services/agent-manager/connect-to-manager.test.ts @@ -86,6 +86,7 @@ describe('connect-to-manager', () => { onSrcObjectReady: jest.fn(), onNewMessage: jest.fn(), onNewChat: jest.fn(), + onToolEvent: jest.fn(), }, }; @@ -239,6 +240,7 @@ describe('connect-to-manager', () => { let onAgentActivityStateChange: (state: AgentActivityState) => void; let onFirstAudioDetected: ((metrics: { latency?: number; networkLatency?: number }) => void) | undefined; let onStreamReady: (() => void) | undefined; + let onToolEvent: ((event: StreamEvents, data: any) => void) | undefined; beforeEach(async () => { // Initialize callbacks to avoid undefined errors @@ -252,6 +254,7 @@ describe('connect-to-manager', () => { onAgentActivityStateChange = options.callbacks.onAgentActivityStateChange; onFirstAudioDetected = options.callbacks.onFirstAudioDetected; onStreamReady = options.callbacks.onStreamReady; + onToolEvent = options.callbacks.onToolEvent; return new Promise(resolve => { setTimeout(() => { @@ -447,6 +450,79 @@ describe('connect-to-manager', () => { }); }); }); + + describe('onToolEvent', () => { + const startedPayload = { + call_id: 'call-1', + name: 'lookup', + input: { q: 'hello' }, + output: {}, + timestamp: '2026-04-28T00:00:00Z', + }; + const donePayload = { + ...startedPayload, + output: { result: 'ok' }, + duration_ms: 123, + extra: { region: 'eu' }, + }; + const errorPayload = { + ...donePayload, + extra: { reason: 'timeout', code: 504 }, + }; + + beforeEach(() => { + (mockAnalytics.track as jest.Mock).mockClear(); + }); + + it('forwards started event to user callback and tracks agent-tool-call', () => { + onToolEvent?.(StreamEvents.ToolCallStarted, startedPayload as any); + + expect(mockOptions.callbacks.onToolEvent).toHaveBeenCalledWith( + StreamEvents.ToolCallStarted, + startedPayload + ); + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-tool-call', { + event: 'started', + call_id: 'call-1', + name: 'lookup', + }); + }); + + it('tracks done event with duration_ms and extra_keys count', () => { + onToolEvent?.(StreamEvents.ToolCallDone, donePayload as any); + + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-tool-call', { + event: 'done', + call_id: 'call-1', + name: 'lookup', + duration_ms: 123, + extra_keys: 1, + }); + }); + + it('tracks error event with duration_ms and extra_keys count', () => { + onToolEvent?.(StreamEvents.ToolCallError, errorPayload as any); + + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-tool-call', { + event: 'error', + call_id: 'call-1', + name: 'lookup', + duration_ms: 123, + extra_keys: 2, + }); + }); + + it('handles missing extra map by emitting extra_keys=0', () => { + const { extra: _extra, ...donePayloadWithoutExtra } = donePayload; + + onToolEvent?.(StreamEvents.ToolCallDone, donePayloadWithoutExtra as any); + + expect(mockAnalytics.track).toHaveBeenCalledWith( + 'agent-tool-call', + expect.objectContaining({ extra_keys: 0 }) + ); + }); + }); }); describe('Stream Options Mapping', () => { From 7c678d4e59a9a75c3bb9647198fe57c285bcc79f Mon Sep 17 00:00:00 2001 From: dor-eitan <164745144+dor-eitan@users.noreply.github.com> Date: Tue, 28 Apr 2026 19:26:10 +0300 Subject: [PATCH 3/5] bugfix: remove random session-key suffix from Client-Key auth header (#377) The SDK appended a per-page-load random `_${sessionKey}` to the Client-Key header. The backend authorizer captures it as part of `external_id`, which silently broke per-user persistence (e.g. memory_id keys) by minting a new identifier on every reload. No backend code consumes the suffix as a session identifier; one analytics handler already strips it. Drop it from the SDK so external_id stays stable across reloads. --- src/auth/get-auth-header.test.ts | 11 ++++------- src/auth/get-auth-header.ts | 3 +-- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/auth/get-auth-header.test.ts b/src/auth/get-auth-header.test.ts index 7608ab96..1fd48b36 100644 --- a/src/auth/get-auth-header.test.ts +++ b/src/auth/get-auth-header.test.ts @@ -128,7 +128,7 @@ describe('getAuthHeader', () => { const auth: Auth = { type: 'key', clientKey: 'test-client-key' }; const result = getAuthHeader(auth); - expect(result).toBe('Client-Key test-client-key.generated-external-id_mocked-random-id'); + expect(result).toBe('Client-Key test-client-key.generated-external-id'); }); it('should use provided externalId in Client-Key header', () => { @@ -136,8 +136,7 @@ describe('getAuthHeader', () => { const externalId = 'user-123'; const result = getAuthHeader(auth, externalId); - expect(result).toBe('Client-Key test-client-key.user-123_mocked-random-id'); - expect(result).toContain('user-123'); + expect(result).toBe('Client-Key test-client-key.user-123'); }); it('should use externalId from localStorage when not provided', () => { @@ -147,8 +146,7 @@ describe('getAuthHeader', () => { const auth: Auth = { type: 'key', clientKey: 'test-client-key' }; const result = getAuthHeader(auth); - expect(result).toBe('Client-Key test-client-key.stored-user-id_mocked-random-id'); - expect(result).toContain('stored-user-id'); + expect(result).toBe('Client-Key test-client-key.stored-user-id'); }); it('should generate new externalId and store it when localStorage is empty', () => { @@ -158,8 +156,7 @@ describe('getAuthHeader', () => { const auth: Auth = { type: 'key', clientKey: 'test-client-key' }; const result = getAuthHeader(auth); - expect(result).toBe('Client-Key test-client-key.new-generated-id_mocked-random-id'); - expect(result).toContain('new-generated-id'); + expect(result).toBe('Client-Key test-client-key.new-generated-id'); expect(window.localStorage.getItem('did_external_key_id')).toBe(mockRandomId); }); diff --git a/src/auth/get-auth-header.ts b/src/auth/get-auth-header.ts index 2e3eddc1..7959b887 100644 --- a/src/auth/get-auth-header.ts +++ b/src/auth/get-auth-header.ts @@ -18,14 +18,13 @@ export function getExternalId(externalId?: string): string { return key; } -let sessionKey = getRandom(); export function getAuthHeader(auth: Auth, externalId?: string) { if (auth.type === 'bearer') { return `Bearer ${auth.token}`; } else if (auth.type === 'basic') { return `Basic ${'token' in auth ? auth.token : btoa(`${auth.username}:${auth.password}`)}`; } else if (auth.type === 'key') { - return `Client-Key ${auth.clientKey}.${getExternalId(externalId)}_${sessionKey}`; + return `Client-Key ${auth.clientKey}.${getExternalId(externalId)}`; } else { throw new Error(`Unknown auth type: ${auth}`); } From 1aa4b369f2db8c5e369ee1bf6c7ec53c79ea2c36 Mon Sep 17 00:00:00 2001 From: Arik Sfaradi Date: Wed, 29 Apr 2026 11:27:54 +0300 Subject: [PATCH 4/5] fix: skip text interrupts on expressive agents to avoid races (#376) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: skip text interrupts on expressive agents to avoid races Text-typed interrupts on expressive (V2) streams caused race conditions when the user kept typing while the agent was talking — each keystroke that triggered chat send would also fire an extra interrupt on top of prior unsettled ones. Refactor: move per-transport interrupt logic into each StreamingManager (livekit-manager skips type==='text'; webrtc-manager preserves the existing validation + payload from sendInterrupt/validateInterrupt). agent-manager just forwards the call. Removes services/interrupt module. Co-Authored-By: Claude Opus 4.7 (1M context) * lint --------- Co-authored-by: Claude Opus 4.7 (1M context) --- src/services/agent-manager/index.test.ts | 18 ++---- src/services/agent-manager/index.ts | 9 +-- src/services/interrupt/index.ts | 51 ---------------- src/services/streaming-manager/common.ts | 10 +++- .../streaming-manager/livekit-manager.ts | 9 +++ .../streaming-manager/webrtc-manager.ts | 60 +++++++++++++------ .../factories/streaming-manager.factory.ts | 1 + 7 files changed, 66 insertions(+), 92 deletions(-) delete mode 100644 src/services/interrupt/index.ts diff --git a/src/services/agent-manager/index.test.ts b/src/services/agent-manager/index.test.ts index b2c51c51..598ad26c 100644 --- a/src/services/agent-manager/index.test.ts +++ b/src/services/agent-manager/index.test.ts @@ -12,7 +12,6 @@ import { Agent, AgentManager, AgentManagerOptions, ChatMode, ConnectionState, St import { initializeAnalytics } from '../analytics/mixpanel'; import { createChat } from '../chat'; import { getInitialMessages } from '../chat/intial-messages'; -import { sendInterrupt, validateInterrupt } from '../interrupt'; import { createSocketManager } from '../socket-manager'; import { createMessageEventQueue } from '../socket-manager/message-queue'; import { initializeStreamAndChat } from './connect-to-manager'; @@ -26,7 +25,6 @@ jest.mock('./connect-to-manager'); jest.mock('../socket-manager/message-queue'); jest.mock('../chat/intial-messages'); jest.mock('../chat'); -jest.mock('../interrupt'); jest.mock('../../utils/retry-operation', () => ({ retryOperation: jest.fn(fn => fn()) })); jest.mock('../../utils', () => ({ getRandom: jest.fn(() => 'random-id-123') })); jest.mock('../../utils/chat', () => ({ @@ -82,8 +80,6 @@ describe('createAgentManager', () => { (createMessageEventQueue as jest.Mock).mockReturnValue({ onMessage: jest.fn(), clearQueue: jest.fn() }); (getInitialMessages as jest.Mock).mockReturnValue([]); (createChat as jest.Mock).mockResolvedValue({ chat: mockChat }); - (validateInterrupt as jest.Mock).mockReturnValue(undefined); - (sendInterrupt as jest.Mock).mockReturnValue(undefined); }); describe('createAgentManager', () => { @@ -556,8 +552,7 @@ describe('createAgentManager', () => { manager.interrupt({ type: 'click' }); - expect(validateInterrupt).toHaveBeenCalledWith(mockStreamingManager, StreamType.Legacy, null); - expect(sendInterrupt).toHaveBeenCalledWith(mockStreamingManager, null); + expect(mockStreamingManager.interrupt).toHaveBeenCalledWith('click'); expect(mockAnalytics.track).toHaveBeenCalledWith('agent-video-interrupt', { type: 'click', video_duration_to_interrupt: expect.any(Number), @@ -569,18 +564,15 @@ describe('createAgentManager', () => { // Add a message to interrupt await manager.chat('Hello'); - // Mock validateInterrupt to throw an error - (validateInterrupt as jest.Mock).mockImplementationOnce(() => { + // Mock streamingManager.interrupt to throw a validation error + (mockStreamingManager.interrupt as jest.Mock).mockImplementationOnce(() => { throw new Error('Interrupt validation failed'); }); expect(() => manager.interrupt({ type: 'click' })).toThrow('Interrupt validation failed'); - // Verify validateInterrupt was called - expect(validateInterrupt).toHaveBeenCalledWith(mockStreamingManager, StreamType.Legacy, null); - - // Verify sendInterrupt was not called due to validation failure - expect(sendInterrupt).not.toHaveBeenCalled(); + // Verify streamingManager.interrupt was called + expect(mockStreamingManager.interrupt).toHaveBeenCalledWith('click'); }); }); diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index 47e876f8..f129fbfa 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -7,7 +7,6 @@ import { ChatResponse, ClientToolHandler, ConnectionState, - CreateSessionV2Options, CreateStreamOptions, Interrupt, Message, @@ -30,7 +29,6 @@ import { initializeAnalytics } from '../analytics/mixpanel'; import { interruptTimestampTracker, latencyTimestampTracker } from '../analytics/timestamp-tracker'; import { createChat, getRequestHeaders } from '../chat'; import { getInitialMessages } from '../chat/intial-messages'; -import { sendInterrupt, sendInterruptV2, validateInterrupt } from '../interrupt'; import { SocketManager, createSocketManager } from '../socket-manager'; import { createMessageEventQueue } from '../socket-manager/message-queue'; import { StreamingManager } from '../streaming-manager'; @@ -122,12 +120,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt lastMessage.interrupted = true; options.callbacks.onNewMessage?.([...items.messages], 'answer'); - if (isStreamsV2) { - sendInterruptV2(items.streamingManager! as StreamingManager); - } else { - validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId); - sendInterrupt(items.streamingManager!, videoId!); - } + items.streamingManager.interrupt(type); }; const clientToolHandlers = new Map(); diff --git a/src/services/interrupt/index.ts b/src/services/interrupt/index.ts deleted file mode 100644 index f5b8a6a9..00000000 --- a/src/services/interrupt/index.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { - CreateSessionV2Options, - CreateStreamOptions, - StreamEvents, - StreamInterruptPayload, - StreamType, -} from '@sdk/types'; -import { StreamingManager } from '../streaming-manager'; -import { DataChannelTopic } from '../streaming-manager/livekit-manager'; - -export function validateInterrupt( - streamingManager: StreamingManager | undefined, - streamType: StreamType | undefined, - videoId: string | null -): void { - if (!streamingManager) { - throw new Error('Please connect to the agent first'); - } - - if (!streamingManager.interruptAvailable) { - throw new Error('Interrupt is not enabled for this stream'); - } - - if (streamType !== StreamType.Fluent) { - throw new Error('Interrupt only available for Fluent streams'); - } - - if (!videoId) { - throw new Error('No active video to interrupt'); - } -} - -export async function sendInterrupt( - streamingManager: StreamingManager, - videoId: string -): Promise { - const payload: StreamInterruptPayload = { - type: StreamEvents.StreamInterrupt, - videoId, - timestamp: Date.now(), - }; - - streamingManager.sendDataChannelMessage(JSON.stringify(payload)); -} - -export async function sendInterruptV2(streamingManager: StreamingManager): Promise { - const payload = { - topic: DataChannelTopic.Interrupt, - }; - streamingManager.sendDataChannelMessage(JSON.stringify(payload)); -} diff --git a/src/services/streaming-manager/common.ts b/src/services/streaming-manager/common.ts index a8c4a432..c6a34ad9 100644 --- a/src/services/streaming-manager/common.ts +++ b/src/services/streaming-manager/common.ts @@ -1,4 +1,4 @@ -import { CreateSessionV2Options, CreateStreamOptions, PayloadType, StreamType } from '@sdk/types'; +import { CreateSessionV2Options, CreateStreamOptions, Interrupt, PayloadType, StreamType } from '@sdk/types'; export const createStreamingLogger = (debug: boolean, prefix: string) => (message: string, extra?: any) => debug && console.log(`[${prefix}] ${message}`, extra ?? ''); @@ -86,6 +86,14 @@ export type StreamingManager Promise) { room?.registerRpcMethod(method, handler); }, diff --git a/src/services/streaming-manager/webrtc-manager.ts b/src/services/streaming-manager/webrtc-manager.ts index 6b33e808..fea7a347 100644 --- a/src/services/streaming-manager/webrtc-manager.ts +++ b/src/services/streaming-manager/webrtc-manager.ts @@ -4,10 +4,12 @@ import { AgentActivityState, ConnectionState, CreateStreamOptions, + Interrupt, PayloadType, StreamEvents, StreamingManagerOptions, StreamingState, + StreamInterruptPayload, StreamType, } from '@sdk/types'; import { createStreamingLogger, StreamingManager } from './common'; @@ -246,7 +248,9 @@ export async function createWebRTCStreamingManager { + currentVideoId = videoId; callbacks.onVideoIdChange?.(videoId); }; @@ -320,6 +324,23 @@ export async function createWebRTCStreamingManager jest.fn().mockResolvedValue({ status: 'success', duration: 5000, video_id: 'video-123' }), disconnect: () => jest.fn().mockResolvedValue(undefined), sendDataChannelMessage: () => jest.fn(), + interrupt: () => jest.fn(), }); export const StreamingManagerOptionsFactory = new Factory().attrs({ From 18025165a7059c7deb32744ee65ac8601c6032b0 Mon Sep 17 00:00:00 2001 From: dor-eitan <164745144+dor-eitan@users.noreply.github.com> Date: Wed, 29 Apr 2026 12:13:50 +0300 Subject: [PATCH 5/5] feat(livekit): add replaceMicrophoneTrack for seamless mic device swap (#371) * feat(livekit): add replaceMicrophoneTrack for seamless mic device swap Adds replaceMicrophoneTrack(track) on the LiveKit streaming manager and exposes it through the public AgentManager surface. Internally calls LocalAudioTrack.replaceTrack on the existing publication, preserving the publication (SSRC, trackSid) so the server's STT/VAD pipeline does not reset across mic device swaps. Throws when there is no publication so callers can fall back to publishMicrophoneStream. Co-Authored-By: Claude Opus 4.7 (1M context) * test(livekit): tighten replaceMicrophoneTrack coverage - Bake replaceTrack into the default mock audio track so each test gets a fresh spy without ad-hoc (mockTrack as any).replaceTrack = ... lines. - Drop the misnamed "after disconnect clears the publication" test: it hit the 'Room is not connected' guard, which is already covered by the dedicated test. Replace with a publish + explicit unpublish path that actually reaches the 'No microphone publication to replace' branch with isConnected still true. - Add a regression test that replaceTrack rejection resets the isPublishing flag so the next replace is not silently blocked. - Extend the success test to assert the SDK still holds the original publication after replace, by checking that the subsequent disconnect unpublishes that exact publication's track. Co-Authored-By: Claude Opus 4.7 (1M context) * chore(livekit): align replace error messages, log guard failures, doc resolution - Log every guard failure in replaceMicrophoneTrack before throwing, to mirror publishTrackStream and give field debugging a console line that explains why a replace was skipped. - Align error message style with the rest of livekit-manager: 'replaceMicrophoneTrack requires an audio track' -> 'Microphone track must be an audio track', 'Microphone publish in progress, cannot replace' -> 'Microphone publish in progress'. - Update the matching test assertions for the new strings. - Note in the JSDoc on both the StreamingManager and AgentManager surfaces that the promise resolves once LiveKit has switched the underlying RTCRtpSender's track. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) * chore(agent-manager): drop unnecessary async wrappers; align JSDoc Per PR review: publish/unpublish/replace passthroughs in agent-manager have no awaits, so async is just a noisy wrapper over a Promise return. Switch to non-async with explicit Promise return type; use Promise.reject/resolve to preserve the rejected-promise contract (rather than sync throw, which would change call-site semantics and break tests using .rejects.toThrow()). Also align the replaceMicrophoneTrack JSDoc with the sibling "supported only for livekit manager" tagline. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- package.json | 2 +- src/services/agent-manager/index.test.ts | 28 ++++ src/services/agent-manager/index.ts | 22 +-- src/services/streaming-manager/common.ts | 10 ++ .../streaming-manager/livekit-manager.test.ts | 131 +++++++++++++++++- .../streaming-manager/livekit-manager.ts | 28 ++++ src/types/entities/agents/manager.ts | 10 ++ 7 files changed, 220 insertions(+), 11 deletions(-) diff --git a/package.json b/package.json index 805dc729..7490f2fe 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.61", + "version": "1.1.63", "type": "module", "description": "d-id client sdk", "repository": { diff --git a/src/services/agent-manager/index.test.ts b/src/services/agent-manager/index.test.ts index 598ad26c..168619d4 100644 --- a/src/services/agent-manager/index.test.ts +++ b/src/services/agent-manager/index.test.ts @@ -777,6 +777,34 @@ describe('createAgentManager', () => { }); }); + describe('replaceMicrophoneTrack', () => { + let manager: AgentManager; + + beforeEach(async () => { + manager = await createAgentManager('agent-123', mockOptions); + await manager.connect(); + }); + + it('should replace microphone track when available', async () => { + const mockTrack = { kind: 'audio', id: 'audio-track-1' } as unknown as MediaStreamTrack; + const mockReplace = jest.fn().mockResolvedValue(undefined); + mockStreamingManager.replaceMicrophoneTrack = mockReplace; + + await manager.replaceMicrophoneTrack?.(mockTrack); + + expect(mockReplace).toHaveBeenCalledWith(mockTrack); + }); + + it('should throw error when replaceMicrophoneTrack is not available', async () => { + mockStreamingManager.replaceMicrophoneTrack = undefined; + const mockTrack = { kind: 'audio', id: 'audio-track-1' } as unknown as MediaStreamTrack; + + await expect(manager.replaceMicrophoneTrack?.(mockTrack)).rejects.toThrow( + 'replaceMicrophoneTrack is not available for this streaming manager' + ); + }); + }); + describe('publishCameraStream', () => { let manager: AgentManager; diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index f129fbfa..73e41c91 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -313,27 +313,33 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt mode: items.chatMode, }); }, - async publishMicrophoneStream(stream: MediaStream) { + publishMicrophoneStream(stream: MediaStream): Promise { if (!items.streamingManager?.publishMicrophoneStream) { - throw new Error('publishMicrophoneStream is not available for this streaming manager'); + return Promise.reject(new Error('publishMicrophoneStream is not available for this streaming manager')); } return items.streamingManager.publishMicrophoneStream(stream); }, - async unpublishMicrophoneStream() { + unpublishMicrophoneStream(): Promise { if (!items.streamingManager?.unpublishMicrophoneStream) { - return; + return Promise.resolve(); } return items.streamingManager.unpublishMicrophoneStream(); }, - async publishCameraStream(stream: MediaStream) { + replaceMicrophoneTrack(track: MediaStreamTrack): Promise { + if (!items.streamingManager?.replaceMicrophoneTrack) { + return Promise.reject(new Error('replaceMicrophoneTrack is not available for this streaming manager')); + } + return items.streamingManager.replaceMicrophoneTrack(track); + }, + publishCameraStream(stream: MediaStream): Promise { if (!items.streamingManager?.publishCameraStream) { - throw new Error('publishCameraStream is not available for this streaming manager'); + return Promise.reject(new Error('publishCameraStream is not available for this streaming manager')); } return items.streamingManager.publishCameraStream(stream); }, - async unpublishCameraStream() { + unpublishCameraStream(): Promise { if (!items.streamingManager?.unpublishCameraStream) { - return; + return Promise.resolve(); } return items.streamingManager.unpublishCameraStream(); }, diff --git a/src/services/streaming-manager/common.ts b/src/services/streaming-manager/common.ts index c6a34ad9..da87ecc4 100644 --- a/src/services/streaming-manager/common.ts +++ b/src/services/streaming-manager/common.ts @@ -47,6 +47,16 @@ export type StreamingManager; + /** + * Replace the currently published microphone MediaStreamTrack without + * unpublishing. Preserves the LiveKit publication (SSRC, trackSid) so the + * server sees continuous audio. Resolves once LiveKit has switched the + * underlying RTCRtpSender's track. Rejects if no publication exists — + * caller should fall back to `publishMicrophoneStream` in that case. + * supported only for livekit manager + */ + replaceMicrophoneTrack?(track: MediaStreamTrack): Promise; + /** * Publish a camera video stream to the LiveKit room. * Can be called after connection to enable vision. diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts index 832aed41..f643dcf9 100644 --- a/src/services/streaming-manager/livekit-manager.test.ts +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -137,11 +137,13 @@ function createMockAudioTrack(id: string = TEST_AUDIO_TRACK_ID, additionalProps: } function createMockTrack(id: string = TEST_AUDIO_TRACK_ID, mediaStreamTrack?: MediaStreamTrack) { - return { + const track: any = { kind: 'audio', id, mediaStreamTrack: mediaStreamTrack || createMockAudioTrack(id), - } as any; + }; + track.replaceTrack = jest.fn().mockResolvedValue(track); + return track; } function createMockPublication(trackId: string = TEST_AUDIO_TRACK_ID, trackSid: string = TEST_TRACK_SID) { @@ -442,6 +444,131 @@ describe('LiveKit Streaming Manager - Microphone Stream', () => { }); }); + describe('Microphone Stream Replacement', () => { + it('should throw when room is not connected', async () => { + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow('Room is not connected'); + }); + + it('should throw when there is no microphone publication', async () => { + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow( + 'No microphone publication to replace' + ); + }); + + it('should throw when given a non-audio track', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + const videoTrack = createMockCameraTrack(); + + await expect(manager.replaceMicrophoneTrack?.(videoTrack)).rejects.toThrow( + 'Microphone track must be an audio track' + ); + expect(mockPublication.track.replaceTrack).not.toHaveBeenCalled(); + }); + + it('should throw when a publish is already in progress', async () => { + const mockStream = createMockStream(); + let resolvePublish: (value: any) => void; + const slowPublish = new Promise(resolve => { + resolvePublish = resolve; + }); + mockPublishTrack.mockReturnValue(slowPublish); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const publishPromise = manager.publishMicrophoneStream?.(mockStream); + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow('Microphone publish in progress'); + + resolvePublish!(createMockPublication()); + await publishPromise; + }); + + it('should call publication.track.replaceTrack and not unpublish/publish', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + mockPublishTrack.mockClear(); + mockUnpublishTrack.mockClear(); + + await manager.replaceMicrophoneTrack?.(newTrack); + + expect(mockPublication.track.replaceTrack).toHaveBeenCalledTimes(1); + expect(mockPublication.track.replaceTrack).toHaveBeenCalledWith(newTrack); + expect(mockUnpublishTrack).not.toHaveBeenCalled(); + expect(mockPublishTrack).not.toHaveBeenCalled(); + + // Verify the SDK still holds the original publication reference: + // disconnect should unpublish *that exact* publication's track. + await manager.disconnect(); + expect(mockUnpublishTrack).toHaveBeenCalledTimes(1); + expect(mockUnpublishTrack).toHaveBeenCalledWith(mockPublication.track, false); + }); + + it('should throw "No microphone publication to replace" after an explicit unpublish', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + await manager.unpublishMicrophoneStream?.(); + + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow( + 'No microphone publication to replace' + ); + expect(mockPublication.track.replaceTrack).not.toHaveBeenCalled(); + }); + + it('should reset isPublishing after replaceTrack rejects, allowing a subsequent replace', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + const replaceSpy = mockPublication.track.replaceTrack as jest.Mock; + replaceSpy.mockRejectedValueOnce(new Error('replace failed')); + + const failingTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + await expect(manager.replaceMicrophoneTrack?.(failingTrack)).rejects.toThrow('replace failed'); + + // Subsequent call must not be blocked by a leaked isPublishing flag. + const followupTrack = createMockAudioTrack('audio-track-3'); + await expect(manager.replaceMicrophoneTrack?.(followupTrack)).resolves.toBeUndefined(); + expect(replaceSpy).toHaveBeenLastCalledWith(followupTrack); + expect(replaceSpy).toHaveBeenCalledTimes(2); + }); + }); + describe('Agent Activity State Changes', () => { let mockOnAgentActivityStateChange: jest.Mock; let sendDataEvent: (event: StreamEvents, extraData?: object) => void; diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index a5f838bc..c9365f7c 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -583,6 +583,33 @@ export async function createLiveKitStreamingManager { + if (!isConnected || !room) { + log('Cannot replace microphone track: room is not connected'); + throw new Error('Room is not connected'); + } + if (track.kind !== 'audio') { + log('Cannot replace microphone track: not an audio track', { kind: track.kind }); + throw new Error('Microphone track must be an audio track'); + } + if (microphoneState.isPublishing) { + log('Cannot replace microphone track: publish in progress'); + throw new Error('Microphone publish in progress'); + } + const pub = microphoneState.publication; + if (!pub || !pub.track) { + log('Cannot replace microphone track: no publication to replace'); + throw new Error('No microphone publication to replace'); + } + try { + microphoneState.isPublishing = true; + await pub.track.replaceTrack(track); + log('Microphone track replaced', { trackId: track.id, trackSid: pub.trackSid }); + } finally { + microphoneState.isPublishing = false; + } + } + async function publishCameraStream(stream: MediaStream): Promise { return publishTrackStream( cameraState, @@ -725,6 +752,7 @@ export async function createLiveKitStreamingManager Promise; + /** + * Replace the live microphone track on the current publication without + * unpublishing. Preserves the LiveKit publication (SSRC, trackSid) so the + * server sees continuous audio across mic device swaps. Resolves once + * LiveKit has switched the underlying RTCRtpSender's track. + * Rejects if there is no active publication — callers should fall back to + * `publishMicrophoneStream` in that case. + * Supported only for the LiveKit streaming manager. + */ + replaceMicrophoneTrack?: (track: MediaStreamTrack) => Promise; /** * Publish a camera video stream to the LiveKit room. * Can be called after connection to enable vision.