Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@d-id/client-sdk",
"private": false,
"version": "1.1.27",
"version": "1.1.28",
"type": "module",
"description": "d-id client sdk",
"repository": {
Expand Down
3 changes: 3 additions & 0 deletions src/services/agent-manager/connect-to-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
ConnectionState,
CreateSessionV2Options,
CreateStreamOptions,
Interrupt,
StreamEvents,
StreamType,
StreamingState,
Expand Down Expand Up @@ -165,6 +166,8 @@ type ConnectToManagerOptions = AgentManagerOptions & {
onVideoIdChange?: (videoId: string | null) => void;
/** Internal callback for livekit-manager data channel events */
onMessage?: ChatProgressCallback;
/** Internal callback for when interrupt is detected by streaming manager */
onInterruptDetected?: (interrupt: Interrupt) => void;
};
chatId?: string;
};
Expand Down
4 changes: 2 additions & 2 deletions src/services/agent-manager/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ describe('createAgentManager', () => {
// Add a message to interrupt
await manager.chat('Hello');

await manager.interrupt({ type: 'click' });
manager.interrupt({ type: 'click' });

expect(validateInterrupt).toHaveBeenCalledWith(mockStreamingManager, StreamType.Legacy, null);
expect(sendInterrupt).toHaveBeenCalledWith(mockStreamingManager, null);
Expand All @@ -538,7 +538,7 @@ describe('createAgentManager', () => {
throw new Error('Interrupt validation failed');
});

await expect(manager.interrupt({ type: 'click' })).rejects.toThrow('Interrupt validation failed');
expect(() => manager.interrupt({ type: 'click' })).toThrow('Interrupt validation failed');

// Verify validateInterrupt was called
expect(validateInterrupt).toHaveBeenCalledWith(mockStreamingManager, StreamType.Legacy, null);
Expand Down
46 changes: 27 additions & 19 deletions src/services/agent-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
videoId = newVideoId;
};

/**
 * Interrupts the agent's current response.
 *
 * Tracks an `agent-video-interrupt` analytics event, marks the most recent
 * message (if any) as interrupted and republishes the message list, then
 * sends the interrupt through the active streaming manager.
 *
 * This function is exposed on the manager and is also passed to the streaming
 * manager as `onInterruptDetected`, so it may be invoked without any user
 * action and therefore must tolerate an empty message list.
 *
 * @param interrupt - Interrupt descriptor; `type` falls back to `'click'` when falsy.
 * @throws Propagates synchronously whatever `validateInterrupt` throws on the
 *   non-V2 path.
 */
const interrupt = ({ type }: Interrupt) => {
  const lastMessage = items.messages[items.messages.length - 1];

  analytics.track('agent-video-interrupt', {
    type: type || 'click',
    video_duration_to_interrupt: interruptTimestampTracker.get(true),
    message_duration_to_interrupt: latencyTimestampTracker.get(true),
  });

  // `lastMessage` is undefined when no message exists yet; without this guard
  // the assignment would throw a TypeError before the interrupt is ever sent.
  if (lastMessage) {
    lastMessage.interrupted = true;
    options.callbacks.onNewMessage?.([...items.messages], 'answer');
  }

  if (isStreamsV2) {
    sendInterruptV2(items.streamingManager! as StreamingManager<CreateSessionV2Options>);
  } else {
    validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId);
    sendInterrupt(items.streamingManager!, videoId!);
  }
};

const loadedTimestamp = Date.now();
defer(() => {
analytics.track('agent-sdk', { event: 'loaded', ...getAnalyticsInfo(agentEntity) }, loadedTimestamp);
Expand Down Expand Up @@ -129,7 +149,12 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
{
...options,
mode,
callbacks: { ...options.callbacks, onVideoIdChange: updateVideoId, onMessage },
callbacks: {
...options.callbacks,
onVideoIdChange: updateVideoId,
onMessage,
onInterruptDetected: interrupt,
},
},
agentsApi,
analytics,
Expand Down Expand Up @@ -506,23 +531,6 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
metadata: { chat_id: items.chat?.id, agent_id: agentEntity.id },
});
},
/**
 * Interrupts the agent's current response.
 *
 * Tracks an `agent-video-interrupt` analytics event, marks the most recent
 * message (if any) as interrupted and republishes the message list, then
 * sends the interrupt through the active streaming manager.
 *
 * @param interrupt - Interrupt descriptor; `type` falls back to `'click'` when falsy.
 * @returns A promise that rejects with whatever `validateInterrupt` throws on
 *   the non-V2 path.
 */
async interrupt({ type }: Interrupt) {
  const lastMessage = items.messages[items.messages.length - 1];

  analytics.track('agent-video-interrupt', {
    type: type || 'click',
    video_duration_to_interrupt: interruptTimestampTracker.get(true),
    message_duration_to_interrupt: latencyTimestampTracker.get(true),
  });

  // `lastMessage` is undefined when no message exists yet; guard so the
  // interrupt is still sent instead of failing with a TypeError.
  if (lastMessage) {
    lastMessage.interrupted = true;
    options.callbacks.onNewMessage?.([...items.messages], 'answer');
  }

  if (isStreamsV2) {
    sendInterruptV2(items.streamingManager! as StreamingManager<CreateSessionV2Options>);
  } else {
    validateInterrupt(items.streamingManager, items.streamingManager?.streamType, videoId);
    sendInterrupt(items.streamingManager!, videoId!);
  }
},
interrupt,
};
}
22 changes: 20 additions & 2 deletions src/services/streaming-manager/livekit-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt

let trackSubscriptionTimeoutId: ReturnType<typeof setTimeout> | null = null;
const TRACK_SUBSCRIPTION_TIMEOUT_MS = 20000;
let currentActivityState: AgentActivityState = AgentActivityState.Idle;

const streamApi = createStreamApiV2(auth, baseURL || didApiUrl, agentId, callbacks.onError);
let sessionId: string | undefined;
Expand Down Expand Up @@ -216,11 +217,22 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt

/**
 * Translates an active-speakers update into agent activity state callbacks.
 *
 * - Local participant speaking: reports Idle, and if the tracked state was not
 *   already Idle, reports an `'audio'` interrupt via `onInterruptDetected`.
 * - Only remote participant(s) speaking: reports Talking.
 * - Nobody speaking: reports Idle.
 *
 * @param activeSpeakers - Participants currently flagged as speaking.
 */
function handleActiveSpeakersChanged(activeSpeakers: Participant[]): void {
  log('Active speakers changed:', activeSpeakers);
  const isLocalParticipantSpeaking = activeSpeakers.some(speaker => speaker.isLocal);
  const isRemoteParticipantSpeaking = activeSpeakers.some(speaker => !speaker.isLocal);

  if (isLocalParticipantSpeaking) {
    // Record the transition before invoking callbacks: if a callback throws,
    // the tracked state must not stay non-Idle, otherwise the same interrupt
    // would be reported again on the next speaker update.
    const wasAgentActive = currentActivityState !== AgentActivityState.Idle;
    currentActivityState = AgentActivityState.Idle;

    callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle);

    if (wasAgentActive) {
      callbacks.onInterruptDetected?.({ type: 'audio' });
    }
  } else if (isRemoteParticipantSpeaking) {
    currentActivityState = AgentActivityState.Talking;
    callbacks.onAgentActivityStateChange?.(AgentActivityState.Talking);
  } else {
    currentActivityState = AgentActivityState.Idle;
    callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle);
  }
}

Expand Down Expand Up @@ -310,6 +322,11 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
event: eventName,
...data,
});
// Set loading state after transcribed message is processed (similar to v1)
// Use queueMicrotask to ensure message is added before setting loading state
queueMicrotask(() => {
callbacks.onAgentActivityStateChange?.(AgentActivityState.Loading);
});
}
} catch (e) {
log('Failed to parse data channel message:', e);
Expand Down Expand Up @@ -481,6 +498,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
isConnected = false;
callbacks.onConnectionStateChange?.(ConnectionState.Disconnected);
callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle);
currentActivityState = AgentActivityState.Idle;
}

return {
Expand Down
3 changes: 3 additions & 0 deletions src/types/stream/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Analytics } from '@sdk/services/analytics/mixpanel';
import { VideoRTCStatsReport } from '@sdk/services/streaming-manager/stats/report';
import { Auth } from '../auth';
import { Interrupt } from '../entities';
import { ChatProgressCallback } from '../entities/agents/manager';
import { CreateClipStreamRequest, CreateTalkStreamRequest, SendClipStreamPayload, SendTalkStreamPayload } from './api';
import { ICreateStreamRequestResponse, IceCandidate, SendStreamPayloadResponse, Status } from './rtc';
Expand All @@ -20,6 +21,7 @@ export enum ConnectivityState {

/**
 * Activity states passed to the `onAgentActivityStateChange` callback.
 */
export enum AgentActivityState {
/** Reported by the LiveKit manager when the local participant speaks, when nobody is speaking, and on disconnect. */
Idle = 'IDLE',
/** Reported by the LiveKit manager in a microtask after a data-channel message has been forwarded to `onMessage` (NOTE(review): the exact event condition is outside this view — confirm). */
Loading = 'LOADING',
/** Reported by the LiveKit manager when a remote participant is an active speaker and the local participant is not. */
Talking = 'TALKING',
}

Expand Down Expand Up @@ -65,6 +67,7 @@ export interface ManagerCallbacks {
onVideoIdChange?: (videoId: string | null) => void;
onStreamCreated?: (stream: { stream_id: string; session_id: string; agent_id: string }) => void;
onStreamReady?: () => void;
onInterruptDetected?: (interrupt: Interrupt) => void;
}

export type ManagerCallbackKeys = keyof ManagerCallbacks;
Expand Down