Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@d-id/client-sdk",
"private": false,
"version": "1.1.21",
"version": "1.1.22",
"type": "module",
"description": "d-id client sdk",
"repository": {
Expand Down
32 changes: 20 additions & 12 deletions src/api/streams/streamApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,28 @@ export function createStreamApi(
const client = createClient(auth, `${host}/agents/${agentId}`, onError);

return {
createStream(options: CreateStreamOptions) {
return client.post<ICreateStreamRequestResponse>('/streams', options);
createStream(options: CreateStreamOptions, signal?: AbortSignal) {
return client.post<ICreateStreamRequestResponse>('/streams', options, { signal });
},
startConnection(streamId: string, answer: RTCSessionDescriptionInit, sessionId?: string) {
return client.post<Status>(`/streams/${streamId}/sdp`, {
session_id: sessionId,
answer,
});
startConnection(streamId: string, answer: RTCSessionDescriptionInit, sessionId?: string, signal?: AbortSignal) {
return client.post<Status>(
`/streams/${streamId}/sdp`,
{
session_id: sessionId,
answer,
},
{ signal }
);
},
addIceCandidate(streamId: string, candidate: IceCandidate, sessionId: string) {
return client.post<Status>(`/streams/${streamId}/ice`, {
session_id: sessionId,
...candidate,
});
addIceCandidate(streamId: string, candidate: IceCandidate, sessionId: string, signal?: AbortSignal) {
return client.post<Status>(
`/streams/${streamId}/ice`,
{
session_id: sessionId,
...candidate,
},
{ signal }
);
},
sendStreamRequest(streamId: string, sessionId: string, payload: SendClipStreamPayload | SendTalkStreamPayload) {
return client.post<SendStreamPayloadResponse>(`/streams/${streamId}`, {
Expand Down
133 changes: 126 additions & 7 deletions src/services/agent-manager/connect-to-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ describe('connect-to-manager', () => {
let mockAnalytics: Analytics;
let mockStreamingManager: any;
let mockChat: any;
let mockAbortController: AbortController;
let mockAbortSignal: AbortSignal;

beforeEach(() => {
jest.clearAllMocks();

mockAbortController = new AbortController();
mockAbortSignal = mockAbortController.signal;

mockAgent = {
id: 'agent-123',
name: 'Test Agent',
Expand Down Expand Up @@ -165,7 +170,8 @@ describe('connect-to-manager', () => {
onVideoStateChange: expect.any(Function),
onAgentActivityStateChange: expect.any(Function),
}),
})
}),
mockAbortSignal
);
});

Expand Down Expand Up @@ -393,7 +399,8 @@ describe('connect-to-manager', () => {
},
expect.not.objectContaining({
chatId: expect.anything(),
})
}),
mockAbortSignal
);
});

Expand All @@ -414,7 +421,8 @@ describe('connect-to-manager', () => {
},
expect.not.objectContaining({
chatId: expect.anything(),
})
}),
mockAbortSignal
);
});

Expand All @@ -437,7 +445,8 @@ describe('connect-to-manager', () => {
}),
expect.not.objectContaining({
chatId: expect.anything(),
})
}),
mockAbortSignal
);
});
});
Expand Down Expand Up @@ -518,7 +527,7 @@ describe('connect-to-manager', () => {
},
};

await initializeStreamAndChat(expressiveAgent, mockOptions, mockAgentsApi, mockAnalytics);
const result = await initializeStreamAndChat(expressiveAgent, mockOptions, mockAgentsApi, mockAnalytics);

expect(createStreamingManager).toHaveBeenCalledWith(
expressiveAgent,
Expand All @@ -528,8 +537,19 @@ describe('connect-to-manager', () => {
},
expect.not.objectContaining({
chatId: expect.anything(),
})
}),
undefined
);

// Verify Streams V2 path creates chat with correct chatId format
expect(result.chat).toBeDefined();
expect(result.chat?.id).toMatch(/^cht_/);
expect(result.chat?.id).toContain(mockStreamingManager.sessionId);
expect(result.chat?.chat_mode).toBe(ChatMode.Functional);
expect(result.chat?.agent_id).toBe(expressiveAgent.id);

// Verify createChat is NOT called for V2 agents (chat is created internally)
expect(createChat).not.toHaveBeenCalled();
});

it('should use CreateStreamOptions for non-expressive agents', async () => {
Expand All @@ -544,8 +564,107 @@ describe('connect-to-manager', () => {
}),
expect.not.objectContaining({
chatId: expect.anything(),
})
}),
mockAbortSignal
);
});
});

describe('Error Handling with AbortController', () => {
it('should abort and disconnect streaming manager when error occurs during initialization', async () => {
const error = new Error('Connection failed');
let streamingManagerRef: any;

(createStreamingManager as jest.Mock).mockImplementationOnce((agent, streamOptions, options, signal) => {
streamingManagerRef = {
...mockStreamingManager,
disconnect: jest.fn().mockResolvedValue(undefined),
};
return new Promise((resolve, reject) => {
Promise.resolve().then(() => {
if (options.callbacks.onConnectionStateChange) {
options.callbacks.onConnectionStateChange(ConnectionState.Connecting);
}
reject(error);
});
});
});

(createChat as jest.Mock).mockResolvedValueOnce({
chat: mockChat,
chatMode: ChatMode.Functional,
});

await expect(initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics)).rejects.toThrow(
'Connection failed'
);
});

it('should handle error when streaming manager is created but chat creation fails', async () => {
const chatError = new Error('Chat creation failed');
const disconnectSpy = jest.fn().mockResolvedValue(undefined);
const streamingManagerWithDisconnect = {
...mockStreamingManager,
disconnect: disconnectSpy,
};

// Make streaming manager succeed immediately to set streamingManagerRef
(createStreamingManager as jest.Mock).mockImplementationOnce((agent, streamOptions, options) => {
// Trigger connection state change to Connected in next microtask
Promise.resolve().then(() => {
if (options.callbacks.onConnectionStateChange) {
options.callbacks.onConnectionStateChange(ConnectionState.Connected);
}
});
return Promise.resolve(streamingManagerWithDisconnect);
});

// Make chat creation fail AFTER streaming manager resolves
// The .then() callback on connectToManagerPromise sets streamingManagerRef,
// so we need connectToManager to resolve before Promise.all rejects
(createChat as jest.Mock).mockImplementationOnce(() => {
return new Promise((resolve, reject) => {
Promise.resolve().then(() => {
reject(chatError);
});
});
});

await expect(initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics)).rejects.toThrow(
'Chat creation failed'
);

expect(disconnectSpy).toHaveBeenCalled();
});
});

describe('Connection State Handling', () => {
it('should resolve when connection state changes to Connected before manager is ready', async () => {
let onConnectionStateChange: ((state: ConnectionState) => void) | undefined;
let managerResolved = false;

(createStreamingManager as jest.Mock).mockImplementationOnce((agent, streamOptions, options) => {
onConnectionStateChange = options.callbacks.onConnectionStateChange;

return new Promise(resolve => {
// Trigger connection state change to Connected BEFORE manager is created
// This should set shouldResolveOnComplete = true
if (onConnectionStateChange) {
onConnectionStateChange(ConnectionState.Connected);
}

// Resolve the manager in the next microtask
Promise.resolve().then(() => {
managerResolved = true;
resolve(mockStreamingManager);
});
});
});

const result = await initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics);

expect(managerResolved).toBe(true);
expect(result.streamingManager).toBe(mockStreamingManager);
});
});
});
Loading