Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@d-id/client-sdk",
"private": false,
"version": "1.1.31",
"version": "1.1.32",
"type": "module",
"description": "d-id client sdk",
"repository": {
Expand Down
40 changes: 16 additions & 24 deletions src/services/streaming-manager/livekit-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@ export enum DataChannelTopic {
export function handleInitError(
error: unknown,
log: (message?: any, ...optionalParams: any[]) => void,
callbacks: StreamingManagerOptions['callbacks'],
markInitialConnectionDone: () => void
callbacks: StreamingManagerOptions['callbacks']
): void {
log('Failed to connect to LiveKit room:', error);
markInitialConnectionDone();
callbacks.onConnectionStateChange?.(ConnectionState.Fail);
callbacks.onError?.(error as Error, { sessionId: '' });
throw error;
Expand All @@ -93,9 +91,10 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
let room: Room | null = null;
let isConnected = false;
const streamType = StreamType.Fluent;
let isInitialConnection = true;
let sharedMediaStream: MediaStream | null = null;
let microphonePublication: LocalTrackPublication | null = null;
// We defer Connected until video track is subscribed to align with WebRTC behavior
let hasEmittedConnected = false;

room = new Room({
adaptiveStream: false, // Must be false to use mediaStreamTrack directly
Expand Down Expand Up @@ -126,9 +125,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt

await room.prepareConnection(url, token);
} catch (error) {
handleInitError(error, log, callbacks, () => {
isInitialConnection = false;
});
handleInitError(error, log, callbacks);
}

if (!url || !token || !sessionId) {
Expand All @@ -148,8 +145,6 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
.on(RoomEvent.EncryptionError, handleEncryptionError)
.on(RoomEvent.TrackSubscriptionFailed, handleTrackSubscriptionFailed);

callbacks.onConnectionStateChange?.(ConnectionState.New);

function handleTranscriptionReceived(_segments: TranscriptionSegment[], participant?: Participant): void {
if (participant?.isLocal) {
latencyTimestampTracker.update();
Expand All @@ -173,12 +168,8 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
callbacks.onError?.(new Error('Track subscription timeout'), { sessionId });
disconnect();
}, TRACK_SUBSCRIPTION_TIMEOUT_MS);

isInitialConnection = false;
} catch (error) {
handleInitError(error, log, callbacks, () => {
isInitialConnection = false;
});
handleInitError(error, log, callbacks);
}

analytics.enrich({
Expand All @@ -189,24 +180,18 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
log('Connection state changed:', state);
switch (state) {
case LiveKitConnectionState.Connecting:
log('CALLBACK: onConnectionStateChange(Connecting)');
callbacks.onConnectionStateChange?.(ConnectionState.Connecting);
break;
case LiveKitConnectionState.Connected:
log('LiveKit room connected successfully');
isConnected = true;

// During initial connection, defer the callback to ensure manager is returned first
if (isInitialConnection) {
queueMicrotask(() => {
callbacks.onConnectionStateChange?.(ConnectionState.Connected);
});
} else {
callbacks.onConnectionStateChange?.(ConnectionState.Connected);
}
break;
case LiveKitConnectionState.Disconnected:
log('LiveKit room disconnected');
isConnected = false;
hasEmittedConnected = false;
callbacks.onConnectionStateChange?.(ConnectionState.Disconnected);
break;
case LiveKitConnectionState.Reconnecting:
Expand Down Expand Up @@ -268,7 +253,14 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt

if (track.kind === 'video') {
callbacks.onStreamReady?.();
log('CALLBACK: onSrcObjectReady');
callbacks.onSrcObjectReady?.(sharedMediaStream);
if (!hasEmittedConnected) {
hasEmittedConnected = true;
log('CALLBACK: onConnectionStateChange(Connected)');
callbacks.onConnectionStateChange?.(ConnectionState.Connected);
}
log('CALLBACK: onVideoStateChange(Start)');
callbacks.onVideoStateChange?.(StreamingState.Start);
}
}
Expand Down Expand Up @@ -495,6 +487,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
}
cleanMediaStream();
isConnected = false;
hasEmittedConnected = false;
callbacks.onConnectionStateChange?.(ConnectionState.Disconnected);
callbacks.onAgentActivityStateChange?.(AgentActivityState.Idle);
currentActivityState = AgentActivityState.Idle;
Expand All @@ -520,6 +513,7 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
}

log('Reconnecting to LiveKit room, state:', room.state);
hasEmittedConnected = false;
callbacks.onConnectionStateChange?.(ConnectionState.Connecting);

try {
Expand Down Expand Up @@ -554,8 +548,6 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt

log('Agent joined');
}

callbacks.onConnectionStateChange?.(ConnectionState.Connected);
} catch (error) {
log('Failed to reconnect:', error);
callbacks.onConnectionStateChange?.(ConnectionState.Fail);
Expand Down
17 changes: 16 additions & 1 deletion src/services/streaming-manager/webrtc-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ function handleLegacyStreamState({
dataChannelSignal,
onVideoStateChange,
report,
log,
}: {
statsSignal?: StreamingState;
dataChannelSignal?: StreamingState;
onVideoStateChange: StreamingManagerOptions['callbacks']['onVideoStateChange'];
report?: VideoRTCStatsReport;
log: ReturnType<typeof createStreamingLogger>;
}) {
if (statsSignal === StreamingState.Start && dataChannelSignal === StreamingState.Start) {
log('CALLBACK: onVideoStateChange(Start)');
onVideoStateChange?.(StreamingState.Start);
} else if (statsSignal === StreamingState.Stop && dataChannelSignal === StreamingState.Stop) {
log('CALLBACK: onVideoStateChange(Stop)');
onVideoStateChange?.(StreamingState.Stop, report);
}
}
Expand All @@ -82,16 +86,20 @@ function handleFluentStreamState({
onVideoStateChange,
onAgentActivityStateChange,
report,
log,
}: {
statsSignal?: StreamingState;
dataChannelSignal?: StreamingState;
onVideoStateChange: StreamingManagerOptions['callbacks']['onVideoStateChange'];
onAgentActivityStateChange?: StreamingManagerOptions['callbacks']['onAgentActivityStateChange'];
report?: VideoRTCStatsReport;
log: ReturnType<typeof createStreamingLogger>;
}) {
if (statsSignal === StreamingState.Start) {
log('CALLBACK: onVideoStateChange(Start)');
onVideoStateChange?.(StreamingState.Start);
} else if (statsSignal === StreamingState.Stop) {
log('CALLBACK: onVideoStateChange(Stop)');
onVideoStateChange?.(StreamingState.Stop, report);
}

Expand All @@ -109,23 +117,26 @@ function handleStreamState({
onAgentActivityStateChange,
streamType,
report,
log,
}: {
statsSignal?: StreamingState;
dataChannelSignal?: StreamingState;
onVideoStateChange: StreamingManagerOptions['callbacks']['onVideoStateChange'];
onAgentActivityStateChange?: StreamingManagerOptions['callbacks']['onAgentActivityStateChange'];
streamType: StreamType;
report?: VideoRTCStatsReport;
log: ReturnType<typeof createStreamingLogger>;
}) {
if (streamType === StreamType.Legacy) {
handleLegacyStreamState({ statsSignal, dataChannelSignal, onVideoStateChange, report });
handleLegacyStreamState({ statsSignal, dataChannelSignal, onVideoStateChange, report, log });
} else if (streamType === StreamType.Fluent) {
handleFluentStreamState({
statsSignal,
dataChannelSignal,
onVideoStateChange,
onAgentActivityStateChange,
report,
log,
});
}
}
Expand Down Expand Up @@ -182,6 +193,7 @@ export async function createWebRTCStreamingManager<T extends CreateStreamOptions
isConnected = true;

if (isDatachannelOpen) {
log('CALLBACK: onConnectionStateChange(Connected)');
callbacks.onConnectionStateChange?.(ConnectionState.Connected);
}
};
Expand All @@ -198,6 +210,7 @@ export async function createWebRTCStreamingManager<T extends CreateStreamOptions
onAgentActivityStateChange: callbacks.onAgentActivityStateChange,
report,
streamType,
log,
}),
state => callbacks.onConnectivityStateChange?.(state)
);
Expand Down Expand Up @@ -258,6 +271,7 @@ export async function createWebRTCStreamingManager<T extends CreateStreamOptions
onVideoStateChange: callbacks.onVideoStateChange,
onAgentActivityStateChange: callbacks.onAgentActivityStateChange,
streamType,
log,
});
}

Expand Down Expand Up @@ -290,6 +304,7 @@ export async function createWebRTCStreamingManager<T extends CreateStreamOptions

peerConnection.ontrack = (event: RTCTrackEvent) => {
log('peerConnection.ontrack', event);
log('CALLBACK: onSrcObjectReady');
callbacks.onSrcObjectReady?.(event.streams[0]);
};

Expand Down