diff --git a/package.json b/package.json index 7f971e63..5e7503e3 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.23", + "version": "1.1.24", "type": "module", "description": "d-id client sdk", "repository": { diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index 30b8d028..054f389a 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -5,6 +5,7 @@ import { ChatMode, ChatResponse, ConnectionState, + CreateSessionV2Options, CreateStreamOptions, Interrupt, Message, @@ -26,7 +27,7 @@ import { initializeAnalytics } from '../analytics/mixpanel'; import { interruptTimestampTracker, latencyTimestampTracker } from '../analytics/timestamp-tracker'; import { createChat, getRequestHeaders } from '../chat'; import { getInitialMessages } from '../chat/intial-messages'; -import { sendInterrupt, validateInterrupt } from '../interrupt'; +import { sendInterrupt, sendInterruptV2, validateInterrupt } from '../interrupt'; import { SocketManager, createSocketManager } from '../socket-manager'; import { createMessageEventQueue } from '../socket-manager/message-queue'; import { StreamingManager } from '../streaming-manager'; @@ -213,6 +214,22 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt }); }, async reconnect() { + const streamingManager = items.streamingManager as { reconnect?: () => Promise } | undefined; + if (isStreamsV2 && streamingManager?.reconnect) { + try { + await streamingManager.reconnect(); + + analytics.track('agent-chat', { + event: 'reconnect', + mode: items.chatMode, + }); + } catch (error) { + await disconnect(); + await connect(false); + } + return; + } + await disconnect(); await connect(false); @@ -487,7 +504,6 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt }); }, async interrupt({ type }: Interrupt) { - validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId); const lastMessage = items.messages[items.messages.length - 1]; analytics.track('agent-video-interrupt', { @@ -498,7 +514,12 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt lastMessage.interrupted = true; options.callbacks.onNewMessage?.([...items.messages], 'answer'); - sendInterrupt(items.streamingManager!, videoId!); + if (isStreamsV2) { + sendInterruptV2(items.streamingManager! as StreamingManager); + } else { + validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId); + sendInterrupt(items.streamingManager!, videoId!); + } }, }; } diff --git a/src/services/interrupt/index.ts b/src/services/interrupt/index.ts index 5864a4ba..f5b8a6a9 100644 --- a/src/services/interrupt/index.ts +++ b/src/services/interrupt/index.ts @@ -1,5 +1,12 @@ -import { CreateStreamOptions, StreamEvents, StreamInterruptPayload, StreamType } from '@sdk/types'; +import { + CreateSessionV2Options, + CreateStreamOptions, + StreamEvents, + StreamInterruptPayload, + StreamType, +} from '@sdk/types'; import { StreamingManager } from '../streaming-manager'; +import { DataChannelTopic } from '../streaming-manager/livekit-manager'; export function validateInterrupt( streamingManager: StreamingManager | undefined, @@ -35,3 +42,10 @@ export async function sendInterrupt( streamingManager.sendDataChannelMessage(JSON.stringify(payload)); } + +export async function sendInterruptV2(streamingManager: StreamingManager): Promise { + const payload = { + topic: DataChannelTopic.Interrupt, + }; + streamingManager.sendDataChannelMessage(JSON.stringify(payload)); +} diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index e3f6d7d4..ed329f86 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -59,9 +59,10 @@ const internalErrorMassage = JSON.stringify({ description: 'Stream Error', }); -enum DataChannelTopic { +export enum DataChannelTopic { Chat = 'lk.chat', Speak = 'did.speak', + Interrupt = 'did.interrupt', } export function handleInitError( @@ -81,7 +82,7 @@ export async function createLiveKitStreamingManager> { +): Promise & { reconnect(): Promise }> { const log = createStreamingLogger(options.debug || false, 'LiveKitStreamingManager'); const { Room, RoomEvent, ConnectionState: LiveKitConnectionState } = await importLiveKit(); @@ -132,6 +133,7 @@ export async function createLiveKitStreamingManager) { const message = typeof payload === 'string' ? payload : JSON.stringify(payload); - return sendTextMessage(message, DataChannelTopic.Speak); + return sendMessage(message, DataChannelTopic.Speak); }, - async disconnect() { - if (room) { - await unpublishMicrophoneStream(); - await room.disconnect(); - room = null; + disconnect, + + async reconnect() { + if (room?.state === LiveKitConnectionState.Connected) { + log('Room is already connected'); + return; + } + + if (!room || !url || !token) { + log('Cannot reconnect: missing room, URL or token'); + throw new Error('Cannot reconnect: session not available'); + } + + log('Reconnecting to LiveKit room, state:', room.state); + callbacks.onConnectionStateChange?.(ConnectionState.Connecting); + + try { + await room.connect(url, token); + log('Room reconnected'); + isConnected = true; + + // If no remote participants, wait for agent to join + if (room.remoteParticipants.size === 0) { + log('Waiting for agent to join...'); + + const agentJoined = await new Promise(resolve => { + const timeout = setTimeout(() => { + room?.off(RoomEvent.ParticipantConnected, onParticipantConnected); + resolve(false); + }, 5000); + + const onParticipantConnected = () => { + clearTimeout(timeout); + room?.off(RoomEvent.ParticipantConnected, onParticipantConnected); + resolve(true); + }; + + room?.on(RoomEvent.ParticipantConnected, onParticipantConnected); + }); + + if (!agentJoined) { + log('Agent did not join within timeout'); + await room.disconnect(); + throw new Error('Agent did not rejoin the room'); + } + + log('Agent joined'); + } + + callbacks.onConnectionStateChange?.(ConnectionState.Connected); + } catch (error) { + log('Failed to reconnect:', error); + callbacks.onConnectionStateChange?.(ConnectionState.Fail); + throw error; } - cleanMediaStream(); - isConnected = false; - callbacks.onConnectionStateChange?.(ConnectionState.Completed); - callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle); }, - sendDataChannelMessage: sendTextMessage, + sendDataChannelMessage, sendTextMessage, publishMicrophoneStream, unpublishMicrophoneStream, @@ -448,4 +528,6 @@ export async function createLiveKitStreamingManager = StreamingManager; +export type LiveKitStreamingManager = StreamingManager & { + reconnect(): Promise; +};