diff --git a/package.json b/package.json index 805dc729..7490f2fe 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.61", + "version": "1.1.63", "type": "module", "description": "d-id client sdk", "repository": { diff --git a/src/auth/get-auth-header.test.ts b/src/auth/get-auth-header.test.ts index 7608ab96..1fd48b36 100644 --- a/src/auth/get-auth-header.test.ts +++ b/src/auth/get-auth-header.test.ts @@ -128,7 +128,7 @@ describe('getAuthHeader', () => { const auth: Auth = { type: 'key', clientKey: 'test-client-key' }; const result = getAuthHeader(auth); - expect(result).toBe('Client-Key test-client-key.generated-external-id_mocked-random-id'); + expect(result).toBe('Client-Key test-client-key.generated-external-id'); }); it('should use provided externalId in Client-Key header', () => { @@ -136,8 +136,7 @@ describe('getAuthHeader', () => { const externalId = 'user-123'; const result = getAuthHeader(auth, externalId); - expect(result).toBe('Client-Key test-client-key.user-123_mocked-random-id'); - expect(result).toContain('user-123'); + expect(result).toBe('Client-Key test-client-key.user-123'); }); it('should use externalId from localStorage when not provided', () => { @@ -147,8 +146,7 @@ describe('getAuthHeader', () => { const auth: Auth = { type: 'key', clientKey: 'test-client-key' }; const result = getAuthHeader(auth); - expect(result).toBe('Client-Key test-client-key.stored-user-id_mocked-random-id'); - expect(result).toContain('stored-user-id'); + expect(result).toBe('Client-Key test-client-key.stored-user-id'); }); it('should generate new externalId and store it when localStorage is empty', () => { @@ -158,8 +156,7 @@ describe('getAuthHeader', () => { const auth: Auth = { type: 'key', clientKey: 'test-client-key' }; const result = getAuthHeader(auth); - expect(result).toBe('Client-Key test-client-key.new-generated-id_mocked-random-id'); - expect(result).toContain('new-generated-id'); + expect(result).toBe('Client-Key test-client-key.new-generated-id'); expect(window.localStorage.getItem('did_external_key_id')).toBe(mockRandomId); }); diff --git a/src/auth/get-auth-header.ts b/src/auth/get-auth-header.ts index 2e3eddc1..7959b887 100644 --- a/src/auth/get-auth-header.ts +++ b/src/auth/get-auth-header.ts @@ -18,14 +18,13 @@ export function getExternalId(externalId?: string): string { return key; } -let sessionKey = getRandom(); export function getAuthHeader(auth: Auth, externalId?: string) { if (auth.type === 'bearer') { return `Bearer ${auth.token}`; } else if (auth.type === 'basic') { return `Basic ${'token' in auth ? auth.token : btoa(`${auth.username}:${auth.password}`)}`; } else if (auth.type === 'key') { - return `Client-Key ${auth.clientKey}.${getExternalId(externalId)}_${sessionKey}`; + return `Client-Key ${auth.clientKey}.${getExternalId(externalId)}`; } else { throw new Error(`Unknown auth type: ${auth}`); } diff --git a/src/services/agent-manager/connect-to-manager.test.ts b/src/services/agent-manager/connect-to-manager.test.ts index 6e80a36e..99f42a35 100644 --- a/src/services/agent-manager/connect-to-manager.test.ts +++ b/src/services/agent-manager/connect-to-manager.test.ts @@ -86,6 +86,7 @@ describe('connect-to-manager', () => { onSrcObjectReady: jest.fn(), onNewMessage: jest.fn(), onNewChat: jest.fn(), + onToolEvent: jest.fn(), }, }; @@ -239,6 +240,7 @@ describe('connect-to-manager', () => { let onAgentActivityStateChange: (state: AgentActivityState) => void; let onFirstAudioDetected: ((metrics: { latency?: number; networkLatency?: number }) => void) | undefined; let onStreamReady: (() => void) | undefined; + let onToolEvent: ((event: StreamEvents, data: any) => void) | undefined; beforeEach(async () => { // Initialize callbacks to avoid undefined errors @@ -252,6 +254,7 @@ describe('connect-to-manager', () => { onAgentActivityStateChange = options.callbacks.onAgentActivityStateChange; onFirstAudioDetected = options.callbacks.onFirstAudioDetected; onStreamReady = options.callbacks.onStreamReady; + onToolEvent = options.callbacks.onToolEvent; return new Promise(resolve => { setTimeout(() => { @@ -447,6 +450,79 @@ describe('connect-to-manager', () => { }); }); }); + + describe('onToolEvent', () => { + const startedPayload = { + call_id: 'call-1', + name: 'lookup', + input: { q: 'hello' }, + output: {}, + timestamp: '2026-04-28T00:00:00Z', + }; + const donePayload = { + ...startedPayload, + output: { result: 'ok' }, + duration_ms: 123, + extra: { region: 'eu' }, + }; + const errorPayload = { + ...donePayload, + extra: { reason: 'timeout', code: 504 }, + }; + + beforeEach(() => { + (mockAnalytics.track as jest.Mock).mockClear(); + }); + + it('forwards started event to user callback and tracks agent-tool-call', () => { + onToolEvent?.(StreamEvents.ToolCallStarted, startedPayload as any); + + expect(mockOptions.callbacks.onToolEvent).toHaveBeenCalledWith( + StreamEvents.ToolCallStarted, + startedPayload + ); + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-tool-call', { + event: 'started', + call_id: 'call-1', + name: 'lookup', + }); + }); + + it('tracks done event with duration_ms and extra_keys count', () => { + onToolEvent?.(StreamEvents.ToolCallDone, donePayload as any); + + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-tool-call', { + event: 'done', + call_id: 'call-1', + name: 'lookup', + duration_ms: 123, + extra_keys: 1, + }); + }); + + it('tracks error event with duration_ms and extra_keys count', () => { + onToolEvent?.(StreamEvents.ToolCallError, errorPayload as any); + + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-tool-call', { + event: 'error', + call_id: 'call-1', + name: 'lookup', + duration_ms: 123, + extra_keys: 2, + }); + }); + + it('handles missing extra map by emitting extra_keys=0', () => { + const { extra: _extra, ...donePayloadWithoutExtra } = donePayload; + + onToolEvent?.(StreamEvents.ToolCallDone, donePayloadWithoutExtra as any); + + expect(mockAnalytics.track).toHaveBeenCalledWith( + 'agent-tool-call', + expect.objectContaining({ extra_keys: 0 }) + ); + }); + }); }); describe('Stream Options Mapping', () => { diff --git a/src/services/agent-manager/connect-to-manager.ts b/src/services/agent-manager/connect-to-manager.ts index 28b84e4b..a65d6ef7 100644 --- a/src/services/agent-manager/connect-to-manager.ts +++ b/src/services/agent-manager/connect-to-manager.ts @@ -20,6 +20,9 @@ import { StreamEvents, StreamType, StreamingState, + ToolCallDonePayload, + ToolCallErrorPayload, + ToolEventPayload, TransportProvider, } from '@sdk/types'; import { isStreamsV2Agent } from '@sdk/utils/agent'; @@ -161,6 +164,30 @@ function trackLegacyVideoAnalytics( } } +function trackToolEventAnalytics( + event: StreamEvents.ToolCallStarted | StreamEvents.ToolCallDone | StreamEvents.ToolCallError, + payload: ToolEventPayload, + analytics: Analytics +) { + const baseProps: Record = { + call_id: payload.call_id, + name: payload.name, + }; + + if (event === StreamEvents.ToolCallStarted) { + analytics.track('agent-tool-call', { ...baseProps, event: 'started' }); + return; + } + + const finishedPayload = payload as ToolCallDonePayload | ToolCallErrorPayload; + analytics.track('agent-tool-call', { + ...baseProps, + event: event === StreamEvents.ToolCallDone ? 'done' : 'error', + duration_ms: finishedPayload.duration_ms, + extra_keys: finishedPayload.extra ? Object.keys(finishedPayload.extra).length : 0, + }); +} + type ConnectToManagerOptions = AgentManagerOptions & { callbacks: AgentManagerOptions['callbacks'] & { onVideoIdChange?: (videoId: string | null) => void; @@ -263,6 +290,10 @@ function connectToManager( const readyLatency = streamReadyTimestampTracker.get(true); analytics.track('agent-chat', { event: 'ready', latency: readyLatency }); }, + onToolEvent: ((event: any, data: any) => { + options.callbacks.onToolEvent?.(event, data); + trackToolEventAnalytics(event, data, analytics); + }) as typeof options.callbacks.onToolEvent, }, }, signal diff --git a/src/services/agent-manager/index.test.ts b/src/services/agent-manager/index.test.ts index b2c51c51..168619d4 100644 --- a/src/services/agent-manager/index.test.ts +++ b/src/services/agent-manager/index.test.ts @@ -12,7 +12,6 @@ import { Agent, AgentManager, AgentManagerOptions, ChatMode, ConnectionState, St import { initializeAnalytics } from '../analytics/mixpanel'; import { createChat } from '../chat'; import { getInitialMessages } from '../chat/intial-messages'; -import { sendInterrupt, validateInterrupt } from '../interrupt'; import { createSocketManager } from '../socket-manager'; import { createMessageEventQueue } from '../socket-manager/message-queue'; import { initializeStreamAndChat } from './connect-to-manager'; @@ -26,7 +25,6 @@ jest.mock('./connect-to-manager'); jest.mock('../socket-manager/message-queue'); jest.mock('../chat/intial-messages'); jest.mock('../chat'); -jest.mock('../interrupt'); jest.mock('../../utils/retry-operation', () => ({ retryOperation: jest.fn(fn => fn()) })); jest.mock('../../utils', () => ({ getRandom: jest.fn(() => 'random-id-123') })); jest.mock('../../utils/chat', () => ({ @@ -82,8 +80,6 @@ describe('createAgentManager', () => { (createMessageEventQueue as jest.Mock).mockReturnValue({ onMessage: jest.fn(), clearQueue: jest.fn() }); (getInitialMessages as jest.Mock).mockReturnValue([]); (createChat as jest.Mock).mockResolvedValue({ chat: mockChat }); - (validateInterrupt as jest.Mock).mockReturnValue(undefined); - (sendInterrupt as jest.Mock).mockReturnValue(undefined); }); describe('createAgentManager', () => { @@ -556,8 +552,7 @@ describe('createAgentManager', () => { manager.interrupt({ type: 'click' }); - expect(validateInterrupt).toHaveBeenCalledWith(mockStreamingManager, StreamType.Legacy, null); - expect(sendInterrupt).toHaveBeenCalledWith(mockStreamingManager, null); + expect(mockStreamingManager.interrupt).toHaveBeenCalledWith('click'); expect(mockAnalytics.track).toHaveBeenCalledWith('agent-video-interrupt', { type: 'click', video_duration_to_interrupt: expect.any(Number), @@ -569,18 +564,15 @@ describe('createAgentManager', () => { // Add a message to interrupt await manager.chat('Hello'); - // Mock validateInterrupt to throw an error - (validateInterrupt as jest.Mock).mockImplementationOnce(() => { + // Mock streamingManager.interrupt to throw a validation error + (mockStreamingManager.interrupt as jest.Mock).mockImplementationOnce(() => { throw new Error('Interrupt validation failed'); }); expect(() => manager.interrupt({ type: 'click' })).toThrow('Interrupt validation failed'); - // Verify validateInterrupt was called - expect(validateInterrupt).toHaveBeenCalledWith(mockStreamingManager, StreamType.Legacy, null); - - // Verify sendInterrupt was not called due to validation failure - expect(sendInterrupt).not.toHaveBeenCalled(); + // Verify streamingManager.interrupt was called + expect(mockStreamingManager.interrupt).toHaveBeenCalledWith('click'); }); }); @@ -785,6 +777,34 @@ describe('createAgentManager', () => { }); }); + describe('replaceMicrophoneTrack', () => { + let manager: AgentManager; + + beforeEach(async () => { + manager = await createAgentManager('agent-123', mockOptions); + await manager.connect(); + }); + + it('should replace microphone track when available', async () => { + const mockTrack = { kind: 'audio', id: 'audio-track-1' } as unknown as MediaStreamTrack; + const mockReplace = jest.fn().mockResolvedValue(undefined); + mockStreamingManager.replaceMicrophoneTrack = mockReplace; + + await manager.replaceMicrophoneTrack?.(mockTrack); + + expect(mockReplace).toHaveBeenCalledWith(mockTrack); + }); + + it('should throw error when replaceMicrophoneTrack is not available', async () => { + mockStreamingManager.replaceMicrophoneTrack = undefined; + const mockTrack = { kind: 'audio', id: 'audio-track-1' } as unknown as MediaStreamTrack; + + await expect(manager.replaceMicrophoneTrack?.(mockTrack)).rejects.toThrow( + 'replaceMicrophoneTrack is not available for this streaming manager' + ); + }); + }); + describe('publishCameraStream', () => { let manager: AgentManager; diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index 47e876f8..73e41c91 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -7,7 +7,6 @@ import { ChatResponse, ClientToolHandler, ConnectionState, - CreateSessionV2Options, CreateStreamOptions, Interrupt, Message, @@ -30,7 +29,6 @@ import { initializeAnalytics } from '../analytics/mixpanel'; import { interruptTimestampTracker, latencyTimestampTracker } from '../analytics/timestamp-tracker'; import { createChat, getRequestHeaders } from '../chat'; import { getInitialMessages } from '../chat/intial-messages'; -import { sendInterrupt, sendInterruptV2, validateInterrupt } from '../interrupt'; import { SocketManager, createSocketManager } from '../socket-manager'; import { createMessageEventQueue } from '../socket-manager/message-queue'; import { StreamingManager } from '../streaming-manager'; @@ -122,12 +120,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt lastMessage.interrupted = true; options.callbacks.onNewMessage?.([...items.messages], 'answer'); - if (isStreamsV2) { - sendInterruptV2(items.streamingManager! as StreamingManager); - } else { - validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId); - sendInterrupt(items.streamingManager!, videoId!); - } + items.streamingManager.interrupt(type); }; const clientToolHandlers = new Map(); @@ -320,27 +313,33 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt mode: items.chatMode, }); }, - async publishMicrophoneStream(stream: MediaStream) { + publishMicrophoneStream(stream: MediaStream): Promise { if (!items.streamingManager?.publishMicrophoneStream) { - throw new Error('publishMicrophoneStream is not available for this streaming manager'); + return Promise.reject(new Error('publishMicrophoneStream is not available for this streaming manager')); } return items.streamingManager.publishMicrophoneStream(stream); }, - async unpublishMicrophoneStream() { + unpublishMicrophoneStream(): Promise { if (!items.streamingManager?.unpublishMicrophoneStream) { - return; + return Promise.resolve(); } return items.streamingManager.unpublishMicrophoneStream(); }, - async publishCameraStream(stream: MediaStream) { + replaceMicrophoneTrack(track: MediaStreamTrack): Promise { + if (!items.streamingManager?.replaceMicrophoneTrack) { + return Promise.reject(new Error('replaceMicrophoneTrack is not available for this streaming manager')); + } + return items.streamingManager.replaceMicrophoneTrack(track); + }, + publishCameraStream(stream: MediaStream): Promise { if (!items.streamingManager?.publishCameraStream) { - throw new Error('publishCameraStream is not available for this streaming manager'); + return Promise.reject(new Error('publishCameraStream is not available for this streaming manager')); } return items.streamingManager.publishCameraStream(stream); }, - async unpublishCameraStream() { + unpublishCameraStream(): Promise { if (!items.streamingManager?.unpublishCameraStream) { - return; + return Promise.resolve(); } return items.streamingManager.unpublishCameraStream(); }, diff --git a/src/services/interrupt/index.ts b/src/services/interrupt/index.ts deleted file mode 100644 index f5b8a6a9..00000000 --- a/src/services/interrupt/index.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { - CreateSessionV2Options, - CreateStreamOptions, - StreamEvents, - StreamInterruptPayload, - StreamType, -} from '@sdk/types'; -import { StreamingManager } from '../streaming-manager'; -import { DataChannelTopic } from '../streaming-manager/livekit-manager'; - -export function validateInterrupt( - streamingManager: StreamingManager | undefined, - streamType: StreamType | undefined, - videoId: string | null -): void { - if (!streamingManager) { - throw new Error('Please connect to the agent first'); - } - - if (!streamingManager.interruptAvailable) { - throw new Error('Interrupt is not enabled for this stream'); - } - - if (streamType !== StreamType.Fluent) { - throw new Error('Interrupt only available for Fluent streams'); - } - - if (!videoId) { - throw new Error('No active video to interrupt'); - } -} - -export async function sendInterrupt( - streamingManager: StreamingManager, - videoId: string -): Promise { - const payload: StreamInterruptPayload = { - type: StreamEvents.StreamInterrupt, - videoId, - timestamp: Date.now(), - }; - - streamingManager.sendDataChannelMessage(JSON.stringify(payload)); -} - -export async function sendInterruptV2(streamingManager: StreamingManager): Promise { - const payload = { - topic: DataChannelTopic.Interrupt, - }; - streamingManager.sendDataChannelMessage(JSON.stringify(payload)); -} diff --git a/src/services/streaming-manager/common.ts b/src/services/streaming-manager/common.ts index a8c4a432..da87ecc4 100644 --- a/src/services/streaming-manager/common.ts +++ b/src/services/streaming-manager/common.ts @@ -1,4 +1,4 @@ -import { CreateSessionV2Options, CreateStreamOptions, PayloadType, StreamType } from '@sdk/types'; +import { CreateSessionV2Options, CreateStreamOptions, Interrupt, PayloadType, StreamType } from '@sdk/types'; export const createStreamingLogger = (debug: boolean, prefix: string) => (message: string, extra?: any) => debug && console.log(`[${prefix}] ${message}`, extra ?? ''); @@ -47,6 +47,16 @@ export type StreamingManager; + /** + * Replace the currently published microphone MediaStreamTrack without + * unpublishing. Preserves the LiveKit publication (SSRC, trackSid) so the + * server sees continuous audio. Resolves once LiveKit has switched the + * underlying RTCRtpSender's track. Rejects if no publication exists — + * caller should fall back to `publishMicrophoneStream` in that case. + * supported only for livekit manager + */ + replaceMicrophoneTrack?(track: MediaStreamTrack): Promise; + /** * Publish a camera video stream to the LiveKit room. * Can be called after connection to enable vision. @@ -86,6 +96,14 @@ export type StreamingManager { }); }); + describe('Microphone Stream Replacement', () => { + it('should throw when room is not connected', async () => { + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow('Room is not connected'); + }); + + it('should throw when there is no microphone publication', async () => { + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow( + 'No microphone publication to replace' + ); + }); + + it('should throw when given a non-audio track', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + const videoTrack = createMockCameraTrack(); + + await expect(manager.replaceMicrophoneTrack?.(videoTrack)).rejects.toThrow( + 'Microphone track must be an audio track' + ); + expect(mockPublication.track.replaceTrack).not.toHaveBeenCalled(); + }); + + it('should throw when a publish is already in progress', async () => { + const mockStream = createMockStream(); + let resolvePublish: (value: any) => void; + const slowPublish = new Promise(resolve => { + resolvePublish = resolve; + }); + mockPublishTrack.mockReturnValue(slowPublish); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const publishPromise = manager.publishMicrophoneStream?.(mockStream); + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow('Microphone publish in progress'); + + resolvePublish!(createMockPublication()); + await publishPromise; + }); + + it('should call publication.track.replaceTrack and not unpublish/publish', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + mockPublishTrack.mockClear(); + mockUnpublishTrack.mockClear(); + + await manager.replaceMicrophoneTrack?.(newTrack); + + expect(mockPublication.track.replaceTrack).toHaveBeenCalledTimes(1); + expect(mockPublication.track.replaceTrack).toHaveBeenCalledWith(newTrack); + expect(mockUnpublishTrack).not.toHaveBeenCalled(); + expect(mockPublishTrack).not.toHaveBeenCalled(); + + // Verify the SDK still holds the original publication reference: + // disconnect should unpublish *that exact* publication's track. + await manager.disconnect(); + expect(mockUnpublishTrack).toHaveBeenCalledTimes(1); + expect(mockUnpublishTrack).toHaveBeenCalledWith(mockPublication.track, false); + }); + + it('should throw "No microphone publication to replace" after an explicit unpublish', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + await manager.unpublishMicrophoneStream?.(); + + const newTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + + await expect(manager.replaceMicrophoneTrack?.(newTrack)).rejects.toThrow( + 'No microphone publication to replace' + ); + expect(mockPublication.track.replaceTrack).not.toHaveBeenCalled(); + }); + + it('should reset isPublishing after replaceTrack rejects, allowing a subsequent replace', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + const replaceSpy = mockPublication.track.replaceTrack as jest.Mock; + replaceSpy.mockRejectedValueOnce(new Error('replace failed')); + + const failingTrack = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + await expect(manager.replaceMicrophoneTrack?.(failingTrack)).rejects.toThrow('replace failed'); + + // Subsequent call must not be blocked by a leaked isPublishing flag. + const followupTrack = createMockAudioTrack('audio-track-3'); + await expect(manager.replaceMicrophoneTrack?.(followupTrack)).resolves.toBeUndefined(); + expect(replaceSpy).toHaveBeenLastCalledWith(followupTrack); + expect(replaceSpy).toHaveBeenCalledTimes(2); + }); + }); + describe('Agent Activity State Changes', () => { let mockOnAgentActivityStateChange: jest.Mock; let sendDataEvent: (event: StreamEvents, extraData?: object) => void; diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index 0582cad1..c9365f7c 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -4,6 +4,7 @@ import { ConnectivityState, CreateSessionV2Options, CreateStreamOptions, + Interrupt, Message, PayloadType, StreamEvents, @@ -582,6 +583,33 @@ export async function createLiveKitStreamingManager { + if (!isConnected || !room) { + log('Cannot replace microphone track: room is not connected'); + throw new Error('Room is not connected'); + } + if (track.kind !== 'audio') { + log('Cannot replace microphone track: not an audio track', { kind: track.kind }); + throw new Error('Microphone track must be an audio track'); + } + if (microphoneState.isPublishing) { + log('Cannot replace microphone track: publish in progress'); + throw new Error('Microphone publish in progress'); + } + const pub = microphoneState.publication; + if (!pub || !pub.track) { + log('Cannot replace microphone track: no publication to replace'); + throw new Error('No microphone publication to replace'); + } + try { + microphoneState.isPublishing = true; + await pub.track.replaceTrack(track); + log('Microphone track replaced', { trackId: track.id, trackSid: pub.trackSid }); + } finally { + microphoneState.isPublishing = false; + } + } + async function publishCameraStream(stream: MediaStream): Promise { return publishTrackStream( cameraState, @@ -724,9 +752,18 @@ export async function createLiveKitStreamingManager Promise) { room?.registerRpcMethod(method, handler); }, diff --git a/src/services/streaming-manager/webrtc-manager.ts b/src/services/streaming-manager/webrtc-manager.ts index 6b33e808..fea7a347 100644 --- a/src/services/streaming-manager/webrtc-manager.ts +++ b/src/services/streaming-manager/webrtc-manager.ts @@ -4,10 +4,12 @@ import { AgentActivityState, ConnectionState, CreateStreamOptions, + Interrupt, PayloadType, StreamEvents, StreamingManagerOptions, StreamingState, + StreamInterruptPayload, StreamType, } from '@sdk/types'; import { createStreamingLogger, StreamingManager } from './common'; @@ -246,7 +248,9 @@ export async function createWebRTCStreamingManager { + currentVideoId = videoId; callbacks.onVideoIdChange?.(videoId); }; @@ -320,6 +324,23 @@ export async function createWebRTCStreamingManager jest.fn().mockResolvedValue({ status: 'success', duration: 5000, video_id: 'video-123' }), disconnect: () => jest.fn().mockResolvedValue(undefined), sendDataChannelMessage: () => jest.fn(), + interrupt: () => jest.fn(), }); export const StreamingManagerOptionsFactory = new Factory().attrs({ diff --git a/src/types/entities/agents/manager.ts b/src/types/entities/agents/manager.ts index 044c15e3..8f915dcd 100644 --- a/src/types/entities/agents/manager.ts +++ b/src/types/entities/agents/manager.ts @@ -224,6 +224,16 @@ export interface AgentManager { * supported only for livekit manager */ unpublishMicrophoneStream?: () => Promise; + /** + * Replace the live microphone track on the current publication without + * unpublishing. Preserves the LiveKit publication (SSRC, trackSid) so the + * server sees continuous audio across mic device swaps. Resolves once + * LiveKit has switched the underlying RTCRtpSender's track. + * Rejects if there is no active publication — callers should fall back to + * `publishMicrophoneStream` in that case. + * Supported only for the LiveKit streaming manager. + */ + replaceMicrophoneTrack?: (track: MediaStreamTrack) => Promise; /** * Publish a camera video stream to the LiveKit room. * Can be called after connection to enable vision.