Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@d-id/client-sdk",
"private": false,
"version": "1.1.56",
"version": "1.1.57",
"type": "module",
"description": "d-id client sdk",
"repository": {
Expand Down
84 changes: 76 additions & 8 deletions src/services/agent-manager/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ describe('createAgentManager', () => {

describe('interrupt', () => {
beforeEach(async () => {
mockStreamingManager.interruptAvailable = true;
await manager.connect();
});

Expand Down Expand Up @@ -742,12 +743,10 @@ describe('createAgentManager', () => {
expect(mockUnpublish).toHaveBeenCalled();
});

it('should throw error when unpublishMicrophoneStream is not available', async () => {
it('should no-op when unpublishMicrophoneStream is not available', async () => {
mockStreamingManager.unpublishMicrophoneStream = undefined;

await expect(manager.unpublishMicrophoneStream?.()).rejects.toThrow(
'unpublishMicrophoneStream is not available for this streaming manager'
);
await expect(manager.unpublishMicrophoneStream?.()).resolves.toBeUndefined();
});
});

Expand Down Expand Up @@ -795,12 +794,10 @@ describe('createAgentManager', () => {
expect(mockUnpublish).toHaveBeenCalled();
});

it('should throw error when unpublishCameraStream is not available', async () => {
it('should no-op when unpublishCameraStream is not available', async () => {
mockStreamingManager.unpublishCameraStream = undefined;

await expect(manager.unpublishCameraStream?.()).rejects.toThrow(
'unpublishCameraStream is not available for this streaming manager'
);
await expect(manager.unpublishCameraStream?.()).resolves.toBeUndefined();
});
});

Expand Down Expand Up @@ -835,4 +832,75 @@ describe('createAgentManager', () => {
});
});
});

// Suite covering client tool registration on the agent manager: immediate registration
// when connected, registration before connect, de-duplication by tool name,
// unregistration, and handler lookup at RPC call time.
describe('registerClientTool', () => {
let manager: AgentManager;

// Each test gets fresh jest mocks for the RPC methods and a newly created manager.
beforeEach(async () => {
mockStreamingManager.registerRpcMethod = jest.fn();
mockStreamingManager.unregisterRpcMethod = jest.fn();
manager = await createAgentManager('agent-123', mockOptions);
});

// Registering while connected forwards the tool name and an RPC callback to the streaming manager.
it('should register tool and call registerRpcMethod after connect', async () => {
const handler = jest.fn().mockResolvedValue('result');

await manager.connect();
manager.registerClientTool('testTool', handler);

expect(mockStreamingManager.registerRpcMethod).toHaveBeenCalledWith('testTool', expect.any(Function));
});

// Registering before connect must not reach the streaming manager until connect() has run.
it('should buffer tool registration before connect and flush on connect', async () => {
const handler = jest.fn().mockResolvedValue('result');

manager.registerClientTool('testTool', handler);
expect(mockStreamingManager.registerRpcMethod).not.toHaveBeenCalled();

await manager.connect();
expect(mockStreamingManager.registerRpcMethod).toHaveBeenCalledWith('testTool', expect.any(Function));
});

// A second registration under the same name must not register another RPC method.
it('should not call registerRpcMethod twice for same tool name', async () => {
const handler1 = jest.fn().mockResolvedValue('result1');
const handler2 = jest.fn().mockResolvedValue('result2');

await manager.connect();
manager.registerClientTool('testTool', handler1);
manager.registerClientTool('testTool', handler2);

expect(mockStreamingManager.registerRpcMethod).toHaveBeenCalledTimes(1);
});

// Only the streaming-manager side of unregistration is asserted here.
it('should unregister tool from map and room', async () => {
const handler = jest.fn().mockResolvedValue('result');

await manager.connect();
manager.registerClientTool('testTool', handler);
manager.unregisterClientTool('testTool');

expect(mockStreamingManager.unregisterRpcMethod).toHaveBeenCalledWith('testTool');
});

// The RPC callback captured from the first registration must dispatch to the handler
// registered most recently under that name.
it('should invoke latest handler when RPC is called after re-registration', async () => {
const handler1 = jest.fn().mockResolvedValue('result1');
const handler2 = jest.fn().mockResolvedValue('result2');

await manager.connect();
manager.registerClientTool('testTool', handler1);

// Get the RPC handler that was registered (second argument of the first registerRpcMethod call)
const rpcHandler = mockStreamingManager.registerRpcMethod.mock.calls[0][1];

// Re-register with new handler (same name — only updates Map)
manager.registerClientTool('testTool', handler2);

// Invoke the RPC handler — should use handler2 from Map, with the JSON payload parsed into an object
const result = await rpcHandler({ payload: '{"key": "val"}' });

expect(handler1).not.toHaveBeenCalled();
expect(handler2).toHaveBeenCalledWith({ key: 'val' });
expect(result).toBe('result2');
});
});
});
50 changes: 47 additions & 3 deletions src/services/agent-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
Chat,
ChatMode,
ChatResponse,
ClientToolHandler,
ConnectionState,
CreateSessionV2Options,
CreateStreamOptions,
Expand Down Expand Up @@ -104,6 +105,9 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
};

const interrupt = ({ type }: Interrupt) => {
if (!items.streamingManager?.interruptAvailable) {
return;
}
if (!items.streamingManager?.isInterruptible) return;

const lastMessage = items.messages[items.messages.length - 1];
Expand All @@ -125,6 +129,43 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
}
};

// Client tool handlers keyed by tool name. RPC callbacks look handlers up here at
// call time, so replacing a handler under an existing name takes effect immediately.
const clientToolHandlers = new Map<string, ClientToolHandler>();

/**
 * Builds the RPC callback for the client tool called `toolName`.
 *
 * The returned callback parses `data.payload` as JSON, passes the parsed value to the
 * handler currently stored for `toolName`, and resolves with the handler's result.
 *
 * @param toolName - Name under which the handler is stored in `clientToolHandlers`.
 * @returns An async callback taking `{ payload }` and resolving to the handler's string result.
 * @throws Error when no handler is stored for `toolName` at call time, or (wrapped as
 *   `Client tool "<name>" failed: <reason>`) when the payload is not valid JSON or the
 *   handler rejects.
 */
function createRpcHandler(toolName: string) {
  return async (data: { payload: string }): Promise<string> => {
    const handler = clientToolHandlers.get(toolName);
    if (!handler) {
      throw new Error(`No handler registered for client tool: ${toolName}`);
    }
    try {
      const args = JSON.parse(data.payload);
      return await handler(args);
    } catch (error: unknown) {
      // Handlers are user code and may throw anything (strings, null, plain objects).
      // Read `message` defensively so the wrapped error never says "undefined" and the
      // catch block itself cannot throw a TypeError on a null/undefined rejection.
      const detail = (error as { message?: unknown } | null | undefined)?.message;
      const reason = detail === undefined ? String(error) : String(detail);
      throw new Error(`Client tool "${toolName}" failed: ${reason}`);
    }
  };
}

/**
 * Re-attaches every stored client tool to the current streaming manager.
 * Each tool name is first unregistered and then registered again with a fresh RPC
 * callback. Nothing happens when there is no streaming manager or it lacks the RPC methods.
 */
function flushClientToolsToRoom() {
  clientToolHandlers.forEach((_handler, toolName) => {
    // Unregister first so the following registration never collides with a stale entry.
    items.streamingManager?.unregisterRpcMethod?.(toolName);
    items.streamingManager?.registerRpcMethod?.(toolName, createRpcHandler(toolName));
  });
}

/**
 * Stores `handler` as the client tool handler for `name`.
 * The first registration of a name also registers an RPC method with the current
 * streaming manager (if it exposes one); later registrations under the same name only
 * replace the stored handler.
 *
 * @param name - Tool name used as the map key and RPC method name.
 * @param handler - Handler to store for that tool.
 */
function registerClientTool(name: string, handler: ClientToolHandler): void {
  const alreadyRegistered = clientToolHandlers.has(name);
  clientToolHandlers.set(name, handler);
  if (alreadyRegistered) {
    return;
  }
  items.streamingManager?.registerRpcMethod?.(name, createRpcHandler(name));
}

/**
 * Drops the stored handler for `name` and asks the current streaming manager, when it
 * exposes `unregisterRpcMethod`, to remove the matching RPC method.
 *
 * @param name - Tool name to remove.
 */
function unregisterClientTool(name: string): void {
  // Remove the local handler first; the streaming-manager call is best-effort.
  clientToolHandlers.delete(name);

  const { streamingManager } = items;
  streamingManager?.unregisterRpcMethod?.(name);
}

const loadedTimestamp = Date.now();
defer(() => {
analytics.track('agent-sdk', { event: 'loaded', ...getAnalyticsInfo(agentEntity) }, loadedTimestamp);
Expand Down Expand Up @@ -194,6 +235,8 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
items.socketManager = socketManager;
items.chat = chat;

flushClientToolsToRoom();

firstConnection = false;

analytics.enrich({
Expand Down Expand Up @@ -232,7 +275,6 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
agent: agentEntity,
getStreamType: () => items.streamingManager?.streamType,
getIsInterruptAvailable: () => items.streamingManager?.interruptAvailable ?? false,
getIsTriggersAvailable: () => items.streamingManager?.triggersAvailable ?? false,
starterMessages: agentEntity.knowledge?.starter_message || [],
getSTTToken: () => agentsApi.getSTTToken(agentEntity.id),
changeMode,
Expand Down Expand Up @@ -286,7 +328,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
},
async unpublishMicrophoneStream() {
if (!items.streamingManager?.unpublishMicrophoneStream) {
throw new Error('unpublishMicrophoneStream is not available for this streaming manager');
return;
}
return items.streamingManager.unpublishMicrophoneStream();
},
Expand All @@ -298,7 +340,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
},
async unpublishCameraStream() {
if (!items.streamingManager?.unpublishCameraStream) {
throw new Error('unpublishCameraStream is not available for this streaming manager');
return;
}
return items.streamingManager.unpublishCameraStream();
},
Expand Down Expand Up @@ -552,5 +594,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
});
},
interrupt,
registerClientTool,
unregisterClientTool,
};
}
12 changes: 10 additions & 2 deletions src/services/streaming-manager/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,15 @@ export type StreamingManager<T extends CreateStreamOptions | CreateSessionV2Opti
isInterruptible: boolean;

/**
* Whether triggers functionality is available for this stream
* Register an RPC method handler on the LiveKit room.
* Used internally by the agent-manager for client tool delegation.
* Supported only by the LiveKit streaming manager.
*/
registerRpcMethod?(method: string, handler: (data: any) => Promise<string>): void;

/**
* Unregister a previously registered RPC method.
* Supported only by the LiveKit streaming manager.
*/
triggersAvailable: boolean;
unregisterRpcMethod?(method: string): void;
};
14 changes: 11 additions & 3 deletions src/services/streaming-manager/livekit-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,20 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt

let token: string | undefined;
let url: string | undefined;
let interruptEnabled = true;

try {
const streamResponse = await streamApi.createStream({
transport: sessionOptions.transport,
chat_persist: sessionOptions.chat_persist ?? true,
});

const { id, session_token, session_url } = streamResponse;
const { id, session_token, session_url, interrupt_enabled } = streamResponse;
callbacks.onStreamCreated?.({ session_id: id, stream_id: id, agent_id: agentId });
sessionId = id;
token = session_token;
url = session_url;
interruptEnabled = interrupt_enabled ?? true;

await room.prepareConnection(url, token);
} catch (error) {
Expand Down Expand Up @@ -719,12 +721,18 @@ export async function createLiveKitStreamingManager<T extends CreateSessionV2Opt
publishCameraStream,
unpublishCameraStream,

// Forwards an RPC method registration to the room; does nothing when `room` is not set.
registerRpcMethod(method: string, handler: (data: any) => Promise<string>) {
  if (room) {
    room.registerRpcMethod(method, handler);
  }
},
// Forwards an RPC method removal to the room; does nothing when `room` is not set.
unregisterRpcMethod(method: string) {
  if (room) {
    room.unregisterRpcMethod(method);
  }
},

sessionId,
streamId: sessionId,
streamType,
interruptAvailable: true,
interruptAvailable: interruptEnabled,
isInterruptible: currentInterruptible,
triggersAvailable: false,
};
}

Expand Down
2 changes: 0 additions & 2 deletions src/services/streaming-manager/webrtc-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ export async function createWebRTCStreamingManager<T extends CreateStreamOptions
session_id,
fluent,
interrupt_enabled: interruptAvailable,
triggers_enabled: triggersAvailable,
} = await createStream(streamOptions, signal);
callbacks.onStreamCreated?.({ stream_id: streamIdFromServer, session_id: session_id as string, agent_id: agentId });
const peerConnection = new actualRTCPC({ iceServers: ice_servers });
Expand Down Expand Up @@ -398,7 +397,6 @@ export async function createWebRTCStreamingManager<T extends CreateStreamOptions
streamType,
interruptAvailable: interruptAvailable ?? false,
isInterruptible: true,
triggersAvailable: triggersAvailable ?? false,
};
}

Expand Down
20 changes: 15 additions & 5 deletions src/types/entities/agents/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { STTTokenResponse } from '@sdk/types';
import { Auth } from '@sdk/types/auth';
import {
AgentActivityState,
ClientToolHandler,
CompatibilityMode,
ConnectionState,
ConnectivityState,
Expand Down Expand Up @@ -194,11 +195,6 @@ export interface AgentManager {
*/
getIsInterruptAvailable: () => boolean;

/**
* Get if the stream supports triggers
*/
getIsTriggersAvailable: () => boolean;

/**
* Array of starter messages that will be sent to the agent when the chat starts
*/
Expand Down Expand Up @@ -284,4 +280,18 @@ export interface AgentManager {
* Only available for Fluent streams and when there's an active video to interrupt
*/
interrupt: (interrupt: Interrupt) => void;

/**
* Register a handler for a client tool. When the agent's LLM calls this tool,
* the handler executes on the client and returns the result to the LLM.
* @param name - Tool name (must match the tool name defined in the agent config)
* @param handler - Async function receiving args, must return a JSON string (max 15KiB)
*/
registerClientTool: (name: string, handler: ClientToolHandler) => void;

/**
* Remove a previously registered client tool handler.
* @param name - Tool name to unregister
*/
unregisterClientTool: (name: string) => void;
}
1 change: 0 additions & 1 deletion src/types/stream/rtc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export interface ICreateStreamRequestResponse extends StickyRequest {
ice_servers: IceServer[];
fluent?: boolean;
interrupt_enabled?: boolean;
triggers_enabled?: boolean;
}

export interface IceCandidate {
Expand Down
2 changes: 2 additions & 0 deletions src/types/stream/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ export interface StreamInterruptPayload {
timestamp: number;
}

/**
 * Handler run on the client when a registered client tool is called.
 * Receives the tool arguments as a key/value record and resolves to the string result
 * for that call.
 */
export type ClientToolHandler = (args: Record<string, unknown>) => Promise<string>;

export interface ToolCallingPayload {
execution_id: string;
tool_name: string;
Expand Down
1 change: 1 addition & 0 deletions src/types/stream/streams-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ export interface CreateSessionV2Response {
id: string;
session_url: string;
session_token: string;
interrupt_enabled: boolean;
}
2 changes: 1 addition & 1 deletion src/utils/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export function getAgentInfo(agent: Agent) {
maxResponseLength: promptCustomization?.max_response_length,
agentId: agent.id,
access: agent.access,
name: agent.preview_name,
agentName: agent.preview_name,
...(agent.access === 'public' ? { from: 'agent-template' } : {}),
};
}
Expand Down
Loading