Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@d-id/client-sdk",
"private": false,
"version": "1.1.23",
"version": "1.1.24",
"type": "module",
"description": "d-id client sdk",
"repository": {
Expand Down
27 changes: 24 additions & 3 deletions src/services/agent-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ChatMode,
ChatResponse,
ConnectionState,
CreateSessionV2Options,
CreateStreamOptions,
Interrupt,
Message,
Expand All @@ -26,7 +27,7 @@ import { initializeAnalytics } from '../analytics/mixpanel';
import { interruptTimestampTracker, latencyTimestampTracker } from '../analytics/timestamp-tracker';
import { createChat, getRequestHeaders } from '../chat';
import { getInitialMessages } from '../chat/intial-messages';
import { sendInterrupt, validateInterrupt } from '../interrupt';
import { sendInterrupt, sendInterruptV2, validateInterrupt } from '../interrupt';
import { SocketManager, createSocketManager } from '../socket-manager';
import { createMessageEventQueue } from '../socket-manager/message-queue';
import { StreamingManager } from '../streaming-manager';
Expand Down Expand Up @@ -213,6 +214,22 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
});
},
async reconnect() {
const streamingManager = items.streamingManager as { reconnect?: () => Promise<void> } | undefined;
if (isStreamsV2 && streamingManager?.reconnect) {
try {
await streamingManager.reconnect();

analytics.track('agent-chat', {
event: 'reconnect',
mode: items.chatMode,
});
} catch (error) {
await disconnect();
await connect(false);
}
return;
}

await disconnect();
await connect(false);

Expand Down Expand Up @@ -487,7 +504,6 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
});
},
async interrupt({ type }: Interrupt) {
validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId);
const lastMessage = items.messages[items.messages.length - 1];

analytics.track('agent-video-interrupt', {
Expand All @@ -498,7 +514,12 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt

lastMessage.interrupted = true;
options.callbacks.onNewMessage?.([...items.messages], 'answer');
sendInterrupt(items.streamingManager!, videoId!);
if (isStreamsV2) {
sendInterruptV2(items.streamingManager! as StreamingManager<CreateSessionV2Options>);
} else {
validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId);
sendInterrupt(items.streamingManager!, videoId!);
}
},
};
}
16 changes: 15 additions & 1 deletion src/services/interrupt/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { CreateStreamOptions, StreamEvents, StreamInterruptPayload, StreamType } from '@sdk/types';
import {
CreateSessionV2Options,
CreateStreamOptions,
StreamEvents,
StreamInterruptPayload,
StreamType,
} from '@sdk/types';
import { StreamingManager } from '../streaming-manager';
import { DataChannelTopic } from '../streaming-manager/livekit-manager';

export function validateInterrupt(
streamingManager: StreamingManager<CreateStreamOptions> | undefined,
Expand Down Expand Up @@ -35,3 +42,10 @@ export async function sendInterrupt(

streamingManager.sendDataChannelMessage(JSON.stringify(payload));
}

/**
 * Sends an interrupt request over the data channel of a V2 (session-based) streaming manager.
 *
 * The message consists solely of the interrupt topic; unlike `sendInterrupt`, no video id
 * is passed in or attached.
 *
 * @param streamingManager - The V2 streaming manager whose data channel carries the request.
 */
export async function sendInterruptV2(streamingManager: StreamingManager<CreateSessionV2Options>): Promise<void> {
  const interruptMessage = JSON.stringify({ topic: DataChannelTopic.Interrupt });

  streamingManager.sendDataChannelMessage(interruptMessage);
}
112 changes: 97 additions & 15 deletions src/services/streaming-manager/livekit-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ const internalErrorMassage = JSON.stringify({
description: 'Stream Error',
});

enum DataChannelTopic {
/**
 * Topics attached to messages sent by the LiveKit streaming manager.
 * Exported so that other services (e.g. the interrupt service) can build topic-tagged payloads.
 */
export enum DataChannelTopic {
// Topic used by sendTextMessage for chat text.
Chat = 'lk.chat',
// Topic used by speak() payloads.
Speak = 'did.speak',
// Topic used by sendInterruptV2 to request an interrupt.
Interrupt = 'did.interrupt',
}

export function handleInitError(
Expand All @@ -81,7 +82,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
agentId: string,
sessionOptions: CreateSessionV2Options,
options: StreamingManagerOptions
): Promise<StreamingManager<T>> {
): Promise<StreamingManager<T> & { reconnect(): Promise<void> }> {
const log = createStreamingLogger(options.debug || false, 'LiveKitStreamingManager');

const { Room, RoomEvent, ConnectionState: LiveKitConnectionState } = await importLiveKit();
Expand Down Expand Up @@ -132,6 +133,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
.on(RoomEvent.ConnectionQualityChanged, handleConnectionQualityChanged)
.on(RoomEvent.ActiveSpeakersChanged, handleActiveSpeakersChanged)
.on(RoomEvent.ParticipantConnected, handleParticipantConnected)
.on(RoomEvent.ParticipantDisconnected, handleParticipantDisconnected)
.on(RoomEvent.TrackSubscribed, handleTrackSubscribed)
.on(RoomEvent.TrackUnsubscribed, handleTrackUnsubscribed)
.on(RoomEvent.DataReceived, handleDataReceived)
Expand Down Expand Up @@ -212,6 +214,13 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
log('Participant connected:', participant.identity);
}

/**
 * Room event handler invoked when a remote participant leaves the room.
 * Logs the departure and tears the local session down via `disconnect()`.
 *
 * NOTE(review): the handler does not check which participant left; any remote
 * participant leaving triggers the teardown — confirm the agent is the only one expected.
 *
 * @param participant - The remote participant that disconnected.
 */
function handleParticipantDisconnected(participant: RemoteParticipant): void {
  log('Participant disconnected:', participant.identity);

  // Agent left the room - treat as disconnect.
  // This handler is a synchronous event callback, so the teardown promise cannot be awaited.
  // Attach a rejection handler so a failing teardown is logged instead of surfacing as an
  // unhandled promise rejection.
  disconnect().catch((error: unknown) => {
    log('Failed to disconnect after participant left:', error);
  });
}

function handleTrackSubscribed(track: RemoteTrack, publication: any, participant: RemoteParticipant): void {
log(`Track subscribed: ${track.kind} from ${participant.identity}`);

Expand Down Expand Up @@ -399,7 +408,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
}
}

async function sendTextMessage(message: string, topic: DataChannelTopic = DataChannelTopic.Chat) {
async function sendMessage(message: string, topic: DataChannelTopic) {
if (!isConnected || !room) {
log('Room is not connected for sending messages');
callbacks.onError?.(new Error(internalErrorMassage), {
Expand All @@ -417,25 +426,96 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
}
}

/**
 * Sends a data channel message described by a JSON string payload.
 *
 * Only the `topic` field of the parsed payload is used: an empty message is sent on that topic.
 * Any failure (invalid JSON, or a rejection from `sendMessage`) is logged and reported through
 * `callbacks.onError` rather than thrown to the caller.
 *
 * NOTE(review): `topic` is taken from the parsed payload without validation — confirm callers
 * always supply a known `DataChannelTopic`.
 *
 * @param payload - JSON string expected to contain a `topic` field.
 */
async function sendDataChannelMessage(payload: string) {
  try {
    const parsed = JSON.parse(payload);
    const topic = parsed.topic;
    // `await` is required here: returning the bare promise would let an asynchronous
    // rejection escape this try/catch and skip the logging/onError reporting below.
    return await sendMessage('', topic);
  } catch (error) {
    log('Failed to send data channel message:', error);
    callbacks.onError?.(new Error(internalErrorMassage), { sessionId });
  }
}

/**
 * Sends a text message on the chat topic.
 *
 * @param message - The text to send.
 * @returns Whatever `sendMessage` returns for the chat topic.
 */
function sendTextMessage(message: string) {
  const pendingSend = sendMessage(message, DataChannelTopic.Chat);
  return pendingSend;
}

/**
 * Tears down the current session: unpublishes the microphone stream, leaves the room,
 * cleans up the media stream, and notifies listeners that the connection is
 * `Disconnected` and the agent is `Idle`.
 *
 * The `room` reference is intentionally kept (not cleared), since `reconnect()` needs it.
 *
 * Errors from unpublishing or leaving the room still propagate to the caller, but they no
 * longer prevent the remaining teardown steps from running.
 */
async function disconnect() {
  const activeRoom = room;

  try {
    if (activeRoom) {
      try {
        await unpublishMicrophoneStream();
      } finally {
        // Leave the room even when unpublishing the microphone failed.
        await activeRoom.disconnect();
      }
    }
  } finally {
    // Always reset local state and notify listeners, so a failed teardown cannot leave
    // the manager reporting itself as connected.
    cleanMediaStream();
    isConnected = false;
    callbacks.onConnectionStateChange?.(ConnectionState.Disconnected);
    callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle);
  }
}

return {
speak(payload: PayloadType<T>) {
const message = typeof payload === 'string' ? payload : JSON.stringify(payload);
return sendTextMessage(message, DataChannelTopic.Speak);
return sendMessage(message, DataChannelTopic.Speak);
},

async disconnect() {
if (room) {
await unpublishMicrophoneStream();
await room.disconnect();
room = null;
disconnect,

/**
 * Re-establishes the connection to the existing room using the stored `url` and `token`.
 *
 * - Returns immediately when the room already reports a connected state.
 * - Throws when the room, URL or token is missing.
 * - After connecting, if the room has no remote participants, waits up to 5000 ms for a
 *   participant to connect; on timeout the room is disconnected and an error is thrown.
 * - On any failure inside the connect sequence, reports `ConnectionState.Fail` and rethrows.
 */
async reconnect() {
// Nothing to do when the room is already connected.
if (room?.state === LiveKitConnectionState.Connected) {
log('Room is already connected');
return;
}

// Reconnecting requires the existing room object and the stored URL and token.
if (!room || !url || !token) {
log('Cannot reconnect: missing room, URL or token');
throw new Error('Cannot reconnect: session not available');
}

log('Reconnecting to LiveKit room, state:', room.state);
callbacks.onConnectionStateChange?.(ConnectionState.Connecting);

try {
await room.connect(url, token);
log('Room reconnected');
// NOTE(review): this flag is set before the agent wait below and is not reset in the
// timeout path or in the catch block — verify it cannot stay true after a failed reconnect.
isConnected = true;

// If no remote participants, wait for agent to join
if (room.remoteParticipants.size === 0) {
log('Waiting for agent to join...');

// Resolves true when a participant connects, false when the 5000 ms timer fires first.
// Both paths remove the temporary ParticipantConnected listener.
const agentJoined = await new Promise<boolean>(resolve => {
const timeout = setTimeout(() => {
room?.off(RoomEvent.ParticipantConnected, onParticipantConnected);
resolve(false);
}, 5000);

const onParticipantConnected = () => {
clearTimeout(timeout);
room?.off(RoomEvent.ParticipantConnected, onParticipantConnected);
resolve(true);
};

room?.on(RoomEvent.ParticipantConnected, onParticipantConnected);
});

if (!agentJoined) {
log('Agent did not join within timeout');
// Leave the room again before failing; the throw is handled by the catch below.
await room.disconnect();
throw new Error('Agent did not rejoin the room');
}

log('Agent joined');
}

callbacks.onConnectionStateChange?.(ConnectionState.Connected);
} catch (error) {
log('Failed to reconnect:', error);
callbacks.onConnectionStateChange?.(ConnectionState.Fail);
throw error;
}
// NOTE(review): the four statements below look like removed-side diff lines from the previous
// inline disconnect implementation. As written they would run after a successful reconnect and
// report `Completed` — confirm against the actual file before relying on them.
cleanMediaStream();
isConnected = false;
callbacks.onConnectionStateChange?.(ConnectionState.Completed);
callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle);
},

sendDataChannelMessage: sendTextMessage,
sendDataChannelMessage,
sendTextMessage,
publishMicrophoneStream,
unpublishMicrophoneStream,
Expand All @@ -448,4 +528,6 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
};
}

export type LiveKitStreamingManager<T extends CreateStreamOptions> = StreamingManager<T>;
export type LiveKitStreamingManager<T extends CreateStreamOptions> = StreamingManager<T> & {
reconnect(): Promise<void>;
};