Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@d-id/client-sdk",
"private": false,
"version": "1.1.13",
"version": "1.1.14",
"type": "module",
"description": "d-id client sdk",
"repository": {
Expand Down
116 changes: 95 additions & 21 deletions src/services/agent-manager/connect-to-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import {
ConnectionState,
Providers,
StreamEvents,
StreamType,
StreamingState,
StreamType,
TransportProvider,
} from '../../types';
import { Analytics } from '../analytics/mixpanel';
import { createChat } from '../chat';
import { createStreamingManager } from '../streaming-manager';
import { createStreamingManager, StreamApiVersion } from '../streaming-manager';
import { initializeStreamAndChat } from './connect-to-manager';

// Mock dependencies
Expand Down Expand Up @@ -116,7 +117,8 @@ describe('connect-to-manager', () => {
mockAgentsApi = { getById: jest.fn().mockResolvedValue(mockAgent), chat: jest.fn(), createRating: jest.fn() };

// Setup mocks
(createStreamingManager as jest.Mock).mockImplementation((agentId, streamArgs, options) => {
(createStreamingManager as jest.Mock).mockReset();
(createStreamingManager as jest.Mock).mockImplementation((agent, streamOptions, options) => {
// Immediately trigger the connection state change to Connected
setTimeout(() => {
if (options.callbacks.onConnectionStateChange) {
Expand All @@ -128,6 +130,7 @@ describe('connect-to-manager', () => {
return Promise.resolve(mockStreamingManager);
});

(createChat as jest.Mock).mockReset();
(createChat as jest.Mock).mockResolvedValue({ chat: mockChat, chatMode: ChatMode.Functional });
});

Expand All @@ -137,9 +140,18 @@ describe('connect-to-manager', () => {

expect(result.streamingManager).toBe(mockStreamingManager);
expect(result.chat).toBe(mockChat);
expect(createChat).toHaveBeenCalledWith(
mockAgent,
mockAgentsApi,
mockAnalytics,
ChatMode.Functional,
true,
undefined
);
expect(createStreamingManager).toHaveBeenCalledWith(
mockAgent,
{
version: StreamApiVersion.V1,
output_resolution: 1080,
session_timeout: 30000,
stream_warmup: true,
Expand All @@ -155,14 +167,6 @@ describe('connect-to-manager', () => {
}),
})
);
expect(createChat).toHaveBeenCalledWith(
mockAgent,
mockAgentsApi,
mockAnalytics,
ChatMode.Functional,
true,
undefined
);
});

it('should initialize with existing chat', async () => {
Expand Down Expand Up @@ -233,7 +237,7 @@ describe('connect-to-manager', () => {
onVideoStateChange = jest.fn();
onAgentActivityStateChange = jest.fn();

(createStreamingManager as jest.Mock).mockImplementation((agentId, streamArgs, options) => {
(createStreamingManager as jest.Mock).mockImplementation((agent, streamOptions, options) => {
onConnectionStateChange = options.callbacks.onConnectionStateChange;
onVideoStateChange = options.callbacks.onVideoStateChange;
onAgentActivityStateChange = options.callbacks.onAgentActivityStateChange;
Expand Down Expand Up @@ -380,13 +384,16 @@ describe('connect-to-manager', () => {
expect(createStreamingManager).toHaveBeenCalledWith(
mockAgent,
{
version: StreamApiVersion.V1,
output_resolution: 720,
session_timeout: 60000,
stream_warmup: false,
compatibility_mode: 'on',
fluent: true,
},
expect.any(Object)
expect.not.objectContaining({
chatId: expect.anything(),
})
);
});

Expand All @@ -398,13 +405,40 @@ describe('connect-to-manager', () => {
expect(createStreamingManager).toHaveBeenCalledWith(
mockAgent,
{
version: StreamApiVersion.V1,
output_resolution: undefined,
session_timeout: undefined,
stream_warmup: undefined,
compatibility_mode: undefined,
fluent: undefined,
},
expect.any(Object)
expect.not.objectContaining({
chatId: expect.anything(),
})
);
});

it('should include analytics data when provided', async () => {
const optionsWithAnalytics = {
...mockOptions,
distinctId: 'analytics-user',
mixpanelAdditionalProperties: { plan: 'scale' },
};

await initializeStreamAndChat(mockAgent, optionsWithAnalytics, mockAgentsApi, mockAnalytics);

expect(createStreamingManager).toHaveBeenCalledWith(
mockAgent,
expect.objectContaining({
version: StreamApiVersion.V1,
end_user_data: {
distinct_id: 'analytics-user',
plan: 'scale',
},
}),
expect.not.objectContaining({
chatId: expect.anything(),
})
);
});
});
Expand Down Expand Up @@ -458,16 +492,14 @@ describe('connect-to-manager', () => {
(createStreamingManager as jest.Mock).mockRejectedValueOnce(streamError);
(createChat as jest.Mock).mockRejectedValueOnce(chatError);

// Should reject with the first error encountered
await expect(
initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics)
).rejects.toThrow();
await expect(initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics)).rejects.toThrow(
'Chat failed'
);
});
});

describe('Concurrent Operations', () => {
it('should handle streaming manager and chat creation concurrently', async () => {
// Simplified test to avoid timing issues
describe('Sequential Operations', () => {
it('should handle streaming manager and chat creation sequentially', async () => {
const result = await initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics);

expect(result.streamingManager).toBeDefined();
Expand All @@ -476,4 +508,46 @@ describe('connect-to-manager', () => {
expect(createChat).toHaveBeenCalled();
});
});

describe('Streams V2 Support', () => {
it('should use CreateStreamV2Options for expressive agents', async () => {
const expressiveAgent = {
...mockAgent,
presenter: {
type: 'expressive' as const,
voice: { type: Providers.Microsoft, voice_id: 'voice-123' },
},
};

await initializeStreamAndChat(expressiveAgent, mockOptions, mockAgentsApi, mockAnalytics);

expect(createStreamingManager).toHaveBeenCalledWith(
expressiveAgent,
{
version: StreamApiVersion.V2,
transport_provider: TransportProvider.Livekit,
chat_id: 'chat-123',
},
expect.objectContaining({
chatId: 'chat-123',
})
);
});

it('should use CreateStreamOptions for non-expressive agents', async () => {
await initializeStreamAndChat(mockAgent, mockOptions, mockAgentsApi, mockAnalytics);

expect(createStreamingManager).toHaveBeenCalledWith(
mockAgent,
expect.objectContaining({
version: StreamApiVersion.V1,
output_resolution: 1080,
session_timeout: 30000,
}),
expect.not.objectContaining({
chatId: expect.anything(),
})
);
});
});
});
83 changes: 72 additions & 11 deletions src/services/agent-manager/connect-to-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { ChatModeDowngraded } from '$/errors';
import { StreamingManager, createStreamingManager } from '$/services/streaming-manager';
import {
ExtendedStreamOptions,
StreamApiVersion,
StreamingManager,
createStreamingManager,
} from '$/services/streaming-manager';
import {
Agent,
AgentActivityState,
Expand All @@ -9,24 +14,52 @@ import {
ChatMode,
ConnectionState,
CreateStreamOptions,
CreateStreamV2Options,
StreamEvents,
StreamType,
StreamingState,
TransportProvider,
} from '$/types';
import { isStreamsV2Agent } from '$/utils/agent';
import { Analytics } from '../analytics/mixpanel';
import { interruptTimestampTracker, latencyTimestampTracker } from '../analytics/timestamp-tracker';
import { createChat } from '../chat';

function getAgentStreamArgs(options?: AgentManagerOptions): CreateStreamOptions {
function getAgentStreamV2Options(options?: ConnectToManagerOptions): CreateStreamV2Options {
return {
transport_provider: TransportProvider.Livekit,
chat_id: options?.chatId,
};
}

function getAgentStreamV1Options(options?: ConnectToManagerOptions): CreateStreamOptions {
const { streamOptions } = options ?? {};

return {
const endUserData =
options?.distinctId || options?.mixpanelAdditionalProperties?.plan !== undefined
? {
...(options?.distinctId ? { distinct_id: options.distinctId } : {}),
...(options?.mixpanelAdditionalProperties?.plan !== undefined
? { plan: options.mixpanelAdditionalProperties?.plan }
: {}),
}
: undefined;

const streamArgs = {
output_resolution: streamOptions?.outputResolution,
session_timeout: streamOptions?.sessionTimeout,
stream_warmup: streamOptions?.streamWarmup,
compatibility_mode: streamOptions?.compatibilityMode,
fluent: streamOptions?.fluent,
};

return { ...streamArgs, ...(endUserData && { end_user_data: endUserData }) };
}

function getAgentStreamOptions(agent: Agent, options?: ConnectToManagerOptions): ExtendedStreamOptions {
return isStreamsV2Agent(agent.presenter.type)
? { version: StreamApiVersion.V2, ...getAgentStreamV2Options(options) }
: { version: StreamApiVersion.V1, ...getAgentStreamV1Options(options) };
}

function trackVideoStateChangeAnalytics(
Expand Down Expand Up @@ -129,19 +162,20 @@ type ConnectToManagerOptions = AgentManagerOptions & {
callbacks: AgentManagerOptions['callbacks'] & {
onVideoIdChange?: (videoId: string | null) => void;
};
chatId?: string;
};

function connectToManager(
agent: Agent,
options: ConnectToManagerOptions,
analytics: Analytics
): Promise<StreamingManager<CreateStreamOptions>> {
): Promise<StreamingManager<CreateStreamOptions | CreateStreamV2Options>> {
latencyTimestampTracker.reset();

return new Promise(async (resolve, reject) => {
try {
let streamingManager: StreamingManager<CreateStreamOptions>;
streamingManager = await createStreamingManager(agent, getAgentStreamArgs(options), {
let streamingManager: StreamingManager<CreateStreamOptions | CreateStreamV2Options>;
streamingManager = await createStreamingManager(agent, getAgentStreamOptions(agent, options), {
...options,
analytics,
callbacks: {
Expand Down Expand Up @@ -194,12 +228,39 @@ export async function initializeStreamAndChat(
agentsApi: AgentsAPI,
analytics: Analytics,
chat?: Chat
): Promise<{ chat?: Chat; streamingManager?: StreamingManager<CreateStreamOptions> }> {
const createChatPromise = createChat(agent, agentsApi, analytics, options.mode, options.persistentChat, chat);
const connectToManagerPromise = connectToManager(agent, options, analytics);

const [chatResult, streamingManager] = await Promise.all([createChatPromise, connectToManagerPromise]);
): Promise<{ chat?: Chat; streamingManager?: StreamingManager<CreateStreamOptions | CreateStreamV2Options> }> {
const resolveStreamAndChat = async () => {
if (isStreamsV2Agent(agent.presenter.type)) {
const chatResult = await createChat(
agent,
agentsApi,
analytics,
options.mode,
options.persistentChat,
chat
);
const streamingManager = await connectToManager(
agent,
{ ...options, chatId: chatResult.chat?.id },
analytics
);
return { chatResult, streamingManager };
} else {
const createChatPromise = createChat(
agent,
agentsApi,
analytics,
options.mode,
options.persistentChat,
chat
);
const connectToManagerPromise = connectToManager(agent, options, analytics);
const [chatResult, streamingManager] = await Promise.all([createChatPromise, connectToManagerPromise]);
return { chatResult, streamingManager };
}
};

const { chatResult, streamingManager } = await resolveStreamAndChat();
const { chat: newChat, chatMode } = chatResult;

if (chatMode && chatMode !== options.mode) {
Expand Down
2 changes: 1 addition & 1 deletion src/services/agent-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
id: getRandom(),
role: 'assistant',
content: script.input,
created_at: new Date(latencyTimestampTracker.get(true)).toISOString(),
created_at: new Date().toISOString(),
});
options.callbacks.onNewMessage?.([...items.messages], 'answer');
}
Expand Down
4 changes: 2 additions & 2 deletions src/services/streaming-manager/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CreateStreamOptions, PayloadType, StreamType } from '$/types';
import { CreateStreamOptions, CreateStreamV2Options, PayloadType, StreamType } from '$/types';

export const createStreamingLogger = (debug: boolean, prefix: string) => (message: string, extra?: any) =>
debug && console.log(`[${prefix}] ${message}`, extra ?? '');
Expand All @@ -7,7 +7,7 @@ export const createStreamingLogger = (debug: boolean, prefix: string) => (messag
* Shared type for all streaming managers (LiveKit, WebRTC, etc.)
* This type represents the return value of any streaming manager implementation
*/
export type StreamingManager<T extends CreateStreamOptions> = {
export type StreamingManager<T extends CreateStreamOptions | CreateStreamV2Options> = {
/**
* Method to send request to server to get clip or talk depending on payload
* @param payload The payload to send to the streaming service
Expand Down
Loading