From 49865394b443c64589b1729de53ddbddc6e2d245 Mon Sep 17 00:00:00 2001 From: dor-eitan <164745144+dor-eitan@users.noreply.github.com> Date: Wed, 15 Apr 2026 15:45:35 +0300 Subject: [PATCH 1/7] Merge pull request #359 from de-id/chore/upgrade-node-22 chore: upgrade Node.js from 20 to 22 --- .github/workflows/manual-e2e.yml | 4 ++-- .github/workflows/pr-main-e2e.yml | 4 ++-- .github/workflows/pr-prod-e2e.yml | 2 +- .github/workflows/publish-on-merge.yml | 2 +- .github/workflows/test.yml | 8 ++++---- .nvmrc | 1 + 6 files changed, 11 insertions(+), 10 deletions(-) create mode 100644 .nvmrc diff --git a/.github/workflows/manual-e2e.yml b/.github/workflows/manual-e2e.yml index 8294f152..149612b6 100644 --- a/.github/workflows/manual-e2e.yml +++ b/.github/workflows/manual-e2e.yml @@ -46,7 +46,7 @@ jobs: - name: Setup Node.js for SDK uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 cache-dependency-path: agents-sdk/yarn.lock - name: Install Yarn @@ -80,7 +80,7 @@ jobs: - name: Setup Node.js for agents-ui uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 - name: Render .npmrc for agents-ui working-directory: agents-ui diff --git a/.github/workflows/pr-main-e2e.yml b/.github/workflows/pr-main-e2e.yml index e3dafec2..2293b64e 100644 --- a/.github/workflows/pr-main-e2e.yml +++ b/.github/workflows/pr-main-e2e.yml @@ -42,7 +42,7 @@ jobs: - name: Setup Node.js for SDK uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 cache-dependency-path: agents-sdk/yarn.lock - name: Install Yarn @@ -76,7 +76,7 @@ jobs: - name: Setup Node.js for agents-ui uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 - name: Render .npmrc for agents-ui working-directory: agents-ui diff --git a/.github/workflows/pr-prod-e2e.yml b/.github/workflows/pr-prod-e2e.yml index fd9e82df..996abfd2 100644 --- a/.github/workflows/pr-prod-e2e.yml +++ b/.github/workflows/pr-prod-e2e.yml @@ -48,7 +48,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 - name: Render .npmrc for agents-ui working-directory: agents-ui diff --git a/.github/workflows/publish-on-merge.yml b/.github/workflows/publish-on-merge.yml index 79bf3c60..dbf9b186 100644 --- a/.github/workflows/publish-on-merge.yml +++ b/.github/workflows/publish-on-merge.yml @@ -43,7 +43,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v6 with: - node-version: 20 + node-version: 22 registry-url: "https://registry.npmjs.org" cache: "yarn" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9f22ccb0..05e0255a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,7 +25,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 cache: 'yarn' cache-dependency-path: yarn.lock @@ -46,7 +46,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 cache: 'yarn' cache-dependency-path: yarn.lock @@ -77,7 +77,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 cache: 'yarn' cache-dependency-path: yarn.lock @@ -100,7 +100,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v4 with: - node-version: 20 + node-version: 22 cache: 'yarn' cache-dependency-path: yarn.lock diff --git a/.nvmrc b/.nvmrc new file mode 100644 index 00000000..53d1c14d --- /dev/null +++ b/.nvmrc @@ -0,0 +1 @@ +v22 From 71d0124634fb282bc09d2606a5480d36169c17b0 Mon Sep 17 00:00:00 2001 From: dor-eitan <164745144+dor-eitan@users.noreply.github.com> Date: Mon, 20 Apr 2026 09:45:30 +0300 Subject: [PATCH 2/7] bugfix: create assistant message for streamed greetings (#363) The first assistant turn of a session (an unsolicited greeting) was being dropped because processChatEvent's message-creation branch only fired when the last message was a transcribed user turn. With an empty message list, every partial and answer hit the fallback early return and the greeting never appeared in the UI. Broaden the creation branch to also fire when there is no last message. Co-authored-by: Claude Opus 4.7 (1M context) --- .../socket-manager/message-queue.test.ts | 42 +++++++++++++++++++ src/services/socket-manager/message-queue.ts | 7 ++-- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/services/socket-manager/message-queue.test.ts b/src/services/socket-manager/message-queue.test.ts index 52e4277b..2eae84e5 100644 --- a/src/services/socket-manager/message-queue.test.ts +++ b/src/services/socket-manager/message-queue.test.ts @@ -190,6 +190,48 @@ describe('createMessageEventQueue', () => { }); }); + describe('first assistant turn (greeting)', () => { + it('creates an assistant message when partials arrive with no prior messages', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Partial, { content: 'Hello', sequence: 0 }); + onMessage(ChatProgress.Partial, { content: ' there', sequence: 1 }); + onMessage(ChatProgress.Answer, { content: 'Hello there!' }); + + expect(mockItems.messages).toHaveLength(1); + expect(mockItems.messages[0]).toMatchObject({ + role: 'assistant', + content: 'Hello there!', + }); + expect(mockOnNewMessage).toHaveBeenCalled(); + }); + + it('streams partials live for a greeting before the final answer', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Partial, { content: 'Hel', sequence: 0 }); + onMessage(ChatProgress.Partial, { content: 'lo', sequence: 1 }); + + expect(mockItems.messages).toHaveLength(1); + expect(mockItems.messages[0]).toMatchObject({ role: 'assistant', content: 'Hello' }); + expect(mockOnNewMessage).toHaveBeenCalled(); + const lastCall = mockOnNewMessage.mock.calls[mockOnNewMessage.mock.calls.length - 1]; + expect(lastCall[1]).toBe(ChatProgress.Partial); + }); + }); + describe('clearQueue function', () => { it('should expose clearQueue for external use', () => { const { clearQueue } = createMessageEventQueue( diff --git a/src/services/socket-manager/message-queue.ts b/src/services/socket-manager/message-queue.ts index 23448d7f..f5ff7a90 100644 --- a/src/services/socket-manager/message-queue.ts +++ b/src/services/socket-manager/message-queue.ts @@ -69,8 +69,9 @@ function processChatEvent( const lastMessage = items.messages[items.messages.length - 1]; let currentMessage: Message; - if (lastMessage?.transcribed && lastMessage.role === 'user') { - const initialContent = event === ChatProgress.Answer ? data.content || '' : ''; + if (lastMessage?.role === 'assistant') { + currentMessage = lastMessage; + } else if (!lastMessage || (lastMessage.transcribed && lastMessage.role === 'user')) { currentMessage = { id: data.id || `assistant-${Date.now()}`, role: data.role || 'assistant', @@ -78,8 +79,6 @@ function processChatEvent( created_at: data.created_at || new Date().toISOString(), }; items.messages.push(currentMessage); - } else if (lastMessage?.role === 'assistant') { - currentMessage = lastMessage; } else { return; } From cd2291936b71bd41d52a50cd58800d51b5726ffc Mon Sep 17 00:00:00 2001 From: dariusz-did Date: Mon, 20 Apr 2026 11:55:39 +0200 Subject: [PATCH 3/7] =?UTF-8?q?feat:=20add=20message.parts=20=E2=80=94=20s?= =?UTF-8?q?tructured=20content=20representation=20(#361)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add message.parts — structured content representation Add MessagePart type and optional parts field to Message interface. Create content-parser utility that parses markdown content (images, videos, links, HTML anchors) into typed MessagePart arrays. Integrate parser into all message construction paths: - Streaming pipeline (processChatEvent partials + answers) - Audio transcription (handleAudioTranscribedMessage) - REST chat (user message + assistant response) - Speak (assistant message) Export parseMessageParts utility and MessagePart type from SDK public API for consumer use. * fix: harden regexes against ReDoS (CodeQL polynomial backtracking) Restrict character classes to prevent catastrophic backtracking: - Alt text: [^\]* → [^\[\]]* (disallow nested brackets) - URLs: [^)]+ → [^)\s]+ (disallow whitespace in URLs) - HTML attrs: [^>]* → [^>]*? (lazy quantifier) * style: fix prettier formatting * refactor: use parseMessagePartsMemo consistently across internal SDK code Unify message parsing calls inside the SDK on the memoized variant. The non-memo parseMessageParts remains the public API export for consumers. --- src/index.ts | 1 + src/services/agent-manager/index.test.ts | 35 +++++ src/services/agent-manager/index.ts | 4 + .../socket-manager/message-queue.test.ts | 102 +++++++++++++ src/services/socket-manager/message-queue.ts | 3 + src/types/entities/agents/chat.ts | 7 + src/utils/content-parser.test.ts | 135 ++++++++++++++++++ src/utils/content-parser.ts | 119 +++++++++++++++ 8 files changed, 406 insertions(+) create mode 100644 src/utils/content-parser.test.ts create mode 100644 src/utils/content-parser.ts diff --git a/src/index.ts b/src/index.ts index c67b20ce..854fb3cd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ export * from './errors'; export * from './services/agent-manager'; export * from './types'; +export { parseMessageParts } from './utils/content-parser'; diff --git a/src/services/agent-manager/index.test.ts b/src/services/agent-manager/index.test.ts index e5b914c0..b638f895 100644 --- a/src/services/agent-manager/index.test.ts +++ b/src/services/agent-manager/index.test.ts @@ -297,6 +297,30 @@ describe('createAgentManager', () => { }); }); + it('should populate parts on user message', async () => { + const mockCallback = mockOptions.callbacks.onNewMessage as jest.Mock; + mockCallback.mockClear(); + + await manager.chat('Hello, how are you?'); + + // First call is the user message + const [userMessages] = mockCallback.mock.calls[0]; + const userMsg = userMessages[userMessages.length - 1]; + expect(userMsg.parts).toEqual([{ type: 'text', text: 'Hello, how are you?' }]); + }); + + it('should populate parts on assistant response message', async () => { + const mockCallback = mockOptions.callbacks.onNewMessage as jest.Mock; + mockCallback.mockClear(); + + await manager.chat('Hello, how are you?'); + + // Second call is the answer + const [answerMessages] = mockCallback.mock.calls[1]; + const assistantMsg = answerMessages[answerMessages.length - 1]; + expect(assistantMsg.parts).toEqual([{ type: 'text', text: 'Agent response' }]); + }); + it('should validate chat request - empty message', async () => { await expect(manager.chat('')).rejects.toThrow('Message cannot be empty'); }); @@ -447,6 +471,17 @@ describe('createAgentManager', () => { expect(lastMessage.created_at).toBeDefined(); }); + it('should populate parts on speak message', async () => { + const mockCallback = mockOptions.callbacks.onNewMessage as jest.Mock; + mockCallback.mockClear(); + + await manager.speak('Hello from speak'); + + const [messages] = mockCallback.mock.calls[0]; + const lastMessage = messages[messages.length - 1]; + expect(lastMessage.parts).toEqual([{ type: 'text', text: 'Hello from speak' }]); + }); + it('should trigger onNewMessage with script object', async () => { const script = { type: 'text' as const, input: 'Hello from script', ssml: false }; const mockCallback = mockOptions.callbacks.onNewMessage as jest.Mock; diff --git a/src/services/agent-manager/index.ts b/src/services/agent-manager/index.ts index 0b719f37..a4175a80 100644 --- a/src/services/agent-manager/index.ts +++ b/src/services/agent-manager/index.ts @@ -21,6 +21,7 @@ import { ChatCreationFailed, ValidationError } from '@sdk/errors'; import { getRandom } from '@sdk/utils'; import { isStreamsV2Agent } from '@sdk/utils/agent'; import { isChatModeWithoutChat, isTextualChat } from '@sdk/utils/chat'; +import { parseMessagePartsMemo } from '@sdk/utils/content-parser'; import { createAgentsApi } from '../../api/agents'; import { getAgentInfo, getAnalyticsInfo } from '../../utils/analytics'; import { defer } from '../../utils/defer'; @@ -439,6 +440,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt id: getRandom(), role: 'user', content: userMessage, + parts: parseMessagePartsMemo(userMessage), created_at: new Date(latencyTimestampTracker.update()).toISOString(), }); @@ -451,6 +453,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt id: getRandom(), role: 'assistant', content: response.result || '', + parts: parseMessagePartsMemo(response.result || ''), created_at: new Date().toISOString(), context: response.context, matches: response.matches, @@ -568,6 +571,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt id: getRandom(), role: 'assistant', content: script.input, + parts: parseMessagePartsMemo(script.input), created_at: new Date().toISOString(), }); options.callbacks.onNewMessage?.([...items.messages], 'answer'); diff --git a/src/services/socket-manager/message-queue.test.ts b/src/services/socket-manager/message-queue.test.ts index 2eae84e5..8985414e 100644 --- a/src/services/socket-manager/message-queue.test.ts +++ b/src/services/socket-manager/message-queue.test.ts @@ -274,4 +274,106 @@ describe('createMessageEventQueue', () => { expect(lastMessage.content).toBe('Fresh'); }); }); + + describe('message parts population', () => { + it('should populate parts on partial messages', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + // Start with an existing assistant message so partials update it + mockItems.messages.push({ + id: 'assistant-1', + role: 'assistant', + content: '', + created_at: new Date().toISOString(), + }); + + onMessage(ChatProgress.Partial, { content: 'Hello ![img](https://example.com/pic.png)', sequence: 0 }); + + const lastCall = mockOnNewMessage.mock.calls[mockOnNewMessage.mock.calls.length - 1]; + const lastMessage = lastCall[0][lastCall[0].length - 1]; + expect(lastMessage.parts).toEqual([ + { type: 'text', text: 'Hello ' }, + { type: 'image', src: 'https://example.com/pic.png', alt: 'img' }, + ]); + }); + + it('should populate parts on answer messages', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + mockItems.messages.push({ + id: 'user-1', + role: 'user', + content: 'test', + created_at: new Date().toISOString(), + transcribed: true, + }); + + onMessage(ChatProgress.Answer, { content: 'Check [this](https://example.com)' }); + + const lastCall = mockOnNewMessage.mock.calls[mockOnNewMessage.mock.calls.length - 1]; + const lastMessage = lastCall[0][lastCall[0].length - 1]; + expect(lastMessage.parts).toEqual([ + { type: 'text', text: 'Check ' }, + { type: 'link', href: 'https://example.com', label: 'this' }, + ]); + }); + + it('should populate parts for plain text content', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + mockItems.messages.push({ + id: 'user-1', + role: 'user', + content: 'test', + created_at: new Date().toISOString(), + transcribed: true, + }); + + onMessage(ChatProgress.Answer, { content: 'Just plain text' }); + + const lastCall = mockOnNewMessage.mock.calls[mockOnNewMessage.mock.calls.length - 1]; + const lastMessage = lastCall[0][lastCall[0].length - 1]; + expect(lastMessage.parts).toEqual([{ type: 'text', text: 'Just plain text' }]); + }); + + it('should populate parts on transcribed user messages', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Transcribe, { + content: 'Hello there', + role: 'user', + id: 'user-transcribed-1', + }); + + const lastCall = mockOnNewMessage.mock.calls[mockOnNewMessage.mock.calls.length - 1]; + const lastMessage = lastCall[0][lastCall[0].length - 1]; + expect(lastMessage.role).toBe('user'); + expect(lastMessage.transcribed).toBe(true); + expect(lastMessage.parts).toEqual([{ type: 'text', text: 'Hello there' }]); + }); + }); }); diff --git a/src/services/socket-manager/message-queue.ts b/src/services/socket-manager/message-queue.ts index f5ff7a90..2bba2eab 100644 --- a/src/services/socket-manager/message-queue.ts +++ b/src/services/socket-manager/message-queue.ts @@ -1,6 +1,7 @@ import { Agent, AgentManagerOptions, ChatProgress, StreamEvents } from '@sdk/types'; import { Message } from '@sdk/types/entities/agents/chat'; import { getStreamAnalyticsProps } from '@sdk/utils/analytics'; +import { parseMessagePartsMemo } from '@sdk/utils/content-parser'; import { AgentManagerItems } from '../agent-manager'; import { Analytics } from '../analytics/mixpanel'; @@ -43,6 +44,7 @@ function handleAudioTranscribedMessage( id: data.id || `user-${Date.now()}`, role: data.role, content: data.content, + parts: parseMessagePartsMemo(data.content), created_at: data.created_at || new Date().toISOString(), transcribed: true, }; @@ -95,6 +97,7 @@ function processChatEvent( if (currentMessage.content !== messageContent || event === ChatProgress.Answer) { currentMessage.content = messageContent; + currentMessage.parts = parseMessagePartsMemo(messageContent); onNewMessage?.([...items.messages], event); } diff --git a/src/types/entities/agents/chat.ts b/src/types/entities/agents/chat.ts index 4d9b67d3..8523dfeb 100644 --- a/src/types/entities/agents/chat.ts +++ b/src/types/entities/agents/chat.ts @@ -24,10 +24,17 @@ export type RatingPayload = Omit< 'owner_id' | 'id' | 'created_at' | 'modified_at' | 'created_by' | 'external_id' | 'agent_id' | 'chat_id' >; +export type MessagePart = + | { type: 'text'; text: string } + | { type: 'image'; src: string; alt: string; mimeType?: string } + | { type: 'video'; src: string; alt: string; thumbnail?: string } + | { type: 'link'; href: string; label: string }; + export interface Message { id: string; role?: 'system' | 'assistant' | 'user' | 'function' | 'tool'; content: string; + parts?: MessagePart[]; created_at?: string; matches?: ChatResponse['matches']; context?: string; diff --git a/src/utils/content-parser.test.ts b/src/utils/content-parser.test.ts new file mode 100644 index 00000000..cb26abf5 --- /dev/null +++ b/src/utils/content-parser.test.ts @@ -0,0 +1,135 @@ +import { parseMessageParts, parseMessagePartsMemo } from './content-parser'; + +describe('parseMessageParts', () => { + describe('plain text', () => { + it('should return a single text part for plain text', () => { + const result = parseMessageParts('Hello, world!'); + expect(result).toEqual([{ type: 'text', text: 'Hello, world!' }]); + }); + + it('should return empty array for empty string', () => { + const result = parseMessageParts(''); + expect(result).toEqual([]); + }); + + it('should return a single text part for whitespace-only content', () => { + const result = parseMessageParts(' \n '); + expect(result).toEqual([{ type: 'text', text: ' \n ' }]); + }); + }); + + describe('markdown images', () => { + it('should parse a markdown image', () => { + const result = parseMessageParts('![alt text](https://example.com/image.png)'); + expect(result).toEqual([{ type: 'image', src: 'https://example.com/image.png', alt: 'alt text' }]); + }); + + it('should detect GIF images with mimeType', () => { + const result = parseMessageParts('![animation](https://example.com/funny.gif)'); + expect(result).toEqual([ + { type: 'image', src: 'https://example.com/funny.gif', alt: 'animation', mimeType: 'image/gif' }, + ]); + }); + + it('should handle image with empty alt text', () => { + const result = parseMessageParts('![](https://example.com/image.jpg)'); + expect(result).toEqual([{ type: 'image', src: 'https://example.com/image.jpg', alt: '' }]); + }); + }); + + describe('markdown video (thumbnail syntax)', () => { + it('should parse video with thumbnail syntax [![alt](thumb)](video)', () => { + const result = parseMessageParts( + '[![video title](https://example.com/thumb.jpg)](https://example.com/video.mp4)' + ); + expect(result).toEqual([ + { + type: 'video', + src: 'https://example.com/video.mp4', + alt: 'video title', + thumbnail: 'https://example.com/thumb.jpg', + }, + ]); + }); + }); + + describe('markdown links', () => { + it('should parse a markdown link', () => { + const result = parseMessageParts('[click here](https://example.com)'); + expect(result).toEqual([{ type: 'link', href: 'https://example.com', label: 'click here' }]); + }); + }); + + describe('HTML links', () => { + it('should parse an HTML anchor tag', () => { + const result = parseMessageParts('Visit'); + expect(result).toEqual([{ type: 'link', href: 'https://example.com', label: 'Visit' }]); + }); + }); + + describe('mixed content', () => { + it('should parse text interleaved with an image', () => { + const result = parseMessageParts('Hello ![pic](https://example.com/pic.png) world'); + expect(result).toEqual([ + { type: 'text', text: 'Hello ' }, + { type: 'image', src: 'https://example.com/pic.png', alt: 'pic' }, + { type: 'text', text: ' world' }, + ]); + }); + + it('should parse multiple different part types in order', () => { + const content = + 'Check this out: ![img](https://example.com/img.png)\nAnd visit [our site](https://example.com)'; + const result = parseMessageParts(content); + expect(result).toEqual([ + { type: 'text', text: 'Check this out: ' }, + { type: 'image', src: 'https://example.com/img.png', alt: 'img' }, + { type: 'text', text: '\nAnd visit ' }, + { type: 'link', href: 'https://example.com', label: 'our site' }, + ]); + }); + + it('should handle content starting with an asset', () => { + const result = parseMessageParts('![img](https://example.com/a.png) followed by text'); + expect(result).toEqual([ + { type: 'image', src: 'https://example.com/a.png', alt: 'img' }, + { type: 'text', text: ' followed by text' }, + ]); + }); + + it('should handle content ending with an asset', () => { + const result = parseMessageParts('text then ![img](https://example.com/a.png)'); + expect(result).toEqual([ + { type: 'text', text: 'text then ' }, + { type: 'image', src: 'https://example.com/a.png', alt: 'img' }, + ]); + }); + }); + + describe('incomplete markdown (streaming partials)', () => { + it('should keep incomplete image markdown as text', () => { + const result = parseMessageParts('Hello ![loading](https://example.com/pic'); + expect(result).toEqual([{ type: 'text', text: 'Hello ![loading](https://example.com/pic' }]); + }); + + it('should keep incomplete link markdown as text', () => { + const result = parseMessageParts('Check [this](https://exam'); + expect(result).toEqual([{ type: 'text', text: 'Check [this](https://exam' }]); + }); + }); +}); + +describe('parseMessagePartsMemo', () => { + it('should return same reference for same input', () => { + const content = 'Hello ![img](https://example.com/pic.png)'; + const result1 = parseMessagePartsMemo(content); + const result2 = parseMessagePartsMemo(content); + expect(result1).toBe(result2); + }); + + it('should return new result for different input', () => { + const result1 = parseMessagePartsMemo('Hello'); + const result2 = parseMessagePartsMemo('World'); + expect(result1).not.toBe(result2); + }); +}); diff --git a/src/utils/content-parser.ts b/src/utils/content-parser.ts new file mode 100644 index 00000000..9c99d3c7 --- /dev/null +++ b/src/utils/content-parser.ts @@ -0,0 +1,119 @@ +import { MessagePart } from '@sdk/types/entities/agents/chat'; + +// Video thumbnail syntax: [![alt](thumbnail-url)](video-url) +const VIDEO_THUMBNAIL_RE = /\[!\[([^\[\]]*)\]\(([^)\s]+)\)\]\(([^)\s]+)\)/g; + +// Standard markdown image: ![alt](url) +const IMAGE_RE = /!\[([^\[\]]*)\]\(([^)\s]+)\)/g; + +// Standard markdown link: [label](url) — but NOT images (no leading !) +const MD_LINK_RE = /(?label +const HTML_LINK_RE = /]*?>([^<]*)<\/a>/gi; + +interface MatchEntry { + index: number; + length: number; + part: MessagePart; +} + +export function parseMessageParts(content: string): MessagePart[] { + if (content.length === 0) { + return []; + } + + const matches: MatchEntry[] = []; + + let m: RegExpExecArray | null; + + // 1. Video thumbnail: [![alt](thumb)](video) — must be matched first + VIDEO_THUMBNAIL_RE.lastIndex = 0; + while ((m = VIDEO_THUMBNAIL_RE.exec(content)) !== null) { + matches.push({ + index: m.index, + length: m[0].length, + part: { type: 'video', src: m[3], alt: m[1], thumbnail: m[2] }, + }); + } + + // 2. Markdown images: ![alt](url) — skip those already consumed by video thumbnails + IMAGE_RE.lastIndex = 0; + while ((m = IMAGE_RE.exec(content)) !== null) { + const overlaps = matches.some(entry => m!.index >= entry.index && m!.index < entry.index + entry.length); + if (!overlaps) { + const src = m[2]; + const part: MessagePart = { type: 'image', src, alt: m[1] }; + if (src.toLowerCase().endsWith('.gif')) { + (part as Extract).mimeType = 'image/gif'; + } + matches.push({ index: m.index, length: m[0].length, part }); + } + } + + // 3. Markdown links: [label](url) — skip those already consumed + MD_LINK_RE.lastIndex = 0; + while ((m = MD_LINK_RE.exec(content)) !== null) { + const overlaps = matches.some(entry => m!.index >= entry.index && m!.index < entry.index + entry.length); + if (!overlaps) { + matches.push({ + index: m.index, + length: m[0].length, + part: { type: 'link', href: m[2], label: m[1] }, + }); + } + } + + // 4. HTML links: label — skip those already consumed + HTML_LINK_RE.lastIndex = 0; + while ((m = HTML_LINK_RE.exec(content)) !== null) { + const overlaps = matches.some(entry => m!.index >= entry.index && m!.index < entry.index + entry.length); + if (!overlaps) { + matches.push({ + index: m.index, + length: m[0].length, + part: { type: 'link', href: m[1], label: m[2] }, + }); + } + } + + // No matches → single text part + if (matches.length === 0) { + return [{ type: 'text', text: content }]; + } + + // Sort by position + matches.sort((a, b) => a.index - b.index); + + // Build parts array with text gaps + const parts: MessagePart[] = []; + let cursor = 0; + + for (const entry of matches) { + if (entry.index > cursor) { + parts.push({ type: 'text', text: content.slice(cursor, entry.index) }); + } + parts.push(entry.part); + cursor = entry.index + entry.length; + } + + if (cursor < content.length) { + parts.push({ type: 'text', text: content.slice(cursor) }); + } + + return parts; +} + +// Single-entry memoization — optimal for streaming where the same content string +// is checked multiple times per render cycle +let memoKey: string = ''; +let memoValue: MessagePart[] = []; + +export function parseMessagePartsMemo(content: string): MessagePart[] { + if (content === memoKey) { + return memoValue; + } + memoKey = content; + memoValue = parseMessageParts(content); + return memoValue; +} From 308866f54753b67370bfbf07a1f8230924e71495 Mon Sep 17 00:00:00 2001 From: dariusz-did Date: Mon, 20 Apr 2026 16:54:36 +0200 Subject: [PATCH 4/7] refactor: make Message.parts non-optional (#364) Change Message.parts from optional to required so consumers can rely on it without defensive fallbacks. This closes a narrow contract gap where the SDK briefly pushes a Message into items.messages before the first content chunk arrives, leaving parts undefined. Initialize parts: [] in the only construction site that lacked it (streaming assistant message in message-queue.ts). All other paths already populated parts via parseMessagePartsMemo. This is a type-level tightening matching the actual runtime contract. --- src/services/agent-manager/index.test.ts | 2 +- src/services/socket-manager/message-queue.test.ts | 9 +++++++++ src/services/socket-manager/message-queue.ts | 1 + src/types/entities/agents/chat.ts | 2 +- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/services/agent-manager/index.test.ts b/src/services/agent-manager/index.test.ts index b638f895..b2c51c51 100644 --- a/src/services/agent-manager/index.test.ts +++ b/src/services/agent-manager/index.test.ts @@ -140,7 +140,7 @@ describe('createAgentManager', () => { it('should handle initial messages correctly', async () => { const initialMessages = [ - { id: '1', role: 'user' as const, content: 'Hello', created_at: new Date().toISOString() }, + { id: '1', role: 'user' as const, content: 'Hello', parts: [], created_at: new Date().toISOString() }, ]; (getInitialMessages as jest.Mock).mockReturnValue(initialMessages); diff --git a/src/services/socket-manager/message-queue.test.ts b/src/services/socket-manager/message-queue.test.ts index 8985414e..f2d2d924 100644 --- a/src/services/socket-manager/message-queue.test.ts +++ b/src/services/socket-manager/message-queue.test.ts @@ -51,6 +51,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'first question', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); @@ -86,6 +87,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'test', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); @@ -111,6 +113,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'test', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); @@ -137,6 +140,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'test', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); @@ -164,6 +168,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'first message', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); @@ -258,6 +263,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'test', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); @@ -290,6 +296,7 @@ describe('createMessageEventQueue', () => { id: 'assistant-1', role: 'assistant', content: '', + parts: [], created_at: new Date().toISOString(), }); @@ -316,6 +323,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'test', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); @@ -343,6 +351,7 @@ describe('createMessageEventQueue', () => { id: 'user-1', role: 'user', content: 'test', + parts: [], created_at: new Date().toISOString(), transcribed: true, }); diff --git a/src/services/socket-manager/message-queue.ts b/src/services/socket-manager/message-queue.ts index 2bba2eab..930d2dd5 100644 --- a/src/services/socket-manager/message-queue.ts +++ b/src/services/socket-manager/message-queue.ts @@ -78,6 +78,7 @@ function processChatEvent( id: data.id || `assistant-${Date.now()}`, role: data.role || 'assistant', content: data.content || '', + parts: [], created_at: data.created_at || new Date().toISOString(), }; items.messages.push(currentMessage); diff --git a/src/types/entities/agents/chat.ts b/src/types/entities/agents/chat.ts index 8523dfeb..fd5d075d 100644 --- a/src/types/entities/agents/chat.ts +++ b/src/types/entities/agents/chat.ts @@ -34,7 +34,7 @@ export interface Message { id: string; role?: 'system' | 'assistant' | 'user' | 'function' | 'tool'; content: string; - parts?: MessagePart[]; + parts: MessagePart[]; created_at?: string; matches?: ChatResponse['matches']; context?: string; From fd0f3f4f458913bb645dc4736420ddc4af7fa56d Mon Sep 17 00:00:00 2001 From: Dariusz Date: Tue, 21 Apr 2026 08:51:24 +0200 Subject: [PATCH 5/7] v1.1.58 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 7e3f4db0..d14dd80b 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@d-id/client-sdk", "private": false, - "version": "1.1.57", + "version": "1.1.58", "type": "module", "description": "d-id client sdk", "repository": { From 042b8d56e0a87f045ebdccda44bf656f3f6292b8 Mon Sep 17 00:00:00 2001 From: dariusz-did Date: Tue, 21 Apr 2026 10:05:29 +0200 Subject: [PATCH 6/7] fix: preserve multiple assistant messages within the same turn (#362) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: preserve multiple assistant messages within the same turn When a backend emits more than one `chat/answer` event for a single user turn (e.g. after a client tool call, or when the agent sends several messages in a row), the previous assistant message was being overwritten because `processChatEvent` reused the last assistant message in the list without comparing ids. Detect the case via `data.id !== lastMessage.id` on `ChatProgress.Answer` and push a new message, clearing the streaming buffer so its content does not leak into the new one. Same id or missing id keeps the existing overwrite behaviour for backward compatibility. Asana: https://app.asana.com/1/856614567666442/project/1213882276505882/task/1214081847771983 * style: apply prettier formatting * fix: detect new assistant message on partial events too New assistant messages typically start streaming with Partial events before the final Answer, so the id-change discriminator must fire on both event types — otherwise the first Partial of the new message overwrites the previous one. --- .../socket-manager/message-queue.test.ts | 148 ++++++++++++++++++ src/services/socket-manager/message-queue.ts | 20 ++- 2 files changed, 164 insertions(+), 4 deletions(-) diff --git a/src/services/socket-manager/message-queue.test.ts b/src/services/socket-manager/message-queue.test.ts index f2d2d924..ddee43ce 100644 --- a/src/services/socket-manager/message-queue.test.ts +++ b/src/services/socket-manager/message-queue.test.ts @@ -281,6 +281,154 @@ describe('createMessageEventQueue', () => { }); }); + describe('multi-message assistant turns', () => { + beforeEach(() => { + mockItems.messages.push({ + id: 'user-1', + role: 'user', + content: 'please book a meeting', + parts: [], + created_at: new Date().toISOString(), + transcribed: true, + }); + }); + + it('should append a new assistant message when answer arrives with a different id', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Answer, { id: 'assistant-1', content: "ok, i'll book a meeting" }); + onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'i booked a meeting for you' }); + + const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant'); + expect(assistantMessages).toHaveLength(2); + expect(assistantMessages[0].id).toBe('assistant-1'); + expect(assistantMessages[0].content).toBe("ok, i'll book a meeting"); + expect(assistantMessages[1].id).toBe('assistant-2'); + expect(assistantMessages[1].content).toBe('i booked a meeting for you'); + }); + + it('should preserve the first assistant message when a second arrives after a tool call', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Answer, { id: 'assistant-1', content: "ok, i'll book a meeting" }); + // ToolCalling / ToolResult events are dispatched via a separate callback and do not + // touch messages; simulating that gap here means the next answer event arrives with + // a fresh id. + onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'i booked a meeting for you' }); + + expect(mockItems.messages.map(m => m.content)).toEqual([ + 'please book a meeting', + "ok, i'll book a meeting", + 'i booked a meeting for you', + ]); + }); + + it('should overwrite the last assistant message when answer has the same id', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Answer, { id: 'assistant-1', content: 'first draft' }); + onMessage(ChatProgress.Answer, { id: 'assistant-1', content: 'final answer' }); + + const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant'); + expect(assistantMessages).toHaveLength(1); + expect(assistantMessages[0].content).toBe('final answer'); + }); + + it('should overwrite the last assistant message when answer has no id (legacy backends)', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Answer, { content: 'first' }); + onMessage(ChatProgress.Answer, { content: 'second' }); + + const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant'); + expect(assistantMessages).toHaveLength(1); + expect(assistantMessages[0].content).toBe('second'); + }); + + it('should not leak content from the previous assistant message into the new one', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Answer, { id: 'assistant-1', content: "ok, i'll book a meeting" }); + onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'done' }); + + const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant'); + expect(assistantMessages).toHaveLength(2); + expect(assistantMessages[1].content).toBe('done'); + expect(assistantMessages[1].content).not.toContain("ok, i'll book"); + }); + + it('should keep streaming partials into the same assistant message', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Partial, { content: 'Hello', sequence: 0 }); + onMessage(ChatProgress.Partial, { content: ' World', sequence: 1 }); + + const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant'); + expect(assistantMessages).toHaveLength(1); + expect(assistantMessages[0].content).toBe('Hello World'); + }); + + it('should append a new assistant message when partials arrive with a different id', () => { + const { onMessage } = createMessageEventQueue( + mockAnalytics, + mockItems, + mockOptions, + mockAgent, + mockOnStreamDone + ); + + onMessage(ChatProgress.Partial, { id: 'assistant-1', content: 'first ', sequence: 0 }); + onMessage(ChatProgress.Partial, { id: 'assistant-1', content: 'message', sequence: 1 }); + onMessage(ChatProgress.Answer, { id: 'assistant-1', content: 'first message' }); + + onMessage(ChatProgress.Partial, { id: 'assistant-2', content: 'second ', sequence: 0 }); + onMessage(ChatProgress.Partial, { id: 'assistant-2', content: 'message', sequence: 1 }); + onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'second message' }); + + const assistantMessages = mockItems.messages.filter(m => m.role === 'assistant'); + expect(assistantMessages).toHaveLength(2); + expect(assistantMessages[0].content).toBe('first message'); + expect(assistantMessages[1].content).toBe('second message'); + expect(assistantMessages[1].content).not.toContain('first'); + }); + }); + describe('message parts population', () => { it('should populate parts on partial messages', () => { const { onMessage } = createMessageEventQueue( diff --git a/src/services/socket-manager/message-queue.ts b/src/services/socket-manager/message-queue.ts index 930d2dd5..91ba7649 100644 --- a/src/services/socket-manager/message-queue.ts +++ b/src/services/socket-manager/message-queue.ts @@ -57,7 +57,8 @@ function processChatEvent( data: any, chatEventQueue: ChatEventQueue, items: AgentManagerItems, - onNewMessage: AgentManagerOptions['callbacks']['onNewMessage'] + onNewMessage: AgentManagerOptions['callbacks']['onNewMessage'], + clearQueue: () => void ) { if (event === ChatProgress.Transcribe && data.content) { handleAudioTranscribedMessage(data, items, onNewMessage); @@ -70,10 +71,21 @@ function processChatEvent( const lastMessage = items.messages[items.messages.length - 1]; + // A new assistant message within the same turn (e.g. after a client tool call, or several + // assistant messages in a row) is signalled by a chat event whose `id` differs from the + // last assistant message. The new message typically starts with `Partial` events and ends + // with `Answer`, so both branches must detect the id change — otherwise the SDK overwrites + // the previous message on the first partial of the new one. + const isNewAssistantMessage = data.id && lastMessage?.role === 'assistant' && lastMessage.id !== data.id; + let currentMessage: Message; - if (lastMessage?.role === 'assistant') { + if (lastMessage?.role === 'assistant' && !isNewAssistantMessage) { currentMessage = lastMessage; - } else if (!lastMessage || (lastMessage.transcribed && lastMessage.role === 'user')) { + } else if (!lastMessage || (lastMessage.transcribed && lastMessage.role === 'user') || isNewAssistantMessage) { + if (isNewAssistantMessage) { + // Reset the streaming buffer so the next message does not inherit the previous one's content. + clearQueue(); + } currentMessage = { id: data.id || `assistant-${Date.now()}`, role: data.role || 'assistant', @@ -132,7 +144,7 @@ export function createMessageEventQueue( : event === StreamEvents.ChatAudioTranscribed ? ChatProgress.Transcribe : (event as ChatProgress); - processChatEvent(chatEvent, data, chatEventQueue, items, onNewMessage); + processChatEvent(chatEvent, data, chatEventQueue, items, onNewMessage, clearQueue); if (chatEvent === ChatProgress.Answer) { analytics.track('agent-message-received', { From 04cc37d2a9056b0d7cf7f5ee4f4b9b557181cda9 Mon Sep 17 00:00:00 2001 From: dariusz-did Date: Thu, 23 Apr 2026 14:26:37 +0200 Subject: [PATCH 7/7] fix: dispatch new tool-call/* data channel events (#366) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: dispatch new tool-call/* data channel events Backend migrated tool execution notifications from tool/calling + tool/result to tool-call/started + tool-call/done + tool-call/error. Add the new subjects to the dispatch table and route tool-call/started through handleToolEvents so the SDK emits AgentActivityState.ToolActive again. Full onToolEvent payload rewire is tracked in a follow-up task. * chore: remove unused ToolCalling/ToolResult events Backend no longer emits tool/calling or tool/result — the new tool-call/* subjects replaced them. This removes the dead enum values, payload types, dispatch entries, and the onToolEvent callback whose signature was pinned to the legacy payloads. The public onToolEvent callback will be reintroduced with the new payload shapes as part of the deferred full-support task. Addresses review feedback on #366. * feat: wire onToolEvent to new tool-call/* payloads Adds ToolCallStartedPayload, ToolCallDonePayload and ToolCallErrorPayload type definitions matching the backend wire format (call_id, name, input, output, timestamp; plus duration_ms and extra on done/error). Restores the public onToolEvent callback with an overloaded signature discriminated by the event argument, and invokes it from handleToolEvents for all three tool-call events. tool-call/started still drives the ToolActive activity state transition; tool-call/done and tool-call/error only forward the payload and leave state transitions to the existing stream-video/done path. --- .../socket-manager/message-queue.test.ts | 5 +- .../streaming-manager/livekit-manager.test.ts | 200 ++++++++++-------- .../streaming-manager/livekit-manager.ts | 25 ++- src/types/entities/agents/manager.ts | 13 +- src/types/stream/stream.ts | 52 +++-- 5 files changed, 172 insertions(+), 123 deletions(-) diff --git a/src/services/socket-manager/message-queue.test.ts b/src/services/socket-manager/message-queue.test.ts index ddee43ce..2b1deb2a 100644 --- a/src/services/socket-manager/message-queue.test.ts +++ b/src/services/socket-manager/message-queue.test.ts @@ -323,9 +323,8 @@ describe('createMessageEventQueue', () => { ); onMessage(ChatProgress.Answer, { id: 'assistant-1', content: "ok, i'll book a meeting" }); - // ToolCalling / ToolResult events are dispatched via a separate callback and do not - // touch messages; simulating that gap here means the next answer event arrives with - // a fresh id. + // Tool-call events are dispatched via a separate path and do not touch messages; + // simulating that gap here means the next answer event arrives with a fresh id. onMessage(ChatProgress.Answer, { id: 'assistant-2', content: 'i booked a meeting for you' }); expect(mockItems.messages.map(m => m.content)).toEqual([ diff --git a/src/services/streaming-manager/livekit-manager.test.ts b/src/services/streaming-manager/livekit-manager.test.ts index 118fa076..933fa9ca 100644 --- a/src/services/streaming-manager/livekit-manager.test.ts +++ b/src/services/streaming-manager/livekit-manager.test.ts @@ -1141,20 +1141,14 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { }); describe('Enum values', () => { - it('should have correct StreamEvents enum values for tool events', () => { - // ASSERT: - expect(StreamEvents.ToolCalling).toBe('tool/calling'); - expect(StreamEvents.ToolResult).toBe('tool/result'); - }); - it('should have correct AgentActivityState enum value for ToolActive', () => { // ASSERT: expect(AgentActivityState.ToolActive).toBe('TOOL_ACTIVE'); }); }); - describe('handleDataReceived - tool/calling', () => { - it('should transition to ToolActive and call onToolEvent on tool/calling', async () => { + describe('handleDataReceived - tool-call/started', () => { + it('should transition to ToolActive and forward payload via onToolEvent on tool-call/started', async () => { // ARRANGE: const onAgentActivityStateChange = jest.fn(); const onToolEvent = jest.fn(); @@ -1166,11 +1160,12 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { const dataHandler = getDataReceivedHandler(); const payload = createDataChannelPayload({ - subject: StreamEvents.ToolCalling, - execution_id: 'exec-123', - tool_name: 'get_weather', - arguments: { location: 'Tel Aviv' }, - created_at: new Date().toISOString(), + subject: StreamEvents.ToolCallStarted, + call_id: 'call-123', + name: 'get_weather', + input: { location: 'Tel Aviv' }, + output: {}, + timestamp: new Date().toISOString(), }); // ACT: @@ -1179,17 +1174,18 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { // ASSERT: expect(onAgentActivityStateChange).toHaveBeenCalledWith(AgentActivityState.ToolActive); expect(onToolEvent).toHaveBeenCalledWith( - StreamEvents.ToolCalling, + StreamEvents.ToolCallStarted, expect.objectContaining({ - execution_id: 'exec-123', - tool_name: 'get_weather', + call_id: 'call-123', + name: 'get_weather', + input: { location: 'Tel Aviv' }, }) ); }); }); - describe('handleDataReceived - tool/result', () => { - it('should call onToolEvent but not change state on tool/result', async () => { + describe('handleDataReceived - tool-call/done', () => { + it('should forward payload via onToolEvent without changing activity state', async () => { // ARRANGE: const onAgentActivityStateChange = jest.fn(); const onToolEvent = jest.fn(); @@ -1201,38 +1197,92 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { const dataHandler = getDataReceivedHandler(); - // First trigger tool/calling to set ToolActive + // Set ToolActive first so we can verify done doesn't touch it dataHandler( createDataChannelPayload({ - subject: StreamEvents.ToolCalling, - execution_id: 'exec-123', - tool_name: 'get_weather', - arguments: {}, - created_at: new Date().toISOString(), + subject: StreamEvents.ToolCallStarted, + call_id: 'call-123', + name: 'get_weather', + input: {}, + output: {}, + timestamp: new Date().toISOString(), }) ); onAgentActivityStateChange.mockClear(); - const toolResultPayload = createDataChannelPayload({ - subject: StreamEvents.ToolResult, - execution_id: 'exec-123', - tool_name: 'get_weather', - success: true, + const donePayload = createDataChannelPayload({ + subject: StreamEvents.ToolCallDone, + call_id: 'call-123', + name: 'get_weather', + input: {}, + output: { temp: 22 }, duration_ms: 500, - error_message: null, - created_at: new Date().toISOString(), + extra: {}, + timestamp: new Date().toISOString(), }); // ACT: - dataHandler(toolResultPayload); + dataHandler(donePayload); // ASSERT: expect(onAgentActivityStateChange).not.toHaveBeenCalled(); expect(onToolEvent).toHaveBeenCalledWith( - StreamEvents.ToolResult, + StreamEvents.ToolCallDone, expect.objectContaining({ - execution_id: 'exec-123', - success: true, + call_id: 'call-123', + output: { temp: 22 }, + duration_ms: 500, + }) + ); + }); + }); + + describe('handleDataReceived - tool-call/error', () => { + it('should forward payload via onToolEvent without changing activity state', async () => { + // ARRANGE: + const onAgentActivityStateChange = jest.fn(); + const onToolEvent = jest.fn(); + options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; + options.callbacks.onToolEvent = onToolEvent; + + await createLiveKitStreamingManager(agentId, sessionOptions, options); + await simulateConnection(); + + const dataHandler = getDataReceivedHandler(); + + dataHandler( + createDataChannelPayload({ + subject: StreamEvents.ToolCallStarted, + call_id: 'call-123', + name: 'get_weather', + input: {}, + output: {}, + timestamp: new Date().toISOString(), + }) + ); + onAgentActivityStateChange.mockClear(); + + const errorPayload = createDataChannelPayload({ + subject: StreamEvents.ToolCallError, + call_id: 'call-123', + name: 'get_weather', + input: {}, + output: {}, + duration_ms: 120, + extra: { message: 'upstream timeout' }, + timestamp: new Date().toISOString(), + }); + + // ACT: + dataHandler(errorPayload); + + // ASSERT: + expect(onAgentActivityStateChange).not.toHaveBeenCalled(); + expect(onToolEvent).toHaveBeenCalledWith( + StreamEvents.ToolCallError, + expect.objectContaining({ + call_id: 'call-123', + extra: { message: 'upstream timeout' }, }) ); }); @@ -1252,11 +1302,12 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { // Set ToolActive state first dataHandler( createDataChannelPayload({ - subject: StreamEvents.ToolCalling, - execution_id: 'exec-123', - tool_name: 'test', - arguments: {}, - created_at: new Date().toISOString(), + subject: StreamEvents.ToolCallStarted, + call_id: 'call-123', + name: 'test', + input: {}, + output: {}, + timestamp: new Date().toISOString(), }) ); onAgentActivityStateChange.mockClear(); @@ -1286,11 +1337,12 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { // Set ToolActive state first dataHandler( createDataChannelPayload({ - subject: StreamEvents.ToolCalling, - execution_id: 'exec-123', - tool_name: 'test', - arguments: {}, - created_at: new Date().toISOString(), + subject: StreamEvents.ToolCallStarted, + call_id: 'call-123', + name: 'test', + input: {}, + output: {}, + timestamp: new Date().toISOString(), }) ); onAgentActivityStateChange.mockClear(); @@ -1319,11 +1371,12 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { // Set ToolActive state first dataHandler( createDataChannelPayload({ - subject: StreamEvents.ToolCalling, - execution_id: 'exec-123', - tool_name: 'test', - arguments: {}, - created_at: new Date().toISOString(), + subject: StreamEvents.ToolCallStarted, + call_id: 'call-123', + name: 'test', + input: {}, + output: {}, + timestamp: new Date().toISOString(), }) ); onAgentActivityStateChange.mockClear(); @@ -1345,9 +1398,7 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { it('should stay ToolActive across multiple tool calls until final stream-video/done', async () => { // ARRANGE: const onAgentActivityStateChange = jest.fn(); - const onToolEvent = jest.fn(); options.callbacks.onAgentActivityStateChange = onAgentActivityStateChange; - options.callbacks.onToolEvent = onToolEvent; await createLiveKitStreamingManager(agentId, sessionOptions, options); await simulateConnection(); @@ -1357,22 +1408,12 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { // First tool cycle dataHandler( createDataChannelPayload({ - subject: StreamEvents.ToolCalling, - execution_id: 'exec-1', - tool_name: 'tool1', - arguments: {}, - created_at: new Date().toISOString(), - }) - ); - - dataHandler( - createDataChannelPayload({ - subject: StreamEvents.ToolResult, - execution_id: 'exec-1', - tool_name: 'tool1', - success: true, - duration_ms: 100, - created_at: new Date().toISOString(), + subject: StreamEvents.ToolCallStarted, + call_id: 'call-1', + name: 'tool1', + input: {}, + output: {}, + timestamp: new Date().toISOString(), }) ); @@ -1387,22 +1428,12 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { // Second tool cycle dataHandler( createDataChannelPayload({ - subject: StreamEvents.ToolCalling, - execution_id: 'exec-2', - tool_name: 'tool2', - arguments: {}, - created_at: new Date().toISOString(), - }) - ); - - dataHandler( - createDataChannelPayload({ - subject: StreamEvents.ToolResult, - execution_id: 'exec-2', - tool_name: 'tool2', - success: true, - duration_ms: 200, - created_at: new Date().toISOString(), + subject: StreamEvents.ToolCallStarted, + call_id: 'call-2', + name: 'tool2', + input: {}, + output: {}, + timestamp: new Date().toISOString(), }) ); @@ -1417,7 +1448,6 @@ describe('LiveKit Streaming Manager - Tool Events and Activity State', () => { // ASSERT: expect(onAgentActivityStateChange).toHaveBeenCalledWith(AgentActivityState.ToolActive); expect(onAgentActivityStateChange).toHaveBeenLastCalledWith(AgentActivityState.Idle); - expect(onToolEvent).toHaveBeenCalledTimes(4); }); }); diff --git a/src/services/streaming-manager/livekit-manager.ts b/src/services/streaming-manager/livekit-manager.ts index e714d376..3ecd20a6 100644 --- a/src/services/streaming-manager/livekit-manager.ts +++ b/src/services/streaming-manager/livekit-manager.ts @@ -10,8 +10,9 @@ import { StreamingManagerOptions, StreamingState, StreamType, - ToolCallingPayload, - ToolResultPayload, + ToolCallDonePayload, + ToolCallErrorPayload, + ToolCallStartedPayload, } from '@sdk/types'; import { ChatProgress } from '@sdk/types/entities/agents/manager'; import { noop } from '@sdk/utils'; @@ -357,20 +358,25 @@ export async function createLiveKitStreamingManager sets ToolActive + * - tool-call/started -> sets ToolActive * - stream-video/done with interruptible: true -> sets Idle * - stream-video/done with interruptible: false -> stays ToolActive (more tools coming) */ function handleToolEvents(subject: string, data: any): void { - if (subject === StreamEvents.ToolCalling) { + if (subject === StreamEvents.ToolCallStarted) { currentActivityState = AgentActivityState.ToolActive; callbacks.onAgentActivityStateChange?.(AgentActivityState.ToolActive); - callbacks.onToolEvent?.(StreamEvents.ToolCalling, data as ToolCallingPayload); + callbacks.onToolEvent?.(StreamEvents.ToolCallStarted, data as ToolCallStartedPayload); return; } - if (subject === StreamEvents.ToolResult) { - callbacks.onToolEvent?.(StreamEvents.ToolResult, data as ToolResultPayload); + if (subject === StreamEvents.ToolCallDone) { + callbacks.onToolEvent?.(StreamEvents.ToolCallDone, data as ToolCallDonePayload); + return; + } + + if (subject === StreamEvents.ToolCallError) { + callbacks.onToolEvent?.(StreamEvents.ToolCallError, data as ToolCallErrorPayload); } } @@ -421,8 +427,9 @@ export async function createLiveKitStreamingManager = { [StreamEvents.ChatAnswer]: handleChatEvents, [StreamEvents.ChatPartial]: handleChatEvents, - [StreamEvents.ToolCalling]: handleToolEvents, - [StreamEvents.ToolResult]: handleToolEvents, + [StreamEvents.ToolCallStarted]: handleToolEvents, + [StreamEvents.ToolCallDone]: handleToolEvents, + [StreamEvents.ToolCallError]: handleToolEvents, [StreamEvents.StreamVideoCreated]: handleVideoEvents, [StreamEvents.StreamVideoDone]: handleVideoEvents, [StreamEvents.StreamVideoError]: handleVideoEvents, diff --git a/src/types/entities/agents/manager.ts b/src/types/entities/agents/manager.ts index 7ef2b6cd..044c15e3 100644 --- a/src/types/entities/agents/manager.ts +++ b/src/types/entities/agents/manager.ts @@ -10,8 +10,6 @@ import { StreamEvents, StreamType, StreamingState, - ToolCallingPayload, - ToolResultPayload, } from '@sdk/types/stream'; import { SupportedStreamScript } from '@sdk/types/stream-script'; import type { ManagerCallbacks as StreamManagerCallbacks } from '../../stream/stream'; @@ -107,14 +105,11 @@ interface ManagerCallbacks { */ onStreamCreated?: StreamManagerCallbacks['onStreamCreated']; /** - * Optional callback function that will be triggered when tool events occur during the call - * @param event - The tool event type (tool/calling or tool/result) - * @param data - The tool event payload + * Optional callback function that will be triggered when tool-call events occur during the call + * (tool-call/started, tool-call/done, tool-call/error). + * The payload shape is discriminated by the event argument. */ - onToolEvent?: ( - event: StreamEvents.ToolCalling | StreamEvents.ToolResult, - data: ToolCallingPayload | ToolResultPayload - ) => void; + onToolEvent?: StreamManagerCallbacks['onToolEvent']; /** * Optional callback function that will be triggered when the interruptible state changes * @param interruptible - Whether the agent can be interrupted by the user diff --git a/src/types/stream/stream.ts b/src/types/stream/stream.ts index 55f3c5df..fdadbce7 100644 --- a/src/types/stream/stream.ts +++ b/src/types/stream/stream.ts @@ -40,8 +40,9 @@ export enum StreamEvents { StreamVideoDone = 'stream-video/done', StreamVideoError = 'stream-video/error', StreamVideoRejected = 'stream-video/rejected', - ToolCalling = 'tool/calling', - ToolResult = 'tool/result', + ToolCallStarted = 'tool-call/started', + ToolCallDone = 'tool-call/done', + ToolCallError = 'tool-call/error', } export enum ConnectionState { @@ -72,10 +73,7 @@ export interface ManagerCallbacks { onStreamCreated?: (stream: { stream_id: string; session_id: string; agent_id: string }) => void; onStreamReady?: () => void; onInterruptDetected?: (interrupt: Interrupt) => void; - onToolEvent?: ( - event: StreamEvents.ToolCalling | StreamEvents.ToolResult, - data: ToolCallingPayload | ToolResultPayload - ) => void; + onToolEvent?: ToolEventCallback; onInterruptibleChange?: (interruptible: boolean) => void; onFirstAudioDetected?: (metrics: AudioDetectionMetrics) => void; } @@ -196,18 +194,38 @@ export interface StreamInterruptPayload { export type ClientToolHandler = (args: Record) => Promise; -export interface ToolCallingPayload { - execution_id: string; - tool_name: string; - arguments: Record; - created_at: string; +export interface ToolCallStartedPayload { + call_id: string; + name: string; + input: Record; + output: Record; + timestamp: string; } -export interface ToolResultPayload { - execution_id: string; - tool_name: string; +export interface ToolCallDonePayload { + call_id: string; + name: string; + input: Record; + output: Record; duration_ms: number; - result?: unknown; - error_message?: string | null; - created_at: string; + extra: Record; + timestamp: string; } + +export interface ToolCallErrorPayload { + call_id: string; + name: string; + input: Record; + output: Record; + duration_ms: number; + extra: Record; + timestamp: string; +} + +export type ToolEventPayload = ToolCallStartedPayload | ToolCallDonePayload | ToolCallErrorPayload; + +export type ToolEventCallback = { + (event: StreamEvents.ToolCallStarted, data: ToolCallStartedPayload): void; + (event: StreamEvents.ToolCallDone, data: ToolCallDonePayload): void; + (event: StreamEvents.ToolCallError, data: ToolCallErrorPayload): void; +};