From a8ef4079bd9093b643ef2ac80d59446fc8f1d739 Mon Sep 17 00:00:00 2001 From: Arik Sfaradi Date: Tue, 30 Dec 2025 10:43:44 +0200 Subject: [PATCH 1/8] fix current version (#252) --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 6df1288b..7b052999 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.20", + "version": "1.1.21", "type": "module", "description": "d-id client sdk", "repository": { From c66ba35670083f5d4e6dcf4224bdbdb135bd8b00 Mon Sep 17 00:00:00 2001 From: netanelben-hamo Date: Tue, 30 Dec 2025 16:47:08 +0200 Subject: [PATCH 2/8] abort stream connection when create chat failed (#251) * abort stream connection when create chat failed * fix tests * add tests * fix pr comments * remove abort checks, fetch abort automatically * fix pr comment, replace set timeout with promise.resolve --- src/api/streams/streamApi.ts | 32 +++-- .../agent-manager/connect-to-manager.test.ts | 133 ++++++++++++++++- .../agent-manager/connect-to-manager.ts | 134 +++++++++++------- src/services/agent-manager/index.ts | 13 +- src/services/chat/index.ts | 21 +-- .../streaming-manager/advanced.test.ts | 7 +- .../streaming-manager/business-flows.test.ts | 5 +- .../streaming-manager/edge-cases.test.ts | 31 +++- .../streaming-manager/factory.test.ts | 14 +- src/services/streaming-manager/factory.ts | 5 +- .../streaming-manager/webrtc-core.test.ts | 15 +- .../streaming-manager/webrtc-manager.ts | 12 +- src/types/stream/stream.ts | 16 ++- 13 files changed, 325 insertions(+), 113 deletions(-) diff --git a/src/api/streams/streamApi.ts b/src/api/streams/streamApi.ts index 0ff77646..ac244e7a 100644 --- a/src/api/streams/streamApi.ts +++ b/src/api/streams/streamApi.ts @@ -20,20 +20,28 @@ export function createStreamApi( const client = createClient(auth, `${host}/agents/${agentId}`, onError); return { - createStream(options: CreateStreamOptions) { - return client.post('/streams', options); + createStream(options: CreateStreamOptions, signal?: AbortSignal) { + return client.post('/streams', options, { signal }); }, - startConnection(streamId: string, answer: RTCSessionDescriptionInit, sessionId?: string) { - return client.post(`/streams/${streamId}/sdp`, { - session_id: sessionId, - answer, - }); + startConnection(streamId: string, answer: RTCSessionDescriptionInit, sessionId?: string, signal?: AbortSignal) { + return client.post( + `/streams/${streamId}/sdp`, + { + session_id: sessionId, + answer, + }, + { signal } + ); }, - addIceCandidate(streamId: string, candidate: IceCandidate, sessionId: string) { - return client.post(`/streams/${streamId}/ice`, { - session_id: sessionId, - ...candidate, - }); + addIceCandidate(streamId: string, candidate: IceCandidate, sessionId: string, signal?: AbortSignal) { + return client.post( + `/streams/${streamId}/ice`, + { + session_id: sessionId, + ...candidate, + }, + { signal } + ); }, sendStreamRequest(streamId: string, sessionId: string, payload: SendClipStreamPayload | SendTalkStreamPayload) { return client.post(`/streams/${streamId}`, { diff --git a/src/services/agent-manager/connect-to-manager.test.ts b/src/services/agent-manager/connect-to-manager.test.ts index 6cf2763b..bc94124d 100644 --- a/src/services/agent-manager/connect-to-manager.test.ts +++ b/src/services/agent-manager/connect-to-manager.test.ts @@ -37,10 +37,15 @@ describe('connect-to-manager', () => { let mockAnalytics: Analytics; let mockStreamingManager: any; let mockChat: any; + let mockAbortController: AbortController; + let mockAbortSignal: AbortSignal; beforeEach(() => { jest.clearAllMocks(); + mockAbortController = new AbortController(); + mockAbortSignal = mockAbortController.signal; + mockAgent = { id: 'agent-123', name: 'Test Agent', @@ -165,7 +170,8 @@ describe('connect-to-manager', () => { onVideoStateChange: expect.any(Function), onAgentActivityStateChange: expect.any(Function), }), - }) + }), + mockAbortSignal ); }); @@ -393,7 +399,8 @@ describe('connect-to-manager', () => { }, expect.not.objectContaining({ chatId: expect.anything(), - }) + }), + mockAbortSignal ); }); @@ -414,7 +421,8 @@ describe('connect-to-manager', () => { }, expect.not.objectContaining({ chatId: expect.anything(), - }) + }), + mockAbortSignal ); }); @@ -437,7 +445,8 @@ describe('connect-to-manager', () => { }), expect.not.objectContaining({ chatId: expect.anything(), - }) + }), + mockAbortSignal ); }); }); @@ -518,7 +527,7 @@ describe('connect-to-manager', () => { }, }; - await initializeStreamAndChat(expressiveAgent, mockOptions, mockAgentsApi, mockAnalytics); + const result = await initializeStreamAndChat(expressiveAgent, mockOptions, mockAgentsApi, mockAnalytics); expect(createStreamingManager).toHaveBeenCalledWith( expressiveAgent, @@ -528,8 +537,19 @@ describe('connect-to-manager', () => { }, expect.not.objectContaining({ chatId: expect.anything(), - }) + }), + undefined ); + + // Verify Streams V2 path creates chat with correct chatId format + expect(result.chat).toBeDefined(); + expect(result.chat?.id).toMatch(/^cht_/); + expect(result.chat?.id).toContain(mockStreamingManager.sessionId); + expect(result.chat?.chat_mode).toBe(ChatMode.Functional); + expect(result.chat?.agent_id).toBe(expressiveAgent.id); + + // Verify createChat is NOT called for V2 agents (chat is created internally) + expect(createChat).not.toHaveBeenCalled(); }); it('should use CreateStreamOptions for non-expressive agents', async () => { @@ -544,8 +564,107 @@ describe('connect-to-manager', () => { }), expect.not.objectContaining({ chatId: expect.anything(), - }) + }), + mockAbortSignal + ); + }); + }); + + describe('Error Handling with AbortController', () => { + it('should abort and disconnect streaming manager when error occurs during initialization', async () => { + const error = new Error('Connection failed'); + let streamingManagerRef: any; + + (createStreamingManager as jest.Mock).mockImplementationOnce((agent, streamOptions, options, signal) => { + streamingManagerRef = { + ...mockStreamingManager, + disconnect: jest.fn().mockResolvedValue(undefined), + }; + return new Promise((resolve, reject) => { + Promise.resolve().then(() => { + if (options.callbacks.onConnectionStateChange) { + options.callbacks.onConnectionStateChange(ConnectionState.Connecting); + } + reject(error); + }); + }); + }); + + (createChat as jest.Mock).mockResolvedValueOnce({ + chat: mockChat, + chatMode: ChatMode.Functional, + }); + + await expect(initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics)).rejects.toThrow( + 'Connection failed' + ); + }); + + it('should handle error when streaming manager is created but chat creation fails', async () => { + const chatError = new Error('Chat creation failed'); + const disconnectSpy = jest.fn().mockResolvedValue(undefined); + const streamingManagerWithDisconnect = { + ...mockStreamingManager, + disconnect: disconnectSpy, + }; + + // Make streaming manager succeed immediately to set streamingManagerRef + (createStreamingManager as jest.Mock).mockImplementationOnce((agent, streamOptions, options) => { + // Trigger connection state change to Connected in next microtask + Promise.resolve().then(() => { + if (options.callbacks.onConnectionStateChange) { + options.callbacks.onConnectionStateChange(ConnectionState.Connected); + } + }); + return Promise.resolve(streamingManagerWithDisconnect); + }); + + // Make chat creation fail AFTER streaming manager resolves + // The .then() callback on connectToManagerPromise sets streamingManagerRef, + // so we need connectToManager to resolve before Promise.all rejects + (createChat as jest.Mock).mockImplementationOnce(() => { + return new Promise((resolve, reject) => { + Promise.resolve().then(() => { + reject(chatError); + }); + }); + }); + + await expect(initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics)).rejects.toThrow( + 'Chat creation failed' ); + + expect(disconnectSpy).toHaveBeenCalled(); + }); + }); + + describe('Connection State Handling', () => { + it('should resolve when connection state changes to Connected before manager is ready', async () => { + let onConnectionStateChange: ((state: ConnectionState) => void) | undefined; + let managerResolved = false; + + (createStreamingManager as jest.Mock).mockImplementationOnce((agent, streamOptions, options) => { + onConnectionStateChange = options.callbacks.onConnectionStateChange; + + return new Promise(resolve => { + // Trigger connection state change to Connected BEFORE manager is created + // This should set shouldResolveOnComplete = true + if (onConnectionStateChange) { + onConnectionStateChange(ConnectionState.Connected); + } + + // Resolve the manager in the next microtask + Promise.resolve().then(() => { + managerResolved = true; + resolve(mockStreamingManager); + }); + }); + }); + + const result = await initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics); + + expect(managerResolved).toBe(true); + expect(result.streamingManager).toBe(mockStreamingManager); }); }); }); diff --git a/src/services/agent-manager/connect-to-manager.ts b/src/services/agent-manager/connect-to-manager.ts index a9ee16e0..2e443452 100644 --- a/src/services/agent-manager/connect-to-manager.ts +++ b/src/services/agent-manager/connect-to-manager.ts @@ -168,7 +168,8 @@ type ConnectToManagerOptions = AgentManagerOptions & { function connectToManager( agent: Agent, options: ConnectToManagerOptions, - analytics: Analytics + analytics: Analytics, + signal?: AbortSignal ): Promise> { latencyTimestampTracker.reset(); @@ -177,53 +178,58 @@ function connectToManager( let streamingManager: StreamingManager; let shouldResolveOnComplete = false; - streamingManager = await createStreamingManager(agent, getAgentStreamOptions(agent, options), { - ...options, - analytics, - callbacks: { - ...options.callbacks, - onConnectionStateChange: state => { - options.callbacks.onConnectionStateChange?.(state); - - if (state === ConnectionState.Connected) { - // If manager is ready, resolve immediately - // Otherwise, mark to resolve after manager is created - if (streamingManager) { - resolve(streamingManager); + streamingManager = await createStreamingManager( + agent, + getAgentStreamOptions(agent, options), + { + ...options, + analytics, + callbacks: { + ...options.callbacks, + onConnectionStateChange: state => { + options.callbacks.onConnectionStateChange?.(state); + + if (state === ConnectionState.Connected) { + // If manager is ready, resolve immediately + // Otherwise, mark to resolve after manager is created + if (streamingManager) { + resolve(streamingManager); + } else { + shouldResolveOnComplete = true; + } + } + }, + onVideoStateChange: (state: StreamingState, statsReport?: any) => { + options.callbacks.onVideoStateChange?.(state); + + trackVideoStateChangeAnalytics( + state, + agent, + statsReport, + analytics, + streamingManager.streamType + ); + }, + onAgentActivityStateChange: (state: AgentActivityState) => { + options.callbacks.onAgentActivityStateChange?.(state); + + if (state === AgentActivityState.Talking) { + interruptTimestampTracker.update(); } else { - shouldResolveOnComplete = true; + interruptTimestampTracker.reset(); } - } - }, - onVideoStateChange: (state: StreamingState, statsReport?: any) => { - options.callbacks.onVideoStateChange?.(state); - - trackVideoStateChangeAnalytics( - state, - agent, - statsReport, - analytics, - streamingManager.streamType - ); - }, - onAgentActivityStateChange: (state: AgentActivityState) => { - options.callbacks.onAgentActivityStateChange?.(state); - - if (state === AgentActivityState.Talking) { - interruptTimestampTracker.update(); - } else { - interruptTimestampTracker.reset(); - } - - trackAgentActivityAnalytics( - state === AgentActivityState.Talking ? StreamingState.Start : StreamingState.Stop, - agent, - analytics, - streamingManager.streamType - ); + + trackAgentActivityAnalytics( + state === AgentActivityState.Talking ? StreamingState.Start : StreamingState.Stop, + agent, + analytics, + streamingManager.streamType + ); + }, }, }, - }); + signal + ); if (shouldResolveOnComplete) { resolve(streamingManager); @@ -263,17 +269,35 @@ export async function initializeStreamAndChat( }; return { chatResult, streamingManager }; } else { - const createChatPromise = createChat( - agent, - agentsApi, - analytics, - options.mode, - options.persistentChat, - chat - ); - const connectToManagerPromise = connectToManager(agent, options, analytics); - const [chatResult, streamingManager] = await Promise.all([createChatPromise, connectToManagerPromise]); - return { chatResult, streamingManager }; + const abortController = new AbortController(); + const signal = abortController.signal; + let streamingManagerRef: StreamingManager | undefined; + + try { + const createChatPromise = createChat( + agent, + agentsApi, + analytics, + options.mode, + options.persistentChat, + chat + ); + const connectToManagerPromise = connectToManager(agent, options, analytics, signal).then(manager => { + streamingManagerRef = manager; + return manager; + }); + + const [chatResult, streamingManager] = await Promise.all([createChatPromise, connectToManagerPromise]); + return { chatResult, streamingManager }; + } catch (error) { + abortController.abort(); + + if (streamingManagerRef) { + await streamingManagerRef.disconnect().catch(() => {}); + } + + throw error; + } } }; diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index c4c2090c..8fbd1989 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -122,8 +122,8 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt ); const initPromise = retryOperation( - () => { - return initializeStreamAndChat( + () => + initializeStreamAndChat( agentEntity, { ...options, @@ -132,14 +132,15 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt agentsApi, analytics, items.chat - ); - }, + ), { limit: 3, timeout: CONNECTION_RETRY_TIMEOUT_MS, timeoutErrorMessage: 'Timeout initializing the stream', - // Retry on all errors except for connection errors and rate limit errors, these are already handled in client level. - shouldRetryFn: (error: any) => error?.message !== 'Could not connect' && error.status !== 429, + shouldRetryFn: (error: any) => + error?.message !== 'Could not connect' && + error.status !== 429 && + error?.message !== 'InsufficientCreditsError', delayMs: 1000, } ).catch(e => { diff --git a/src/services/chat/index.ts b/src/services/chat/index.ts index 82f75687..94a192e1 100644 --- a/src/services/chat/index.ts +++ b/src/services/chat/index.ts @@ -29,16 +29,19 @@ export async function createChat( return { chat, chatMode: chat?.chat_mode ?? chatMode }; } catch (error: any) { - try { - const parsedError = JSON.parse(error.message); - - if (parsedError?.kind === 'InsufficientCreditsError') { - throw new Error('InsufficientCreditsError'); - } - } catch (e) { - console.error('Error parsing the error message:', e); + const errorKind = getErrorKind(error); + if (errorKind === 'InsufficientCreditsError') { + throw new Error('InsufficientCreditsError'); } - throw new Error('Cannot create new chat'); } } + +const getErrorKind = (error: Error) => { + try { + const parsedError = JSON.parse(error.message); + return parsedError?.kind; + } catch (e) { + return 'UnknownError'; + } +}; diff --git a/src/services/streaming-manager/advanced.test.ts b/src/services/streaming-manager/advanced.test.ts index dd783c28..a4db8e9a 100644 --- a/src/services/streaming-manager/advanced.test.ts +++ b/src/services/streaming-manager/advanced.test.ts @@ -451,7 +451,12 @@ describe('Streaming Manager Advanced', () => { mockPC.onicecandidate(mockEvent); - expect(mockApi.addIceCandidate).toHaveBeenCalledWith('streamId', { candidate: null }, 'sessionId'); + expect(mockApi.addIceCandidate).toHaveBeenCalledWith( + 'streamId', + { candidate: null }, + 'sessionId', + undefined + ); }); it('should handle agent activity state changes for fluent streams', async () => { diff --git a/src/services/streaming-manager/business-flows.test.ts b/src/services/streaming-manager/business-flows.test.ts index 2d283da8..0caf2f31 100644 --- a/src/services/streaming-manager/business-flows.test.ts +++ b/src/services/streaming-manager/business-flows.test.ts @@ -37,11 +37,12 @@ describe('Streaming Manager Business Flows', () => { const manager = await createStreamingManager(agentId, agent, options); // Verify connection API calls were made in correct order - expect(mockApi.createStream).toHaveBeenCalledWith(agent); + expect(mockApi.createStream).toHaveBeenCalledWith(agent, undefined); expect(mockApi.startConnection).toHaveBeenCalledWith( 'streamId', expect.objectContaining({ type: 'answer' }), - 'sessionId' + 'sessionId', + undefined ); // Verify stream creation callback diff --git a/src/services/streaming-manager/edge-cases.test.ts b/src/services/streaming-manager/edge-cases.test.ts index 82d2bde3..633cb8f1 100644 --- a/src/services/streaming-manager/edge-cases.test.ts +++ b/src/services/streaming-manager/edge-cases.test.ts @@ -147,11 +147,17 @@ describe('Streaming Manager Edge Cases', () => { sdpMid: fullCandidate.sdpMid, sdpMLineIndex: fullCandidate.sdpMLineIndex, }, - 'sessionId' + 'sessionId', + undefined ); mockPC.onicecandidate({ candidate: null }); - expect(mockApi.addIceCandidate).toHaveBeenCalledWith('streamId', { candidate: null }, 'sessionId'); + expect(mockApi.addIceCandidate).toHaveBeenCalledWith( + 'streamId', + { candidate: null }, + 'sessionId', + undefined + ); }); it('should test ICE candidate branches comprehensively', async () => { @@ -159,13 +165,28 @@ describe('Streaming Manager Edge Cases', () => { const mockPC = (window.RTCPeerConnection as any).mock.results[0].value; mockPC.onicecandidate({ candidate: { candidate: 'test', sdpMid: null, sdpMLineIndex: 0 } }); - expect(mockApi.addIceCandidate).toHaveBeenCalledWith('streamId', { candidate: null }, 'sessionId'); + expect(mockApi.addIceCandidate).toHaveBeenCalledWith( + 'streamId', + { candidate: null }, + 'sessionId', + undefined + ); mockPC.onicecandidate({ candidate: { candidate: 'test', sdpMid: '0', sdpMLineIndex: null } }); - expect(mockApi.addIceCandidate).toHaveBeenCalledWith('streamId', { candidate: null }, 'sessionId'); + expect(mockApi.addIceCandidate).toHaveBeenCalledWith( + 'streamId', + { candidate: null }, + 'sessionId', + undefined + ); mockPC.onicecandidate({ candidate: { candidate: 'test', sdpMid: null, sdpMLineIndex: null } }); - expect(mockApi.addIceCandidate).toHaveBeenCalledWith('streamId', { candidate: null }, 'sessionId'); + expect(mockApi.addIceCandidate).toHaveBeenCalledWith( + 'streamId', + { candidate: null }, + 'sessionId', + undefined + ); }); it('should test error handling in ICE candidate processing', async () => { diff --git a/src/services/streaming-manager/factory.test.ts b/src/services/streaming-manager/factory.test.ts index 2dbb2244..e9c557af 100644 --- a/src/services/streaming-manager/factory.test.ts +++ b/src/services/streaming-manager/factory.test.ts @@ -46,7 +46,12 @@ describe('createStreamingManager', () => { await createStreamingManager(agent, { version: StreamApiVersion.V1, ...mockStreamOptions }, mockOptions); - expect(mockCreateWebRTCStreamingManager).toHaveBeenCalledWith(agent.id, mockStreamOptions, mockOptions); + expect(mockCreateWebRTCStreamingManager).toHaveBeenCalledWith( + agent.id, + mockStreamOptions, + mockOptions, + undefined + ); expect(mockCreateLiveKitStreamingManager).not.toHaveBeenCalled(); }); @@ -65,7 +70,12 @@ describe('createStreamingManager', () => { await createStreamingManager(agent, { version: StreamApiVersion.V1, ...mockStreamOptions }, mockOptions); - expect(mockCreateWebRTCStreamingManager).toHaveBeenCalledWith(agent.id, mockStreamOptions, mockOptions); + expect(mockCreateWebRTCStreamingManager).toHaveBeenCalledWith( + agent.id, + mockStreamOptions, + mockOptions, + undefined + ); expect(mockCreateLiveKitStreamingManager).not.toHaveBeenCalled(); }); diff --git a/src/services/streaming-manager/factory.ts b/src/services/streaming-manager/factory.ts index e3a282ee..207b3954 100644 --- a/src/services/streaming-manager/factory.ts +++ b/src/services/streaming-manager/factory.ts @@ -20,14 +20,15 @@ export type ExtendedStreamOptions = export async function createStreamingManager( agent: Agent, streamOptions: ExtendedStreamOptions, - options: StreamingManagerOptions + options: StreamingManagerOptions, + signal?: AbortSignal ): Promise> { const agentId = agent.id; switch (streamOptions.version) { case StreamApiVersion.V1: { const { version, ...createStreamOptions } = streamOptions; - return createWebRTCStreamingManager(agentId, createStreamOptions, options); + return createWebRTCStreamingManager(agentId, createStreamOptions, options, signal); } case StreamApiVersion.V2: { diff --git a/src/services/streaming-manager/webrtc-core.test.ts b/src/services/streaming-manager/webrtc-core.test.ts index 0704d5c5..5e5044ba 100644 --- a/src/services/streaming-manager/webrtc-core.test.ts +++ b/src/services/streaming-manager/webrtc-core.test.ts @@ -36,7 +36,7 @@ describe('Streaming Manager Core', () => { it('should create streaming manager and set up peer connection', async () => { const manager = await createStreamingManager(agentId, agentStreamOptions, options); - expect(mockApi.createStream).toHaveBeenCalledWith(agentStreamOptions); + expect(mockApi.createStream).toHaveBeenCalledWith(agentStreamOptions, undefined); expect(manager.streamId).toBe('streamId'); expect(manager.sessionId).toBe('sessionId'); expect(options.callbacks.onStreamCreated).toHaveBeenCalledWith( @@ -100,7 +100,8 @@ describe('Streaming Manager Core', () => { expect(mockApi.addIceCandidate).toHaveBeenCalledWith( 'streamId', expect.objectContaining({ candidate: 'cand' }), - 'sessionId' + 'sessionId', + undefined ); }); @@ -111,7 +112,12 @@ describe('Streaming Manager Core', () => { mockPC.onicecandidate(mockEvent); - expect(mockApi.addIceCandidate).toHaveBeenCalledWith('streamId', { candidate: null }, 'sessionId'); + expect(mockApi.addIceCandidate).toHaveBeenCalledWith( + 'streamId', + { candidate: null }, + 'sessionId', + undefined + ); }); it('should handle errors in ICE candidate handling', async () => { @@ -301,7 +307,8 @@ describe('Streaming Manager Core', () => { expect(mockApi.startConnection).toHaveBeenCalledWith( 'streamId', { type: 'answer', sdp: 'mock-sdp' }, - 'sessionId' + 'sessionId', + undefined ); }); diff --git a/src/services/streaming-manager/webrtc-manager.ts b/src/services/streaming-manager/webrtc-manager.ts index e0afde1b..d8344c82 100644 --- a/src/services/streaming-manager/webrtc-manager.ts +++ b/src/services/streaming-manager/webrtc-manager.ts @@ -133,7 +133,8 @@ function handleStreamState({ export async function createWebRTCStreamingManager( agentId: string, streamOptions: T, - { debug = false, callbacks, auth, baseURL = didApiUrl, analytics }: StreamingManagerOptions + { debug = false, callbacks, auth, baseURL = didApiUrl, analytics }: StreamingManagerOptions, + signal?: AbortSignal ): Promise> { const log = createStreamingLogger(debug, 'WebRTCStreamingManager'); const parseDataChannelMessage = createParseDataChannelMessage(log); @@ -159,7 +160,7 @@ export async function createWebRTCStreamingManager = T extends TalkStreamOptions : never; export interface RtcApi { - createStream(options: CreateStreamOptions): Promise; - startConnection(streamId: string, answer: RTCSessionDescriptionInit, sessionId?: string): Promise; - addIceCandidate(streamId: string, candidate: IceCandidate, sessionId: string): Promise; + createStream(options: CreateStreamOptions, signal?: AbortSignal): Promise; + startConnection( + streamId: string, + answer: RTCSessionDescriptionInit, + sessionId?: string, + signal?: AbortSignal + ): Promise; + addIceCandidate( + streamId: string, + candidate: IceCandidate, + sessionId: string, + signal?: AbortSignal + ): Promise; sendStreamRequest( streamId: string, sessionId: string, From 8648d918f792c18159f8ef337fadcffeaa72d68b Mon Sep 17 00:00:00 2001 From: netanelben-hamo Date: Wed, 31 Dec 2025 15:12:21 +0200 Subject: [PATCH 3/8] upgrade sdk version (#255) --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 7b052999..5885f5e2 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.21", + "version": "1.1.22", "type": "module", "description": "d-id client sdk", "repository": { From 5d1d66c31e8ee63eb7383a196288548fff63fc44 Mon Sep 17 00:00:00 2001 From: Ofek Simhi <158498125+osimhi213@users.noreply.github.com> Date: Mon, 5 Jan 2026 14:24:20 +0200 Subject: [PATCH 4/8] Feature/livekit streaming backend metrics (#257) * handle livekit datachannel stream events * prettier * CR --- .../streaming-manager/livekit-manager.ts | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index 2ee2ed19..78f65649 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -239,18 +239,30 @@ export async function createLiveKitStreamingManager Date: Tue, 6 Jan 2026 11:01:01 +0200 Subject: [PATCH 5/8] Chore/session v2 readiness latency (#258) * session ready load time * CR * remove log * fix + add test --- .../agent-manager/connect-to-manager.test.ts | 17 +++++++++++++++++ .../agent-manager/connect-to-manager.ts | 18 ++++++++++++++++-- src/services/analytics/timestamp-tracker.ts | 1 + .../streaming-manager/livekit-manager.ts | 1 + .../streaming-manager/webrtc-manager.ts | 2 +- src/types/stream/stream.ts | 1 + 6 files changed, 37 insertions(+), 3 deletions(-) diff --git a/src/services/agent-manager/connect-to-manager.test.ts b/src/services/agent-manager/connect-to-manager.test.ts index bc94124d..3c241217 100644 --- a/src/services/agent-manager/connect-to-manager.test.ts +++ b/src/services/agent-manager/connect-to-manager.test.ts @@ -28,6 +28,7 @@ jest.mock('../../config/environment', () => ({ jest.mock('../analytics/timestamp-tracker', () => ({ latencyTimestampTracker: { reset: jest.fn(), update: jest.fn(), get: jest.fn(() => 1000) }, interruptTimestampTracker: { reset: jest.fn(), update: jest.fn(), get: jest.fn(() => 500) }, + streamReadyTimestampTracker: { reset: jest.fn(), update: jest.fn(), get: jest.fn(() => 1500) }, })); describe('connect-to-manager', () => { @@ -236,6 +237,7 @@ describe('connect-to-manager', () => { let onConnectionStateChange: (state: ConnectionState) => void; let onVideoStateChange: (state: StreamingState, statsReport?: any) => void; let onAgentActivityStateChange: (state: AgentActivityState) => void; + let onStreamReady: (() => void) | undefined; beforeEach(async () => { // Initialize callbacks to avoid undefined errors @@ -247,6 +249,7 @@ describe('connect-to-manager', () => { onConnectionStateChange = options.callbacks.onConnectionStateChange; onVideoStateChange = options.callbacks.onVideoStateChange; onAgentActivityStateChange = options.callbacks.onAgentActivityStateChange; + onStreamReady = options.callbacks.onStreamReady; return new Promise(resolve => { setTimeout(() => { @@ -370,6 +373,20 @@ describe('connect-to-manager', () => { ); }); }); + + describe('onStreamReady', () => { + it('should track mixpanel event with latency when stream is ready', () => { + const { streamReadyTimestampTracker } = require('../analytics/timestamp-tracker'); + streamReadyTimestampTracker.get.mockReturnValue(2000); + + onStreamReady?.(); + + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-chat', { + event: 'ready', + latency: 2000, + }); + }); + }); }); describe('Stream Options Mapping', () => { diff --git a/src/services/agent-manager/connect-to-manager.ts b/src/services/agent-manager/connect-to-manager.ts index 2e443452..3864113a 100644 --- a/src/services/agent-manager/connect-to-manager.ts +++ b/src/services/agent-manager/connect-to-manager.ts @@ -23,7 +23,11 @@ import { } from '@sdk/types'; import { isStreamsV2Agent } from '@sdk/utils/agent'; import { Analytics } from '../analytics/mixpanel'; -import { interruptTimestampTracker, latencyTimestampTracker } from '../analytics/timestamp-tracker'; +import { + interruptTimestampTracker, + latencyTimestampTracker, + streamReadyTimestampTracker, +} from '../analytics/timestamp-tracker'; import { createChat } from '../chat'; const ChatPrefix = 'cht'; @@ -172,15 +176,21 @@ function connectToManager( signal?: AbortSignal ): Promise> { latencyTimestampTracker.reset(); + streamReadyTimestampTracker.update(); return new Promise(async (resolve, reject) => { try { let streamingManager: StreamingManager; let shouldResolveOnComplete = false; + const streamOptions = getAgentStreamOptions(agent, options); + + analytics.enrich({ + 'stream-version': streamOptions.version.toString(), + }); streamingManager = await createStreamingManager( agent, - getAgentStreamOptions(agent, options), + streamOptions, { ...options, analytics, @@ -226,6 +236,10 @@ function connectToManager( streamingManager.streamType ); }, + onStreamReady: () => { + const readyLatency = streamReadyTimestampTracker.get(true); + analytics.track('agent-chat', { event: 'ready', latency: readyLatency }); + }, }, }, signal diff --git a/src/services/analytics/timestamp-tracker.ts b/src/services/analytics/timestamp-tracker.ts index c3b217f5..381a89c0 100644 --- a/src/services/analytics/timestamp-tracker.ts +++ b/src/services/analytics/timestamp-tracker.ts @@ -10,3 +10,4 @@ function createTimestampTracker() { export const latencyTimestampTracker = createTimestampTracker(); export const interruptTimestampTracker = createTimestampTracker(); +export const streamReadyTimestampTracker = createTimestampTracker(); diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index 78f65649..904e9f98 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -226,6 +226,7 @@ export async function createLiveKitStreamingManager void; onVideoIdChange?: (videoId: string | null) => void; onStreamCreated?: (stream: { stream_id: string; session_id: string; agent_id: string }) => void; + onStreamReady?: () => void; } export type ManagerCallbackKeys = keyof ManagerCallbacks; From a7279d63808b7b74b0ee93eaa6470c8a98a26b9f Mon Sep 17 00:00:00 2001 From: Arik Sfaradi Date: Tue, 6 Jan 2026 17:11:08 +0200 Subject: [PATCH 6/8] Feature/microphone stream (#254) * add microphone stream tracking option * add mic stream option in agent manager options * Publish the MediaStreamTrack directly * split transribe events from answers * handle case when message role is user - add it to messages if its transcribed do not override last conent of message if its transcirbed * expose publsihMicrophoneStream to give more control to the client when to use the stream instead of passing it and decide for him * prevent issues when Multiple tracks published sumultaneosly or duplocate track publishing * expose in manager * demo adaptations * enable/disable microphone in app expose unpublishMicrophon * default to empty string if not provided * tests --- demo/app.tsx | 198 ++++++++++- demo/hooks/useAgentManager.ts | 41 ++- src/services/agent-manager/index.test.ts | 53 +++ src/services/agent-manager/index.ts | 12 + src/services/socket-manager/message-queue.ts | 60 +++- src/services/streaming-manager/common.ts | 15 + .../streaming-manager/factory.test.ts | 35 ++ .../streaming-manager/livekit-manager.test.ts | 310 ++++++++++++++++++ .../streaming-manager/livekit-manager.ts | 109 +++++- src/types/entities/agents/chat.ts | 1 + src/types/entities/agents/manager.ts | 18 + src/types/stream/stream.ts | 7 + 12 files changed, 839 insertions(+), 20 deletions(-) create mode 100644 src/services/streaming-manager/livekit-manager.test.ts diff --git a/demo/app.tsx b/demo/app.tsx index 6aa04403..aa2b19fc 100644 --- a/demo/app.tsx +++ b/demo/app.tsx @@ -1,5 +1,5 @@ import { ChatMode, ConnectionState } from '@sdk/types'; -import { useEffect, useRef, useState } from 'preact/hooks'; +import { useCallback, useEffect, useRef, useState } from 'preact/hooks'; import './app.css'; import { agentId, clientKey, debug, didApiUrl, didSocketApiUrl } from './environment'; @@ -14,35 +14,179 @@ export function App() { const [sessionTimeout, setSessionTimeout] = useState(); const [compatibilityMode, setCompatibilityMode] = useState<'on' | 'off' | 'auto'>(); const [fluent, setFluent] = useState(false); + const [enableMicrophone, setEnableMicrophone] = useState(true); + const [microphoneStream, setMicrophoneStream] = useState(undefined); + const microphoneStreamRef = useRef(undefined); + const [audioInputDevices, setAudioInputDevices] = useState([]); + const [selectedAudioDeviceId, setSelectedAudioDeviceId] = useState(''); const videoRef = useRef(null); - const { srcObject, connectionState, messages, isSpeaking, connect, disconnect, speak, chat, interrupt } = - useAgentManager({ - debug, - agentId, - baseURL: didApiUrl, - wsURL: didSocketApiUrl, - mode, - enableAnalytics: false, - auth: { type: 'key', clientKey }, - streamOptions: { streamWarmup: warmup, sessionTimeout, compatibilityMode, fluent }, - }); + const { + srcObject, + connectionState, + messages, + isSpeaking, + connect, + disconnect, + speak, + chat, + interrupt, + publishMicrophoneStream, + unpublishMicrophoneStream, + microphoneEnabled, + isMicrophonePublished, + } = useAgentManager({ + debug, + agentId, + baseURL: didApiUrl, + wsURL: didSocketApiUrl, + mode, + enableAnalytics: false, + auth: { type: 'key', clientKey }, + streamOptions: { streamWarmup: warmup, sessionTimeout, compatibilityMode, fluent }, + }); + + const cleanupMicrophoneStream = useCallback(() => { + if (microphoneStreamRef.current) { + microphoneStreamRef.current.getTracks().forEach(track => track.stop()); + microphoneStreamRef.current = undefined; + setMicrophoneStream(undefined); + } + }, []); + + const updateAudioDevices = useCallback(async () => { + try { + await navigator.mediaDevices.getUserMedia({ audio: true }); + const devices = await navigator.mediaDevices.enumerateDevices(); + const audioInputs = devices.filter(device => device.kind === 'audioinput'); + + const realDevices = audioInputs.filter( + device => !device.label.toLowerCase().includes('blackhole') && + !device.label.toLowerCase().includes('virtual') + ); + + const devicesToShow = realDevices.length > 0 ? realDevices : audioInputs; + + setAudioInputDevices(devicesToShow); + + if (devicesToShow.length > 0 && !selectedAudioDeviceId) { + setSelectedAudioDeviceId(devicesToShow[0].deviceId); + } + } catch (error) { + console.error('Failed to enumerate audio devices:', error); + } + }, [selectedAudioDeviceId]); + + const handleMicrophoneToggle = useCallback( + async (enabled: boolean) => { + if (connectionState !== ConnectionState.Connected) { + setEnableMicrophone(enabled); + return; + } + + if (enabled) { + if (!microphoneStreamRef.current) { + try { + const audioConstraints: MediaStreamConstraints['audio'] = selectedAudioDeviceId + ? { deviceId: { exact: selectedAudioDeviceId } } + : true; + const stream = await navigator.mediaDevices.getUserMedia({ audio: audioConstraints }); + setMicrophoneStream(stream); + microphoneStreamRef.current = stream; + } catch (error) { + console.error('Failed to get microphone access:', error); + alert('Failed to access microphone. Please check permissions.'); + return; + } + } + + if (microphoneStreamRef.current && publishMicrophoneStream) { + try { + await publishMicrophoneStream(microphoneStreamRef.current); + setEnableMicrophone(true); + } catch (error) { + console.error('Failed to publish microphone stream:', error); + } + } + } else { + if (unpublishMicrophoneStream) { + try { + await unpublishMicrophoneStream(); + setEnableMicrophone(false); + } catch (error) { + console.error('Failed to unpublish microphone stream:', error); + } + } + } + }, + [connectionState, selectedAudioDeviceId, publishMicrophoneStream, unpublishMicrophoneStream] + ); async function onClick() { if (connectionState === ConnectionState.New || connectionState === ConnectionState.Fail) { + if (enableMicrophone && !microphoneStreamRef.current) { + try { + const audioConstraints: MediaStreamConstraints['audio'] = selectedAudioDeviceId + ? { deviceId: { exact: selectedAudioDeviceId } } + : true; + + const stream = await navigator.mediaDevices.getUserMedia({ audio: audioConstraints }); + setMicrophoneStream(stream); + microphoneStreamRef.current = stream; + } catch (error) { + console.error('Failed to get microphone access:', error); + alert('Failed to access microphone. Please check permissions.'); + return; + } + } + await connect(); } else if (connectionState === ConnectionState.Connected && text) { await speak(text); } } + useEffect(() => { + return cleanupMicrophoneStream; + }, [cleanupMicrophoneStream]); + + useEffect(() => { + if (!enableMicrophone && microphoneStreamRef.current) { + cleanupMicrophoneStream(); + } + }, [enableMicrophone, cleanupMicrophoneStream]); + + useEffect(() => { + if (enableMicrophone) { + updateAudioDevices(); + } + }, [enableMicrophone, updateAudioDevices]); + useEffect(() => { if (srcObject && videoRef.current) { videoRef.current.srcObject = srcObject; } }, [srcObject]); + useEffect(() => { + if ( + connectionState === ConnectionState.Connected && + enableMicrophone && + microphoneEnabled && + publishMicrophoneStream && + microphoneStreamRef.current && + !isMicrophonePublished + ) { + const stream = microphoneStreamRef.current; + publishMicrophoneStream(stream).catch(error => { + if (error) { + console.error('Failed to publish microphone stream:', error); + } + }); + } + }, [connectionState, enableMicrophone, microphoneEnabled, publishMicrophoneStream, isMicrophonePublished]); + return (
@@ -105,7 +249,37 @@ export function App() { /> Fluent + + {microphoneEnabled && ( + + )}
+ {microphoneEnabled && enableMicrophone && audioInputDevices.length > 0 && ( +
+ +
+ )} diff --git a/demo/hooks/useAgentManager.ts b/demo/hooks/useAgentManager.ts index 3696d393..11338770 100644 --- a/demo/hooks/useAgentManager.ts +++ b/demo/hooks/useAgentManager.ts @@ -9,7 +9,7 @@ import { StreamType, StreamingState, } from '@sdk/types'; -import { useCallback, useEffect, useState } from 'preact/hooks'; +import { useCallback, useEffect, useMemo, useState } from 'preact/hooks'; interface UseAgentManagerOptions { agentId: string; @@ -52,6 +52,7 @@ export function useAgentManager(props: UseAgentManagerOptions) { const [srcObject, setSrcObject] = useState(null); const [agentManager, setAgentManager] = useState(null); const [connectionState, setConnectionState] = useState(ConnectionState.New); + const [isMicrophonePublished, setIsMicrophonePublished] = useState(false); const streamType = agentManager?.getStreamType(); useEffect(() => { @@ -183,6 +184,40 @@ export function useAgentManager(props: UseAgentManagerOptions) { } }, [agentManager, connectionState]); + const publishMicrophoneStream = useCallback( + async (stream: MediaStream) => { + if (!agentManager) { + console.warn('Agent manager is not initialized yet. Will retry when ready.'); + return; + } + if (!agentManager.publishMicrophoneStream) { + throw new Error('publishMicrophoneStream is not available for this streaming manager'); + } + await agentManager.publishMicrophoneStream(stream); + setIsMicrophonePublished(true); + }, + [agentManager] + ); + + const unpublishMicrophoneStream = useCallback(async () => { + if (!agentManager) { + console.warn('Agent manager is not initialized yet.'); + return; + } + if (!agentManager.unpublishMicrophoneStream) { + throw new Error('unpublishMicrophoneStream is not available for this streaming manager'); + } + await agentManager.unpublishMicrophoneStream(); + setIsMicrophonePublished(false); + }, [agentManager]); + + const microphoneEnabled = useMemo(() => { + return ( + agentManager?.agent?.presenter?.type === 'expressive' && + typeof agentManager?.publishMicrophoneStream === 'function' + ); + }, [agentManager]); + return { connectionState, messages, @@ -193,5 +228,9 @@ export function useAgentManager(props: UseAgentManagerOptions) { speak, chat, interrupt, + publishMicrophoneStream, + unpublishMicrophoneStream, + microphoneEnabled, + isMicrophonePublished, }; } diff --git a/src/services/agent-manager/index.test.ts b/src/services/agent-manager/index.test.ts index 430e0f28..14614e56 100644 --- a/src/services/agent-manager/index.test.ts +++ b/src/services/agent-manager/index.test.ts @@ -698,6 +698,59 @@ describe('createAgentManager', () => { }); }); + describe('publishMicrophoneStream', () => { + let manager: AgentManager; + + beforeEach(async () => { + manager = await createAgentManager('agent-123', mockOptions); + await manager.connect(); + }); + + it('should publish microphone stream when available', async () => { + const mockStream = new MediaStream(); + const mockPublish = jest.fn().mockResolvedValue(undefined); + mockStreamingManager.publishMicrophoneStream = mockPublish; + + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublish).toHaveBeenCalledWith(mockStream); + }); + + it('should throw error when publishMicrophoneStream is not available', async () => { + mockStreamingManager.publishMicrophoneStream = undefined; + + await expect(manager.publishMicrophoneStream?.(new MediaStream())).rejects.toThrow( + 'publishMicrophoneStream is not available for this streaming manager' + ); + }); + }); + + describe('unpublishMicrophoneStream', () => { + let manager: AgentManager; + + beforeEach(async () => { + manager = await createAgentManager('agent-123', mockOptions); + await manager.connect(); + }); + + it('should unpublish microphone stream when available', async () => { + const mockUnpublish = jest.fn().mockResolvedValue(undefined); + mockStreamingManager.unpublishMicrophoneStream = mockUnpublish; + + await manager.unpublishMicrophoneStream?.(); + + expect(mockUnpublish).toHaveBeenCalled(); + }); + + it('should throw error when unpublishMicrophoneStream is not available', async () => { + mockStreamingManager.unpublishMicrophoneStream = undefined; + + await expect(manager.unpublishMicrophoneStream?.()).rejects.toThrow( + 'unpublishMicrophoneStream is not available for this streaming manager' + ); + }); + }); + describe('DirectPlayback mode', () => { it('should not create socket manager in DirectPlayback mode', async () => { const directPlaybackOptions = { ...mockOptions, mode: ChatMode.DirectPlayback }; diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index 8fbd1989..f5eea324 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -227,6 +227,18 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt mode: items.chatMode, }); }, + async publishMicrophoneStream(stream: MediaStream) { + if (!items.streamingManager?.publishMicrophoneStream) { + throw new Error('publishMicrophoneStream is not available for this streaming manager'); + } + return items.streamingManager.publishMicrophoneStream(stream); + }, + async unpublishMicrophoneStream() { + if (!items.streamingManager?.unpublishMicrophoneStream) { + throw new Error('unpublishMicrophoneStream is not available for this streaming manager'); + } + return items.streamingManager.unpublishMicrophoneStream(); + }, async chat(userMessage: string) { const validateChatRequest = () => { if (isChatModeWithoutChat(options.mode)) { diff --git a/src/services/socket-manager/message-queue.ts b/src/services/socket-manager/message-queue.ts index 4380d0a2..e73e3ec0 100644 --- a/src/services/socket-manager/message-queue.ts +++ b/src/services/socket-manager/message-queue.ts @@ -1,4 +1,5 @@ import { Agent, AgentManagerOptions, ChatProgress, StreamEvents } from '@sdk/types'; +import { Message } from '@sdk/types/entities/agents/chat'; import { getStreamAnalyticsProps } from '@sdk/utils/analytics'; import { AgentManagerItems } from '../agent-manager'; import { Analytics } from '../analytics/mixpanel'; @@ -23,6 +24,26 @@ function getMessageContent(chatEventQueue: ChatEventQueue) { return content; } +function handleAudioTranscribedMessage( + data: any, + items: AgentManagerItems, + onNewMessage: AgentManagerOptions['callbacks']['onNewMessage'] +) { + if (!data.content) { + return; + } + + const userMessage: Message = { + id: data.id || `user-${Date.now()}`, + role: data.role, + content: data.content, + created_at: data.created_at || new Date().toISOString(), + transcribed: true, + }; + items.messages.push(userMessage); + onNewMessage?.([...items.messages], 'user'); +} + function processChatEvent( event: ChatProgress, data: any, @@ -30,9 +51,30 @@ function processChatEvent( items: AgentManagerItems, onNewMessage: AgentManagerOptions['callbacks']['onNewMessage'] ) { + if (event === ChatProgress.Transcribe && data.content) { + handleAudioTranscribedMessage(data, items, onNewMessage); + return; + } + + if (!(event === ChatProgress.Partial || event === ChatProgress.Answer)) { + return; + } + const lastMessage = items.messages[items.messages.length - 1]; - if (!(event === ChatProgress.Partial || event === ChatProgress.Answer) || lastMessage?.role !== 'assistant') { + let currentMessage: Message; + if (lastMessage?.transcribed && lastMessage.role === 'user') { + const initialContent = event === ChatProgress.Answer ? data.content || '' : ''; + currentMessage = { + id: data.id || `assistant-${Date.now()}`, + role: data.role || 'assistant', + content: data.content || '', + created_at: data.created_at || new Date().toISOString(), + }; + items.messages.push(currentMessage); + } else if (lastMessage?.role === 'assistant') { + currentMessage = lastMessage; + } else { return; } @@ -46,8 +88,8 @@ function processChatEvent( const messageContent = getMessageContent(chatEventQueue); - if (lastMessage.content !== messageContent || event === ChatProgress.Answer) { - lastMessage.content = messageContent; + if (currentMessage.content !== messageContent || event === ChatProgress.Answer) { + currentMessage.content = messageContent; onNewMessage?.([...items.messages], event); } @@ -66,9 +108,15 @@ export function createMessageEventQueue( clearQueue: () => (chatEventQueue = {}), onMessage: (event: ChatProgress | StreamEvents, data: any) => { if ('content' in data) { - processChatEvent(event as ChatProgress, data, chatEventQueue, items, options.callbacks.onNewMessage); - - if (event === ChatProgress.Answer) { + const chatEvent = + event === StreamEvents.ChatAnswer + ? ChatProgress.Answer + : event === StreamEvents.ChatAudioTranscribed + ? ChatProgress.Transcribe + : (event as ChatProgress); + processChatEvent(chatEvent, data, chatEventQueue, items, options.callbacks.onNewMessage); + + if (chatEvent === ChatProgress.Answer) { analytics.track('agent-message-received', { messages: items.messages.length, mode: items.chatMode, diff --git a/src/services/streaming-manager/common.ts b/src/services/streaming-manager/common.ts index 434c79ab..d0ceb9cd 100644 --- a/src/services/streaming-manager/common.ts +++ b/src/services/streaming-manager/common.ts @@ -32,6 +32,21 @@ export type StreamingManager; + /** + * Publish a microphone stream to the DataChannel + * Can be called after connection to add microphone input + * @param stream The MediaStream containing the microphone audio track + * supported only for livekit manager + */ + publishMicrophoneStream?(stream: MediaStream): Promise; + + /** + * Unpublish the currently published microphone stream + * Can be called after connection to remove microphone input + * supported only for livekit manager + */ + unpublishMicrophoneStream?(): Promise; + /** * Session identifier information, should be returned in the body of all streaming requests */ diff --git a/src/services/streaming-manager/factory.test.ts b/src/services/streaming-manager/factory.test.ts index e9c557af..6211ce75 100644 --- a/src/services/streaming-manager/factory.test.ts +++ b/src/services/streaming-manager/factory.test.ts @@ -100,4 +100,39 @@ describe('createStreamingManager', () => { expect(mockCreateLiveKitStreamingManager).toHaveBeenCalledWith(agent.id, v2StreamOptions, mockOptions); expect(mockCreateWebRTCStreamingManager).not.toHaveBeenCalled(); }); + + it('passes microphoneStream to createLiveKitStreamingManager when provided', async () => { + const agent = AgentFactory.build({ + presenter: { + type: 'expressive', + voice: { + type: Providers.Microsoft, + voice_id: 'voice-123', + }, + }, + }); + + const v2StreamOptions: CreateSessionV2Options = { + transport_provider: TransportProvider.Livekit, + chat_persist: true, + }; + + const mockMicrophoneStream = new MediaStream(); + const optionsWithMicrophone = { + ...mockOptions, + microphoneStream: mockMicrophoneStream, + }; + + await createStreamingManager( + agent, + { version: StreamApiVersion.V2, ...v2StreamOptions }, + optionsWithMicrophone + ); + + expect(mockCreateLiveKitStreamingManager).toHaveBeenCalledWith( + agent.id, + v2StreamOptions, + expect.objectContaining({ microphoneStream: mockMicrophoneStream }) + ); + }); }); diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts new file mode 100644 index 00000000..82009e85 --- /dev/null +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -0,0 +1,310 @@ +import { StreamingManagerOptionsFactory } from '../../test-utils/factories'; +import { CreateSessionV2Options, StreamingManagerOptions } from '../../types/index'; +import { createLiveKitStreamingManager } from './livekit-manager'; + +// Mock livekit-client +const mockPublishTrack = jest.fn(); +const mockUnpublishTrack = jest.fn(); +const mockLocalParticipant = { + publishTrack: mockPublishTrack, + unpublishTrack: mockUnpublishTrack, + sendText: jest.fn(), + audioTrackPublications: new Map(), +}; + +const mockRoom = { + on: jest.fn().mockReturnThis(), + connect: jest.fn().mockResolvedValue(undefined), + prepareConnection: jest.fn().mockResolvedValue(undefined), + disconnect: jest.fn().mockResolvedValue(undefined), + localParticipant: mockLocalParticipant, +}; + +const mockRoomConstructor = jest.fn().mockImplementation(() => mockRoom); + +const mockTrack = { + Source: { + Microphone: 'microphone', + }, +}; + +jest.mock('livekit-client', () => ({ + Room: mockRoomConstructor, + RoomEvent: { + ConnectionStateChanged: 'ConnectionStateChanged', + ConnectionQualityChanged: 'ConnectionQualityChanged', + ActiveSpeakersChanged: 'ActiveSpeakersChanged', + ParticipantConnected: 'ParticipantConnected', + TrackSubscribed: 'TrackSubscribed', + TrackUnsubscribed: 'TrackUnsubscribed', + DataReceived: 'DataReceived', + MediaDevicesError: 'MediaDevicesError', + EncryptionError: 'EncryptionError', + TrackSubscriptionFailed: 'TrackSubscriptionFailed', + }, + ConnectionState: { + Connecting: 'connecting', + Connected: 'connected', + Disconnected: 'disconnected', + Reconnecting: 'reconnecting', + SignalReconnecting: 'signalReconnecting', + }, + Track: mockTrack, +})); + +// Mock createStreamApiV2 +const mockCreateStream = jest.fn().mockResolvedValue({ + id: 'session-123', + session_token: 'token-123', + session_url: 'wss://test.livekit.cloud', +}); + +jest.mock('../../api/streams/streamsApiV2', () => ({ + createStreamApiV2: jest.fn(() => ({ + createStream: mockCreateStream, + })), +})); + +jest.mock('../../config/environment', () => ({ didApiUrl: 'http://test-api.com' })); + +// Test constants +const TEST_AGENT_ID = 'agent123'; +const TEST_TRACK_SID = 'track-sid-123'; +const TEST_AUDIO_TRACK_ID = 'audio-track-1'; +const TEST_AUDIO_TRACK_ID_2 = 'audio-track-2'; +const TEST_SESSION_ID = 'session-123'; +const ASYNC_WAIT_TIME = 10; + +// Helper functions to create mock objects +function createMockAudioTrack(id: string = TEST_AUDIO_TRACK_ID, additionalProps: any = {}) { + return { + kind: 'audio', + id, + enabled: true, + stop: jest.fn(), + ...additionalProps, + } as any; +} + +function createMockTrack(id: string = TEST_AUDIO_TRACK_ID, mediaStreamTrack?: MediaStreamTrack) { + return { + kind: 'audio', + id, + mediaStreamTrack: mediaStreamTrack || createMockAudioTrack(id), + } as any; +} + +function createMockPublication(trackId: string = TEST_AUDIO_TRACK_ID, trackSid: string = TEST_TRACK_SID) { + const mockTrack = createMockTrack(trackId); + return { + trackSid, + track: mockTrack, + source: 'microphone', + mediaStreamTrack: createMockAudioTrack(trackId), + } as any; +} + +function createMockStream(audioTracks: any[] = [createMockAudioTrack()]) { + const stream = new MediaStream(audioTracks); + (stream as any).getAudioTracks = jest.fn(() => audioTracks); + (stream as any).getTracks = jest.fn(() => audioTracks); + return stream; +} + +function getConnectionStateHandler(index?: number) { + const calls = mockRoom.on.mock.calls.filter((call: any[]) => call[0] === 'ConnectionStateChanged'); + if (index !== undefined && calls[index]) { + return calls[index][1]; + } + return calls.length > 0 ? calls[calls.length - 1][1] : undefined; +} + +async function simulateConnection(handlerIndex?: number) { + const handler = getConnectionStateHandler(handlerIndex); + if (handler) { + handler('connected'); + } + await new Promise(resolve => setTimeout(resolve, ASYNC_WAIT_TIME)); +} + +describe('LiveKit Streaming Manager - Microphone Stream', () => { + let agentId: string; + let sessionOptions: CreateSessionV2Options; + let options: StreamingManagerOptions; + + beforeEach(() => { + jest.clearAllMocks(); + agentId = TEST_AGENT_ID; + sessionOptions = { + chat_persist: true, + transport_provider: 'livekit' as any, + }; + options = StreamingManagerOptionsFactory.build(); + }); + + describe('Microphone Stream Publishing', () => { + it('should publish microphone track using publishMicrophoneStream method', async () => { + const mockAudioTrack = createMockAudioTrack(); + const mockStream = createMockStream([mockAudioTrack]); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalledWith(mockAudioTrack, { + source: 'microphone', + }); + }); + + it('should not publish microphone track when publishMicrophoneStream is not called', async () => { + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + expect(mockPublishTrack).not.toHaveBeenCalled(); + }); + + it('should extract first audio track from MediaStream', async () => { + const mockAudioTrack1 = createMockAudioTrack(TEST_AUDIO_TRACK_ID, { enabled: false }); + const mockAudioTrack2 = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + const mockStream = createMockStream([mockAudioTrack1, mockAudioTrack2]); + const mockPublication = createMockPublication(TEST_AUDIO_TRACK_ID); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalledWith(mockAudioTrack1, { + source: 'microphone', + }); + expect(mockPublishTrack).toHaveBeenCalledTimes(1); + }); + }); + + describe('Error Handling', () => { + it('should throw error on publish failure', async () => { + const mockStream = createMockStream(); + const publishError = new Error('Failed to publish track'); + mockPublishTrack.mockRejectedValue(publishError); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + await expect(manager.publishMicrophoneStream?.(mockStream)).rejects.toThrow('Failed to publish track'); + }); + }); + + describe('Integration and Lifecycle', () => { + it('should publish track after room connects using publishMicrophoneStream', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + + expect(mockPublishTrack).not.toHaveBeenCalled(); + + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalled(); + }); + + it('should unpublish track on disconnect', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + await manager.disconnect(); + + expect(mockUnpublishTrack).toHaveBeenCalledWith(mockPublication.track); + }); + + it('should handle track publication lifecycle correctly', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + expect(mockPublishTrack).toHaveBeenCalledTimes(1); + + await manager.disconnect(); + expect(mockUnpublishTrack).toHaveBeenCalledTimes(1); + + await manager.disconnect(); + expect(mockUnpublishTrack).toHaveBeenCalledTimes(1); + }); + + it('should handle multiple connection/disconnect cycles', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(0); + await manager.publishMicrophoneStream?.(mockStream); + await manager.disconnect(); + + const manager2 = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(1); + await manager2.publishMicrophoneStream?.(mockStream); + await manager2.disconnect(); + + expect(mockPublishTrack).toHaveBeenCalledTimes(2); + }); + + it('should handle stream ending while published', async () => { + const mockAudioTrack = createMockAudioTrack(); + const mockStream = createMockStream([mockAudioTrack]); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + mockAudioTrack.stop(); + + await expect(manager.disconnect()).resolves.not.toThrow(); + }); + + it('should allow publishing microphone stream after connection', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + // Initially no stream published + expect(mockPublishTrack).not.toHaveBeenCalled(); + + // Publish stream after connection + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalledWith(mockStream.getAudioTracks()[0], { source: 'microphone' }); + }); + + it('should throw error when publishing stream before connection', async () => { + const mockStream = createMockStream(); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + + // Try to publish before connection + await expect(manager.publishMicrophoneStream?.(mockStream)).rejects.toThrow('Room is not connected'); + }); + }); +}); diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index 904e9f98..e3f6d7d4 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -19,12 +19,14 @@ import { createStreamingLogger, StreamingManager } from './common'; import type { ConnectionQuality, ConnectionState as LiveKitConnectionState, + LocalTrackPublication, Participant, RemoteParticipant, RemoteTrack, Room, RoomEvent, SubscriptionError, + Track, } from 'livekit-client'; async function importLiveKit(): Promise<{ @@ -33,6 +35,7 @@ async function importLiveKit(): Promise<{ ConnectionState: typeof LiveKitConnectionState; RemoteParticipant: typeof RemoteParticipant; RemoteTrack: typeof RemoteTrack; + Track: typeof Track; }> { try { return await import('livekit-client'); @@ -83,12 +86,13 @@ export async function createLiveKitStreamingManager { @@ -264,6 +269,12 @@ export async function createLiveKitStreamingManager { + if (!room) return null; + + const { Track } = await importLiveKit(); + const publishedTracks = room.localParticipant.audioTrackPublications; + + if (publishedTracks) { + for (const [_, publication] of publishedTracks) { + if (publication.source === Track.Source.Microphone && publication.track) { + const publishedTrack = publication.track; + const publishedMediaTrack = publishedTrack.mediaStreamTrack; + if ( + publishedMediaTrack === audioTrack || + (publishedMediaTrack && publishedMediaTrack.id === audioTrack.id) + ) { + return publication as LocalTrackPublication; + } + } + } + } + + return null; + } + + function hasDifferentMicrophoneTrackPublished(audioTrack: MediaStreamTrack): boolean { + if (!microphonePublication || !microphonePublication.track) { + return false; + } + + const publishedMediaTrack = microphonePublication.track.mediaStreamTrack; + return publishedMediaTrack !== audioTrack && publishedMediaTrack?.id !== audioTrack.id; + } + + async function publishMicrophoneStream(stream: MediaStream): Promise { + if (!isConnected || !room) { + log('Room is not connected, cannot publish microphone stream'); + throw new Error('Room is not connected'); + } + + const audioTracks = stream.getAudioTracks(); + if (audioTracks.length === 0) { + log('No audio track found in the provided MediaStream'); + return; + } + + const audioTrack = audioTracks[0]; + const { Track } = await importLiveKit(); + + const existingPublication = await findPublishedMicrophoneTrack(audioTrack); + if (existingPublication) { + log('Microphone track is already published, skipping', { + trackId: audioTrack.id, + publishedTrackId: existingPublication.track?.mediaStreamTrack?.id, + }); + microphonePublication = existingPublication; + return; + } + + if (hasDifferentMicrophoneTrackPublished(audioTrack)) { + log('Unpublishing existing microphone track before publishing new one'); + await unpublishMicrophoneStream(); + } + + log('Publishing microphone track from provided MediaStream', { trackId: audioTrack.id }); + + try { + microphonePublication = await room.localParticipant.publishTrack(audioTrack, { + source: Track.Source.Microphone, + }); + log('Microphone track published successfully', { trackSid: microphonePublication.trackSid }); + } catch (error) { + log('Failed to publish microphone track:', error); + throw error; + } + } + + async function unpublishMicrophoneStream(): Promise { + if (!microphonePublication || !microphonePublication.track) { + return; + } + + try { + if (room) { + await room.localParticipant.unpublishTrack(microphonePublication.track); + log('Microphone track unpublished'); + } + } catch (error) { + log('Error unpublishing microphone track:', error); + } finally { + microphonePublication = null; + } + } + function cleanMediaStream(): void { if (sharedMediaStream) { sharedMediaStream.getTracks().forEach(track => track.stop()); @@ -321,6 +425,7 @@ export async function createLiveKitStreamingManager Promise; + /** + * Publish a microphone stream to the data channel + * Can be called after connection to add microphone input + * @param stream The MediaStream containing the microphone audio track + * supported only for livekit manager + */ + publishMicrophoneStream?: (stream: MediaStream) => Promise; + /** + * Unpublish the currently published microphone stream + * Can be called after connection to remove microphone input + * supported only for livekit manager + */ + unpublishMicrophoneStream?: () => Promise; /** * Method to send a chat message to existing chat with the agent * @param messages diff --git a/src/types/stream/stream.ts b/src/types/stream/stream.ts index 72d6400a..74a9f807 100644 --- a/src/types/stream/stream.ts +++ b/src/types/stream/stream.ts @@ -26,6 +26,7 @@ export enum AgentActivityState { export enum StreamEvents { ChatAnswer = 'chat/answer', ChatPartial = 'chat/partial', + ChatAudioTranscribed = 'chat/audio-transcribed', StreamDone = 'stream/done', StreamStarted = 'stream/started', StreamFailed = 'stream/error', @@ -118,6 +119,12 @@ export interface StreamingManagerOptions { debug?: boolean; auth: Auth; analytics: Analytics; + /** + * Optional MediaStream to use for microphone input. + * If provided, the audio track from this stream will be published to the data channel. + * Supported by LiveKit streaming managers. + */ + microphoneStream?: MediaStream; } export interface SlimRTCStatsReport { From f7b0b90bde6fdd389cc3ceb07a4619c8ba8adbd2 Mon Sep 17 00:00:00 2001 From: ReutAtias3 <141619499+ReutAtias3@users.noreply.github.com> Date: Thu, 8 Jan 2026 15:40:47 +0200 Subject: [PATCH 7/8] Update agent manager options to make 'mode' optional with a default value of ChatMode.Functional (#261) * Update agent manager options to make 'mode' optional with a default value of ChatMode.Functional * createAgentManager add default * createAgentManager add default --- demo/hooks/useAgentManager.ts | 4 ++-- src/services/agent-manager/connect-to-manager.ts | 2 +- src/services/agent-manager/index.ts | 12 +++++++----- src/types/entities/agents/manager.ts | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/demo/hooks/useAgentManager.ts b/demo/hooks/useAgentManager.ts index 11338770..d3011e0d 100644 --- a/demo/hooks/useAgentManager.ts +++ b/demo/hooks/useAgentManager.ts @@ -15,7 +15,7 @@ interface UseAgentManagerOptions { agentId: string; baseURL: string; wsURL: string; - mode: ChatMode; + mode?: ChatMode; auth: Auth; streamOptions?: { streamWarmup?: boolean; @@ -35,7 +35,7 @@ export function useAgentManager(props: UseAgentManagerOptions) { agentId, baseURL, wsURL, - mode, + mode = ChatMode.Functional, auth, enableAnalytics, externalId, diff --git a/src/services/agent-manager/connect-to-manager.ts b/src/services/agent-manager/connect-to-manager.ts index 3864113a..d2e0a9ee 100644 --- a/src/services/agent-manager/connect-to-manager.ts +++ b/src/services/agent-manager/connect-to-manager.ts @@ -318,7 +318,7 @@ export async function initializeStreamAndChat( const { chatResult, streamingManager } = await resolveStreamAndChat(); const { chat: newChat, chatMode } = chatResult; - if (chatMode && chatMode !== options.mode) { + if (chatMode && options.mode !== undefined && chatMode !== options.mode) { options.mode = chatMode; options.callbacks.onModeChange?.(chatMode); diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index f5eea324..30b8d028 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -59,10 +59,11 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt const mxKey = options.mixpanelKey || mixpanelKey; const wsURL = options.wsURL || didSocketApiUrl; const baseURL = options.baseURL || didApiUrl; + const mode = options.mode || ChatMode.Functional; const items: AgentManagerItems = { messages: [], - chatMode: options.mode || ChatMode.Functional, + chatMode: mode, }; const analytics = initializeAnalytics({ token: mxKey, @@ -112,7 +113,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt } const websocketPromise = - options.mode === ChatMode.DirectPlayback || isStreamsV2 + mode === ChatMode.DirectPlayback || isStreamsV2 ? Promise.resolve(undefined) : createSocketManager( options.auth, @@ -127,6 +128,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt agentEntity, { ...options, + mode, callbacks: { ...options.callbacks, onVideoIdChange: updateVideoId, onMessage }, }, agentsApi, @@ -167,7 +169,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt mode: items.chatMode, }); - changeMode(chat?.chat_mode ?? options.mode ?? ChatMode.Functional); + changeMode(chat?.chat_mode ?? mode); } async function disconnect() { @@ -241,8 +243,8 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt }, async chat(userMessage: string) { const validateChatRequest = () => { - if (isChatModeWithoutChat(options.mode)) { - throw new ValidationError(`${options.mode} is enabled, chat is disabled`); + if (isChatModeWithoutChat(mode)) { + throw new ValidationError(`${mode} is enabled, chat is disabled`); } else if (userMessage.length >= 800) { throw new ValidationError('Message cannot be more than 800 characters'); } else if (userMessage.length === 0) { diff --git a/src/types/entities/agents/manager.ts b/src/types/entities/agents/manager.ts index a2058911..8a5ffd8b 100644 --- a/src/types/entities/agents/manager.ts +++ b/src/types/entities/agents/manager.ts @@ -148,7 +148,7 @@ interface StreamOptions { export interface AgentManagerOptions { auth: Auth; callbacks: ManagerCallbacks; - mode: ChatMode; + mode?: ChatMode; baseURL?: string; wsURL?: string; debug?: boolean; From bf2ee50a3e74a6d8b4f240c0561d267c3756e8b8 Mon Sep 17 00:00:00 2001 From: Arik Sfaradi Date: Sun, 11 Jan 2026 10:05:26 +0200 Subject: [PATCH 8/8] bump (#263) --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 5885f5e2..7f971e63 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.22", + "version": "1.1.23", "type": "module", "description": "d-id client sdk", "repository": {