diff --git a/demo/app.tsx b/demo/app.tsx index 6aa04403..aa2b19fc 100644 --- a/demo/app.tsx +++ b/demo/app.tsx @@ -1,5 +1,5 @@ import { ChatMode, ConnectionState } from '@sdk/types'; -import { useEffect, useRef, useState } from 'preact/hooks'; +import { useCallback, useEffect, useRef, useState } from 'preact/hooks'; import './app.css'; import { agentId, clientKey, debug, didApiUrl, didSocketApiUrl } from './environment'; @@ -14,35 +14,179 @@ export function App() { const [sessionTimeout, setSessionTimeout] = useState(); const [compatibilityMode, setCompatibilityMode] = useState<'on' | 'off' | 'auto'>(); const [fluent, setFluent] = useState(false); + const [enableMicrophone, setEnableMicrophone] = useState(true); + const [microphoneStream, setMicrophoneStream] = useState(undefined); + const microphoneStreamRef = useRef(undefined); + const [audioInputDevices, setAudioInputDevices] = useState([]); + const [selectedAudioDeviceId, setSelectedAudioDeviceId] = useState(''); const videoRef = useRef(null); - const { srcObject, connectionState, messages, isSpeaking, connect, disconnect, speak, chat, interrupt } = - useAgentManager({ - debug, - agentId, - baseURL: didApiUrl, - wsURL: didSocketApiUrl, - mode, - enableAnalytics: false, - auth: { type: 'key', clientKey }, - streamOptions: { streamWarmup: warmup, sessionTimeout, compatibilityMode, fluent }, - }); + const { + srcObject, + connectionState, + messages, + isSpeaking, + connect, + disconnect, + speak, + chat, + interrupt, + publishMicrophoneStream, + unpublishMicrophoneStream, + microphoneEnabled, + isMicrophonePublished, + } = useAgentManager({ + debug, + agentId, + baseURL: didApiUrl, + wsURL: didSocketApiUrl, + mode, + enableAnalytics: false, + auth: { type: 'key', clientKey }, + streamOptions: { streamWarmup: warmup, sessionTimeout, compatibilityMode, fluent }, + }); + + const cleanupMicrophoneStream = useCallback(() => { + if (microphoneStreamRef.current) { + microphoneStreamRef.current.getTracks().forEach(track => track.stop()); + microphoneStreamRef.current = undefined; + setMicrophoneStream(undefined); + } + }, []); + + const updateAudioDevices = useCallback(async () => { + try { + await navigator.mediaDevices.getUserMedia({ audio: true }); + const devices = await navigator.mediaDevices.enumerateDevices(); + const audioInputs = devices.filter(device => device.kind === 'audioinput'); + + const realDevices = audioInputs.filter( + device => !device.label.toLowerCase().includes('blackhole') && + !device.label.toLowerCase().includes('virtual') + ); + + const devicesToShow = realDevices.length > 0 ? realDevices : audioInputs; + + setAudioInputDevices(devicesToShow); + + if (devicesToShow.length > 0 && !selectedAudioDeviceId) { + setSelectedAudioDeviceId(devicesToShow[0].deviceId); + } + } catch (error) { + console.error('Failed to enumerate audio devices:', error); + } + }, [selectedAudioDeviceId]); + + const handleMicrophoneToggle = useCallback( + async (enabled: boolean) => { + if (connectionState !== ConnectionState.Connected) { + setEnableMicrophone(enabled); + return; + } + + if (enabled) { + if (!microphoneStreamRef.current) { + try { + const audioConstraints: MediaStreamConstraints['audio'] = selectedAudioDeviceId + ? { deviceId: { exact: selectedAudioDeviceId } } + : true; + const stream = await navigator.mediaDevices.getUserMedia({ audio: audioConstraints }); + setMicrophoneStream(stream); + microphoneStreamRef.current = stream; + } catch (error) { + console.error('Failed to get microphone access:', error); + alert('Failed to access microphone. Please check permissions.'); + return; + } + } + + if (microphoneStreamRef.current && publishMicrophoneStream) { + try { + await publishMicrophoneStream(microphoneStreamRef.current); + setEnableMicrophone(true); + } catch (error) { + console.error('Failed to publish microphone stream:', error); + } + } + } else { + if (unpublishMicrophoneStream) { + try { + await unpublishMicrophoneStream(); + setEnableMicrophone(false); + } catch (error) { + console.error('Failed to unpublish microphone stream:', error); + } + } + } + }, + [connectionState, selectedAudioDeviceId, publishMicrophoneStream, unpublishMicrophoneStream] + ); async function onClick() { if (connectionState === ConnectionState.New || connectionState === ConnectionState.Fail) { + if (enableMicrophone && !microphoneStreamRef.current) { + try { + const audioConstraints: MediaStreamConstraints['audio'] = selectedAudioDeviceId + ? { deviceId: { exact: selectedAudioDeviceId } } + : true; + + const stream = await navigator.mediaDevices.getUserMedia({ audio: audioConstraints }); + setMicrophoneStream(stream); + microphoneStreamRef.current = stream; + } catch (error) { + console.error('Failed to get microphone access:', error); + alert('Failed to access microphone. Please check permissions.'); + return; + } + } + await connect(); } else if (connectionState === ConnectionState.Connected && text) { await speak(text); } } + useEffect(() => { + return cleanupMicrophoneStream; + }, [cleanupMicrophoneStream]); + + useEffect(() => { + if (!enableMicrophone && microphoneStreamRef.current) { + cleanupMicrophoneStream(); + } + }, [enableMicrophone, cleanupMicrophoneStream]); + + useEffect(() => { + if (enableMicrophone) { + updateAudioDevices(); + } + }, [enableMicrophone, updateAudioDevices]); + useEffect(() => { if (srcObject && videoRef.current) { videoRef.current.srcObject = srcObject; } }, [srcObject]); + useEffect(() => { + if ( + connectionState === ConnectionState.Connected && + enableMicrophone && + microphoneEnabled && + publishMicrophoneStream && + microphoneStreamRef.current && + !isMicrophonePublished + ) { + const stream = microphoneStreamRef.current; + publishMicrophoneStream(stream).catch(error => { + if (error) { + console.error('Failed to publish microphone stream:', error); + } + }); + } + }, [connectionState, enableMicrophone, microphoneEnabled, publishMicrophoneStream, isMicrophonePublished]); + return (
@@ -105,7 +249,37 @@ export function App() { /> Fluent + + {microphoneEnabled && ( + + )}
+ {microphoneEnabled && enableMicrophone && audioInputDevices.length > 0 && ( +
+ +
+ )} diff --git a/demo/hooks/useAgentManager.ts b/demo/hooks/useAgentManager.ts index 3696d393..d3011e0d 100644 --- a/demo/hooks/useAgentManager.ts +++ b/demo/hooks/useAgentManager.ts @@ -9,13 +9,13 @@ import { StreamType, StreamingState, } from '@sdk/types'; -import { useCallback, useEffect, useState } from 'preact/hooks'; +import { useCallback, useEffect, useMemo, useState } from 'preact/hooks'; interface UseAgentManagerOptions { agentId: string; baseURL: string; wsURL: string; - mode: ChatMode; + mode?: ChatMode; auth: Auth; streamOptions?: { streamWarmup?: boolean; @@ -35,7 +35,7 @@ export function useAgentManager(props: UseAgentManagerOptions) { agentId, baseURL, wsURL, - mode, + mode = ChatMode.Functional, auth, enableAnalytics, externalId, @@ -52,6 +52,7 @@ export function useAgentManager(props: UseAgentManagerOptions) { const [srcObject, setSrcObject] = useState(null); const [agentManager, setAgentManager] = useState(null); const [connectionState, setConnectionState] = useState(ConnectionState.New); + const [isMicrophonePublished, setIsMicrophonePublished] = useState(false); const streamType = agentManager?.getStreamType(); useEffect(() => { @@ -183,6 +184,40 @@ export function useAgentManager(props: UseAgentManagerOptions) { } }, [agentManager, connectionState]); + const publishMicrophoneStream = useCallback( + async (stream: MediaStream) => { + if (!agentManager) { + console.warn('Agent manager is not initialized yet. Will retry when ready.'); + return; + } + if (!agentManager.publishMicrophoneStream) { + throw new Error('publishMicrophoneStream is not available for this streaming manager'); + } + await agentManager.publishMicrophoneStream(stream); + setIsMicrophonePublished(true); + }, + [agentManager] + ); + + const unpublishMicrophoneStream = useCallback(async () => { + if (!agentManager) { + console.warn('Agent manager is not initialized yet.'); + return; + } + if (!agentManager.unpublishMicrophoneStream) { + throw new Error('unpublishMicrophoneStream is not available for this streaming manager'); + } + await agentManager.unpublishMicrophoneStream(); + setIsMicrophonePublished(false); + }, [agentManager]); + + const microphoneEnabled = useMemo(() => { + return ( + agentManager?.agent?.presenter?.type === 'expressive' && + typeof agentManager?.publishMicrophoneStream === 'function' + ); + }, [agentManager]); + return { connectionState, messages, @@ -193,5 +228,9 @@ export function useAgentManager(props: UseAgentManagerOptions) { speak, chat, interrupt, + publishMicrophoneStream, + unpublishMicrophoneStream, + microphoneEnabled, + isMicrophonePublished, }; } diff --git a/package.json b/package.json index 5885f5e2..7f971e63 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.22", + "version": "1.1.23", "type": "module", "description": "d-id client sdk", "repository": { diff --git a/src/services/agent-manager/connect-to-manager.test.ts b/src/services/agent-manager/connect-to-manager.test.ts index bc94124d..3c241217 100644 --- a/src/services/agent-manager/connect-to-manager.test.ts +++ b/src/services/agent-manager/connect-to-manager.test.ts @@ -28,6 +28,7 @@ jest.mock('../../config/environment', () => ({ jest.mock('../analytics/timestamp-tracker', () => ({ latencyTimestampTracker: { reset: jest.fn(), update: jest.fn(), get: jest.fn(() => 1000) }, interruptTimestampTracker: { reset: jest.fn(), update: jest.fn(), get: jest.fn(() => 500) }, + streamReadyTimestampTracker: { reset: jest.fn(), update: jest.fn(), get: jest.fn(() => 1500) }, })); describe('connect-to-manager', () => { @@ -236,6 +237,7 @@ describe('connect-to-manager', () => { let onConnectionStateChange: (state: ConnectionState) => void; let onVideoStateChange: (state: StreamingState, statsReport?: any) => void; let onAgentActivityStateChange: (state: AgentActivityState) => void; + let onStreamReady: (() => void) | undefined; beforeEach(async () => { // Initialize callbacks to avoid undefined errors @@ -247,6 +249,7 @@ describe('connect-to-manager', () => { onConnectionStateChange = options.callbacks.onConnectionStateChange; onVideoStateChange = options.callbacks.onVideoStateChange; onAgentActivityStateChange = options.callbacks.onAgentActivityStateChange; + onStreamReady = options.callbacks.onStreamReady; return new Promise(resolve => { setTimeout(() => { @@ -370,6 +373,20 @@ describe('connect-to-manager', () => { ); }); }); + + describe('onStreamReady', () => { + it('should track mixpanel event with latency when stream is ready', () => { + const { streamReadyTimestampTracker } = require('../analytics/timestamp-tracker'); + streamReadyTimestampTracker.get.mockReturnValue(2000); + + onStreamReady?.(); + + expect(mockAnalytics.track).toHaveBeenCalledWith('agent-chat', { + event: 'ready', + latency: 2000, + }); + }); + }); }); describe('Stream Options Mapping', () => { diff --git a/src/services/agent-manager/connect-to-manager.ts b/src/services/agent-manager/connect-to-manager.ts index 2e443452..d2e0a9ee 100644 --- a/src/services/agent-manager/connect-to-manager.ts +++ b/src/services/agent-manager/connect-to-manager.ts @@ -23,7 +23,11 @@ import { } from '@sdk/types'; import { isStreamsV2Agent } from '@sdk/utils/agent'; import { Analytics } from '../analytics/mixpanel'; -import { interruptTimestampTracker, latencyTimestampTracker } from '../analytics/timestamp-tracker'; +import { + interruptTimestampTracker, + latencyTimestampTracker, + streamReadyTimestampTracker, +} from '../analytics/timestamp-tracker'; import { createChat } from '../chat'; const ChatPrefix = 'cht'; @@ -172,15 +176,21 @@ function connectToManager( signal?: AbortSignal ): Promise> { latencyTimestampTracker.reset(); + streamReadyTimestampTracker.update(); return new Promise(async (resolve, reject) => { try { let streamingManager: StreamingManager; let shouldResolveOnComplete = false; + const streamOptions = getAgentStreamOptions(agent, options); + + analytics.enrich({ + 'stream-version': streamOptions.version.toString(), + }); streamingManager = await createStreamingManager( agent, - getAgentStreamOptions(agent, options), + streamOptions, { ...options, analytics, @@ -226,6 +236,10 @@ function connectToManager( streamingManager.streamType ); }, + onStreamReady: () => { + const readyLatency = streamReadyTimestampTracker.get(true); + analytics.track('agent-chat', { event: 'ready', latency: readyLatency }); + }, }, }, signal @@ -304,7 +318,7 @@ export async function initializeStreamAndChat( const { chatResult, streamingManager } = await resolveStreamAndChat(); const { chat: newChat, chatMode } = chatResult; - if (chatMode && chatMode !== options.mode) { + if (chatMode && options.mode !== undefined && chatMode !== options.mode) { options.mode = chatMode; options.callbacks.onModeChange?.(chatMode); diff --git a/src/services/agent-manager/index.test.ts b/src/services/agent-manager/index.test.ts index 430e0f28..14614e56 100644 --- a/src/services/agent-manager/index.test.ts +++ b/src/services/agent-manager/index.test.ts @@ -698,6 +698,59 @@ describe('createAgentManager', () => { }); }); + describe('publishMicrophoneStream', () => { + let manager: AgentManager; + + beforeEach(async () => { + manager = await createAgentManager('agent-123', mockOptions); + await manager.connect(); + }); + + it('should publish microphone stream when available', async () => { + const mockStream = new MediaStream(); + const mockPublish = jest.fn().mockResolvedValue(undefined); + mockStreamingManager.publishMicrophoneStream = mockPublish; + + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublish).toHaveBeenCalledWith(mockStream); + }); + + it('should throw error when publishMicrophoneStream is not available', async () => { + mockStreamingManager.publishMicrophoneStream = undefined; + + await expect(manager.publishMicrophoneStream?.(new MediaStream())).rejects.toThrow( + 'publishMicrophoneStream is not available for this streaming manager' + ); + }); + }); + + describe('unpublishMicrophoneStream', () => { + let manager: AgentManager; + + beforeEach(async () => { + manager = await createAgentManager('agent-123', mockOptions); + await manager.connect(); + }); + + it('should unpublish microphone stream when available', async () => { + const mockUnpublish = jest.fn().mockResolvedValue(undefined); + mockStreamingManager.unpublishMicrophoneStream = mockUnpublish; + + await manager.unpublishMicrophoneStream?.(); + + expect(mockUnpublish).toHaveBeenCalled(); + }); + + it('should throw error when unpublishMicrophoneStream is not available', async () => { + mockStreamingManager.unpublishMicrophoneStream = undefined; + + await expect(manager.unpublishMicrophoneStream?.()).rejects.toThrow( + 'unpublishMicrophoneStream is not available for this streaming manager' + ); + }); + }); + describe('DirectPlayback mode', () => { it('should not create socket manager in DirectPlayback mode', async () => { const directPlaybackOptions = { ...mockOptions, mode: ChatMode.DirectPlayback }; diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index 8fbd1989..30b8d028 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -59,10 +59,11 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt const mxKey = options.mixpanelKey || mixpanelKey; const wsURL = options.wsURL || didSocketApiUrl; const baseURL = options.baseURL || didApiUrl; + const mode = options.mode || ChatMode.Functional; const items: AgentManagerItems = { messages: [], - chatMode: options.mode || ChatMode.Functional, + chatMode: mode, }; const analytics = initializeAnalytics({ token: mxKey, @@ -112,7 +113,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt } const websocketPromise = - options.mode === ChatMode.DirectPlayback || isStreamsV2 + mode === ChatMode.DirectPlayback || isStreamsV2 ? Promise.resolve(undefined) : createSocketManager( options.auth, @@ -127,6 +128,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt agentEntity, { ...options, + mode, callbacks: { ...options.callbacks, onVideoIdChange: updateVideoId, onMessage }, }, agentsApi, @@ -167,7 +169,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt mode: items.chatMode, }); - changeMode(chat?.chat_mode ?? options.mode ?? ChatMode.Functional); + changeMode(chat?.chat_mode ?? mode); } async function disconnect() { @@ -227,10 +229,22 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt mode: items.chatMode, }); }, + async publishMicrophoneStream(stream: MediaStream) { + if (!items.streamingManager?.publishMicrophoneStream) { + throw new Error('publishMicrophoneStream is not available for this streaming manager'); + } + return items.streamingManager.publishMicrophoneStream(stream); + }, + async unpublishMicrophoneStream() { + if (!items.streamingManager?.unpublishMicrophoneStream) { + throw new Error('unpublishMicrophoneStream is not available for this streaming manager'); + } + return items.streamingManager.unpublishMicrophoneStream(); + }, async chat(userMessage: string) { const validateChatRequest = () => { - if (isChatModeWithoutChat(options.mode)) { - throw new ValidationError(`${options.mode} is enabled, chat is disabled`); + if (isChatModeWithoutChat(mode)) { + throw new ValidationError(`${mode} is enabled, chat is disabled`); } else if (userMessage.length >= 800) { throw new ValidationError('Message cannot be more than 800 characters'); } else if (userMessage.length === 0) { diff --git a/src/services/analytics/timestamp-tracker.ts b/src/services/analytics/timestamp-tracker.ts index c3b217f5..381a89c0 100644 --- a/src/services/analytics/timestamp-tracker.ts +++ b/src/services/analytics/timestamp-tracker.ts @@ -10,3 +10,4 @@ function createTimestampTracker() { export const latencyTimestampTracker = createTimestampTracker(); export const interruptTimestampTracker = createTimestampTracker(); +export const streamReadyTimestampTracker = createTimestampTracker(); diff --git a/src/services/socket-manager/message-queue.ts b/src/services/socket-manager/message-queue.ts index 4380d0a2..e73e3ec0 100644 --- a/src/services/socket-manager/message-queue.ts +++ b/src/services/socket-manager/message-queue.ts @@ -1,4 +1,5 @@ import { Agent, AgentManagerOptions, ChatProgress, StreamEvents } from '@sdk/types'; +import { Message } from '@sdk/types/entities/agents/chat'; import { getStreamAnalyticsProps } from '@sdk/utils/analytics'; import { AgentManagerItems } from '../agent-manager'; import { Analytics } from '../analytics/mixpanel'; @@ -23,6 +24,26 @@ function getMessageContent(chatEventQueue: ChatEventQueue) { return content; } +function handleAudioTranscribedMessage( + data: any, + items: AgentManagerItems, + onNewMessage: AgentManagerOptions['callbacks']['onNewMessage'] +) { + if (!data.content) { + return; + } + + const userMessage: Message = { + id: data.id || `user-${Date.now()}`, + role: data.role, + content: data.content, + created_at: data.created_at || new Date().toISOString(), + transcribed: true, + }; + items.messages.push(userMessage); + onNewMessage?.([...items.messages], 'user'); +} + function processChatEvent( event: ChatProgress, data: any, @@ -30,9 +51,30 @@ function processChatEvent( items: AgentManagerItems, onNewMessage: AgentManagerOptions['callbacks']['onNewMessage'] ) { + if (event === ChatProgress.Transcribe && data.content) { + handleAudioTranscribedMessage(data, items, onNewMessage); + return; + } + + if (!(event === ChatProgress.Partial || event === ChatProgress.Answer)) { + return; + } + const lastMessage = items.messages[items.messages.length - 1]; - if (!(event === ChatProgress.Partial || event === ChatProgress.Answer) || lastMessage?.role !== 'assistant') { + let currentMessage: Message; + if (lastMessage?.transcribed && lastMessage.role === 'user') { + const initialContent = event === ChatProgress.Answer ? data.content || '' : ''; + currentMessage = { + id: data.id || `assistant-${Date.now()}`, + role: data.role || 'assistant', + content: data.content || '', + created_at: data.created_at || new Date().toISOString(), + }; + items.messages.push(currentMessage); + } else if (lastMessage?.role === 'assistant') { + currentMessage = lastMessage; + } else { return; } @@ -46,8 +88,8 @@ function processChatEvent( const messageContent = getMessageContent(chatEventQueue); - if (lastMessage.content !== messageContent || event === ChatProgress.Answer) { - lastMessage.content = messageContent; + if (currentMessage.content !== messageContent || event === ChatProgress.Answer) { + currentMessage.content = messageContent; onNewMessage?.([...items.messages], event); } @@ -66,9 +108,15 @@ export function createMessageEventQueue( clearQueue: () => (chatEventQueue = {}), onMessage: (event: ChatProgress | StreamEvents, data: any) => { if ('content' in data) { - processChatEvent(event as ChatProgress, data, chatEventQueue, items, options.callbacks.onNewMessage); - - if (event === ChatProgress.Answer) { + const chatEvent = + event === StreamEvents.ChatAnswer + ? ChatProgress.Answer + : event === StreamEvents.ChatAudioTranscribed + ? ChatProgress.Transcribe + : (event as ChatProgress); + processChatEvent(chatEvent, data, chatEventQueue, items, options.callbacks.onNewMessage); + + if (chatEvent === ChatProgress.Answer) { analytics.track('agent-message-received', { messages: items.messages.length, mode: items.chatMode, diff --git a/src/services/streaming-manager/common.ts b/src/services/streaming-manager/common.ts index 434c79ab..d0ceb9cd 100644 --- a/src/services/streaming-manager/common.ts +++ b/src/services/streaming-manager/common.ts @@ -32,6 +32,21 @@ export type StreamingManager; + /** + * Publish a microphone stream to the DataChannel + * Can be called after connection to add microphone input + * @param stream The MediaStream containing the microphone audio track + * supported only for livekit manager + */ + publishMicrophoneStream?(stream: MediaStream): Promise; + + /** + * Unpublish the currently published microphone stream + * Can be called after connection to remove microphone input + * supported only for livekit manager + */ + unpublishMicrophoneStream?(): Promise; + /** * Session identifier information, should be returned in the body of all streaming requests */ diff --git a/src/services/streaming-manager/factory.test.ts b/src/services/streaming-manager/factory.test.ts index e9c557af..6211ce75 100644 --- a/src/services/streaming-manager/factory.test.ts +++ b/src/services/streaming-manager/factory.test.ts @@ -100,4 +100,39 @@ describe('createStreamingManager', () => { expect(mockCreateLiveKitStreamingManager).toHaveBeenCalledWith(agent.id, v2StreamOptions, mockOptions); expect(mockCreateWebRTCStreamingManager).not.toHaveBeenCalled(); }); + + it('passes microphoneStream to createLiveKitStreamingManager when provided', async () => { + const agent = AgentFactory.build({ + presenter: { + type: 'expressive', + voice: { + type: Providers.Microsoft, + voice_id: 'voice-123', + }, + }, + }); + + const v2StreamOptions: CreateSessionV2Options = { + transport_provider: TransportProvider.Livekit, + chat_persist: true, + }; + + const mockMicrophoneStream = new MediaStream(); + const optionsWithMicrophone = { + ...mockOptions, + microphoneStream: mockMicrophoneStream, + }; + + await createStreamingManager( + agent, + { version: StreamApiVersion.V2, ...v2StreamOptions }, + optionsWithMicrophone + ); + + expect(mockCreateLiveKitStreamingManager).toHaveBeenCalledWith( + agent.id, + v2StreamOptions, + expect.objectContaining({ microphoneStream: mockMicrophoneStream }) + ); + }); }); diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts new file mode 100644 index 00000000..82009e85 --- /dev/null +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -0,0 +1,310 @@ +import { StreamingManagerOptionsFactory } from '../../test-utils/factories'; +import { CreateSessionV2Options, StreamingManagerOptions } from '../../types/index'; +import { createLiveKitStreamingManager } from './livekit-manager'; + +// Mock livekit-client +const mockPublishTrack = jest.fn(); +const mockUnpublishTrack = jest.fn(); +const mockLocalParticipant = { + publishTrack: mockPublishTrack, + unpublishTrack: mockUnpublishTrack, + sendText: jest.fn(), + audioTrackPublications: new Map(), +}; + +const mockRoom = { + on: jest.fn().mockReturnThis(), + connect: jest.fn().mockResolvedValue(undefined), + prepareConnection: jest.fn().mockResolvedValue(undefined), + disconnect: jest.fn().mockResolvedValue(undefined), + localParticipant: mockLocalParticipant, +}; + +const mockRoomConstructor = jest.fn().mockImplementation(() => mockRoom); + +const mockTrack = { + Source: { + Microphone: 'microphone', + }, +}; + +jest.mock('livekit-client', () => ({ + Room: mockRoomConstructor, + RoomEvent: { + ConnectionStateChanged: 'ConnectionStateChanged', + ConnectionQualityChanged: 'ConnectionQualityChanged', + ActiveSpeakersChanged: 'ActiveSpeakersChanged', + ParticipantConnected: 'ParticipantConnected', + TrackSubscribed: 'TrackSubscribed', + TrackUnsubscribed: 'TrackUnsubscribed', + DataReceived: 'DataReceived', + MediaDevicesError: 'MediaDevicesError', + EncryptionError: 'EncryptionError', + TrackSubscriptionFailed: 'TrackSubscriptionFailed', + }, + ConnectionState: { + Connecting: 'connecting', + Connected: 'connected', + Disconnected: 'disconnected', + Reconnecting: 'reconnecting', + SignalReconnecting: 'signalReconnecting', + }, + Track: mockTrack, +})); + +// Mock createStreamApiV2 +const mockCreateStream = jest.fn().mockResolvedValue({ + id: 'session-123', + session_token: 'token-123', + session_url: 'wss://test.livekit.cloud', +}); + +jest.mock('../../api/streams/streamsApiV2', () => ({ + createStreamApiV2: jest.fn(() => ({ + createStream: mockCreateStream, + })), +})); + +jest.mock('../../config/environment', () => ({ didApiUrl: 'http://test-api.com' })); + +// Test constants +const TEST_AGENT_ID = 'agent123'; +const TEST_TRACK_SID = 'track-sid-123'; +const TEST_AUDIO_TRACK_ID = 'audio-track-1'; +const TEST_AUDIO_TRACK_ID_2 = 'audio-track-2'; +const TEST_SESSION_ID = 'session-123'; +const ASYNC_WAIT_TIME = 10; + +// Helper functions to create mock objects +function createMockAudioTrack(id: string = TEST_AUDIO_TRACK_ID, additionalProps: any = {}) { + return { + kind: 'audio', + id, + enabled: true, + stop: jest.fn(), + ...additionalProps, + } as any; +} + +function createMockTrack(id: string = TEST_AUDIO_TRACK_ID, mediaStreamTrack?: MediaStreamTrack) { + return { + kind: 'audio', + id, + mediaStreamTrack: mediaStreamTrack || createMockAudioTrack(id), + } as any; +} + +function createMockPublication(trackId: string = TEST_AUDIO_TRACK_ID, trackSid: string = TEST_TRACK_SID) { + const mockTrack = createMockTrack(trackId); + return { + trackSid, + track: mockTrack, + source: 'microphone', + mediaStreamTrack: createMockAudioTrack(trackId), + } as any; +} + +function createMockStream(audioTracks: any[] = [createMockAudioTrack()]) { + const stream = new MediaStream(audioTracks); + (stream as any).getAudioTracks = jest.fn(() => audioTracks); + (stream as any).getTracks = jest.fn(() => audioTracks); + return stream; +} + +function getConnectionStateHandler(index?: number) { + const calls = mockRoom.on.mock.calls.filter((call: any[]) => call[0] === 'ConnectionStateChanged'); + if (index !== undefined && calls[index]) { + return calls[index][1]; + } + return calls.length > 0 ? calls[calls.length - 1][1] : undefined; +} + +async function simulateConnection(handlerIndex?: number) { + const handler = getConnectionStateHandler(handlerIndex); + if (handler) { + handler('connected'); + } + await new Promise(resolve => setTimeout(resolve, ASYNC_WAIT_TIME)); +} + +describe('LiveKit Streaming Manager - Microphone Stream', () => { + let agentId: string; + let sessionOptions: CreateSessionV2Options; + let options: StreamingManagerOptions; + + beforeEach(() => { + jest.clearAllMocks(); + agentId = TEST_AGENT_ID; + sessionOptions = { + chat_persist: true, + transport_provider: 'livekit' as any, + }; + options = StreamingManagerOptionsFactory.build(); + }); + + describe('Microphone Stream Publishing', () => { + it('should publish microphone track using publishMicrophoneStream method', async () => { + const mockAudioTrack = createMockAudioTrack(); + const mockStream = createMockStream([mockAudioTrack]); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalledWith(mockAudioTrack, { + source: 'microphone', + }); + }); + + it('should not publish microphone track when publishMicrophoneStream is not called', async () => { + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + expect(mockPublishTrack).not.toHaveBeenCalled(); + }); + + it('should extract first audio track from MediaStream', async () => { + const mockAudioTrack1 = createMockAudioTrack(TEST_AUDIO_TRACK_ID, { enabled: false }); + const mockAudioTrack2 = createMockAudioTrack(TEST_AUDIO_TRACK_ID_2); + const mockStream = createMockStream([mockAudioTrack1, mockAudioTrack2]); + const mockPublication = createMockPublication(TEST_AUDIO_TRACK_ID); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalledWith(mockAudioTrack1, { + source: 'microphone', + }); + expect(mockPublishTrack).toHaveBeenCalledTimes(1); + }); + }); + + describe('Error Handling', () => { + it('should throw error on publish failure', async () => { + const mockStream = createMockStream(); + const publishError = new Error('Failed to publish track'); + mockPublishTrack.mockRejectedValue(publishError); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + await expect(manager.publishMicrophoneStream?.(mockStream)).rejects.toThrow('Failed to publish track'); + }); + }); + + describe('Integration and Lifecycle', () => { + it('should publish track after room connects using publishMicrophoneStream', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + + expect(mockPublishTrack).not.toHaveBeenCalled(); + + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalled(); + }); + + it('should unpublish track on disconnect', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + await manager.disconnect(); + + expect(mockUnpublishTrack).toHaveBeenCalledWith(mockPublication.track); + }); + + it('should handle track publication lifecycle correctly', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + expect(mockPublishTrack).toHaveBeenCalledTimes(1); + + await manager.disconnect(); + expect(mockUnpublishTrack).toHaveBeenCalledTimes(1); + + await manager.disconnect(); + expect(mockUnpublishTrack).toHaveBeenCalledTimes(1); + }); + + it('should handle multiple connection/disconnect cycles', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + mockUnpublishTrack.mockResolvedValue(undefined); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(0); + await manager.publishMicrophoneStream?.(mockStream); + await manager.disconnect(); + + const manager2 = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(1); + await manager2.publishMicrophoneStream?.(mockStream); + await manager2.disconnect(); + + expect(mockPublishTrack).toHaveBeenCalledTimes(2); + }); + + it('should handle stream ending while published', async () => { + const mockAudioTrack = createMockAudioTrack(); + const mockStream = createMockStream([mockAudioTrack]); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + await manager.publishMicrophoneStream?.(mockStream); + + mockAudioTrack.stop(); + + await expect(manager.disconnect()).resolves.not.toThrow(); + }); + + it('should allow publishing microphone stream after connection', async () => { + const mockStream = createMockStream(); + const mockPublication = createMockPublication(); + mockPublishTrack.mockResolvedValue(mockPublication); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + // Initially no stream published + expect(mockPublishTrack).not.toHaveBeenCalled(); + + // Publish stream after connection + await manager.publishMicrophoneStream?.(mockStream); + + expect(mockPublishTrack).toHaveBeenCalledWith(mockStream.getAudioTracks()[0], { source: 'microphone' }); + }); + + it('should throw error when publishing stream before connection', async () => { + const mockStream = createMockStream(); + + const manager = await createLiveKitStreamingManager(agentId, sessionOptions, options); + + // Try to publish before connection + await expect(manager.publishMicrophoneStream?.(mockStream)).rejects.toThrow('Room is not connected'); + }); + }); +}); diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index 2ee2ed19..e3f6d7d4 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -19,12 +19,14 @@ import { createStreamingLogger, StreamingManager } from './common'; import type { ConnectionQuality, ConnectionState as LiveKitConnectionState, + LocalTrackPublication, Participant, RemoteParticipant, RemoteTrack, Room, RoomEvent, SubscriptionError, + Track, } from 'livekit-client'; async function importLiveKit(): Promise<{ @@ -33,6 +35,7 @@ async function importLiveKit(): Promise<{ ConnectionState: typeof LiveKitConnectionState; RemoteParticipant: typeof RemoteParticipant; RemoteTrack: typeof RemoteTrack; + Track: typeof Track; }> { try { return await import('livekit-client'); @@ -83,12 +86,13 @@ export async function createLiveKitStreamingManager { @@ -226,6 +231,7 @@ export async function createLiveKitStreamingManager { + if (!room) return null; + + const { Track } = await importLiveKit(); + const publishedTracks = room.localParticipant.audioTrackPublications; + + if (publishedTracks) { + for (const [_, publication] of publishedTracks) { + if (publication.source === Track.Source.Microphone && publication.track) { + const publishedTrack = publication.track; + const publishedMediaTrack = publishedTrack.mediaStreamTrack; + if ( + publishedMediaTrack === audioTrack || + (publishedMediaTrack && publishedMediaTrack.id === audioTrack.id) + ) { + return publication as LocalTrackPublication; + } + } + } + } + + return null; + } + + function hasDifferentMicrophoneTrackPublished(audioTrack: MediaStreamTrack): boolean { + if (!microphonePublication || !microphonePublication.track) { + return false; + } + + const publishedMediaTrack = microphonePublication.track.mediaStreamTrack; + return publishedMediaTrack !== audioTrack && publishedMediaTrack?.id !== audioTrack.id; + } + + async function publishMicrophoneStream(stream: MediaStream): Promise { + if (!isConnected || !room) { + log('Room is not connected, cannot publish microphone stream'); + throw new Error('Room is not connected'); + } + + const audioTracks = stream.getAudioTracks(); + if (audioTracks.length === 0) { + log('No audio track found in the provided MediaStream'); + return; + } + + const audioTrack = audioTracks[0]; + const { Track } = await importLiveKit(); + + const existingPublication = await findPublishedMicrophoneTrack(audioTrack); + if (existingPublication) { + log('Microphone track is already published, skipping', { + trackId: audioTrack.id, + publishedTrackId: existingPublication.track?.mediaStreamTrack?.id, + }); + microphonePublication = existingPublication; + return; + } + + if (hasDifferentMicrophoneTrackPublished(audioTrack)) { + log('Unpublishing existing microphone track before publishing new one'); + await unpublishMicrophoneStream(); + } + + log('Publishing microphone track from provided MediaStream', { trackId: audioTrack.id }); + + try { + microphonePublication = await room.localParticipant.publishTrack(audioTrack, { + source: Track.Source.Microphone, + }); + log('Microphone track published successfully', { trackSid: microphonePublication.trackSid }); + } catch (error) { + log('Failed to publish microphone track:', error); + throw error; + } + } + + async function unpublishMicrophoneStream(): Promise { + if (!microphonePublication || !microphonePublication.track) { + return; + } + + try { + if (room) { + await room.localParticipant.unpublishTrack(microphonePublication.track); + log('Microphone track unpublished'); + } + } catch (error) { + log('Error unpublishing microphone track:', error); + } finally { + microphonePublication = null; + } + } + function cleanMediaStream(): void { if (sharedMediaStream) { sharedMediaStream.getTracks().forEach(track => track.stop()); @@ -308,6 +425,7 @@ export async function createLiveKitStreamingManager Promise; + /** + * Publish a microphone stream to the data channel + * Can be called after connection to add microphone input + * @param stream The MediaStream containing the microphone audio track + * supported only for livekit manager + */ + publishMicrophoneStream?: (stream: MediaStream) => Promise; + /** + * Unpublish the currently published microphone stream + * Can be called after connection to remove microphone input + * supported only for livekit manager + */ + unpublishMicrophoneStream?: () => Promise; /** * Method to send a chat message to existing chat with the agent * @param messages diff --git a/src/types/stream/stream.ts b/src/types/stream/stream.ts index 09b54526..74a9f807 100644 --- a/src/types/stream/stream.ts +++ b/src/types/stream/stream.ts @@ -26,6 +26,7 @@ export enum AgentActivityState { export enum StreamEvents { ChatAnswer = 'chat/answer', ChatPartial = 'chat/partial', + ChatAudioTranscribed = 'chat/audio-transcribed', StreamDone = 'stream/done', StreamStarted = 'stream/started', StreamFailed = 'stream/error', @@ -63,6 +64,7 @@ export interface ManagerCallbacks { onAgentActivityStateChange?: (state: AgentActivityState) => void; onVideoIdChange?: (videoId: string | null) => void; onStreamCreated?: (stream: { stream_id: string; session_id: string; agent_id: string }) => void; + onStreamReady?: () => void; } export type ManagerCallbackKeys = keyof ManagerCallbacks; @@ -117,6 +119,12 @@ export interface StreamingManagerOptions { debug?: boolean; auth: Auth; analytics: Analytics; + /** + * Optional MediaStream to use for microphone input. + * If provided, the audio track from this stream will be published to the data channel. + * Supported by LiveKit streaming managers. + */ + microphoneStream?: MediaStream; } export interface SlimRTCStatsReport {