From ba2adc5ead6a7e6dd02fb8cb46071c745979c271 Mon Sep 17 00:00:00 2001 From: Robel Estifanos Date: Fri, 22 May 2026 13:53:06 -0400 Subject: [PATCH 1/6] =?UTF-8?q?fix(react):=20O(n=C2=B2)=20lag=20streaming?= =?UTF-8?q?=20long=20tool-input=20deltas=20(#190)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit useStreamingUIMessages was replaying the entire accumulated delta history through readUIMessageStream on every incoming chunk — fromCursor was hardcoded to 0 and the base UIMessage was always blankUIMessage(). For a 12k-char tool argument (~1.2k chunks) that is ~720,000 stream-part processings, dropping the UI to ~1 FPS. readUIMessageStream cannot resume incrementally from an existing UIMessage — it relies on internal mutable state (partialToolCalls, activeTextParts) that lives inside the async generator and is not stored in the UIMessage it returns. Attempting to feed only the new chunks throws "tool-input-delta for missing tool call". The fix splits processing into two paths: - First batch (cursor === 0): still uses readUIMessageStream so the framing chunks (start, start-step, tool-input-start) initialize a correctly shaped UIMessage. - Subsequent batches (cursor > 0): use a new sync function applyUIMessageChunksIncremental that applies new chunks directly onto the existing UIMessage without replay. applyUIMessageChunksIncremental mirrors what processUIMessageStream does for each chunk type but works against a passed-in message. For tool-input-delta accumulation it uses JSON.parse with try/catch rather than the SDK's parsePartialJson — JSON.parse is sync, throws on incomplete JSON, and only returns on complete JSON. That matches the "successful-parse" semantic we want and avoids parsePartialJson's "repaired-parse" foot-gun where a partial parse shadows the raw accumulator and corrupts subsequent deltas via "[object Object]" + nextDelta. Two related fixes bundled in: - The early-exit cursor check now also detects status-only transitions (streaming -> finished/aborted), since a stream can finish without emitting more delta parts. - The TextStreamPart path had the same O(n²) bug — it passed an empty existingStreams array to deriveUIMessagesFromTextStreamParts. Now it forwards the prior stream state so that path is also O(n). Tooling: - updateFromUIMessageChunks early-returns when parts is empty (the for-await loop never ran, so the function then mutated the caller's message via joinText). - transitionToolPart helper provides a single typed mutation point for tool-part state changes. The updates argument is Partial> so typos in state names, missing fields, or wrong field types fail at compile time. Verification (agent-190-repro hook benchmark): 4k chars / 409 chunks 12k chars / 1223 chunks Before 1,341 ms ~21,000 ms After 34 ms 73 ms 268 unit tests pass, build clean. Closes #190 --- src/deltas.test.ts | 187 ++++++++++++++++++- src/deltas.ts | 276 +++++++++++++++++++++++++--- src/react/useStreamingUIMessages.ts | 72 +++++--- 3 files changed, 483 insertions(+), 52 deletions(-) diff --git a/src/deltas.test.ts b/src/deltas.test.ts index 6be75868..af1d65a8 100644 --- a/src/deltas.test.ts +++ b/src/deltas.test.ts @@ -1,13 +1,15 @@ import { describe, it, expect } from "vitest"; import { + applyUIMessageChunksIncremental, blankUIMessage, deriveUIMessagesFromTextStreamParts, + getParts, updateFromTextStreamParts, updateFromUIMessageChunks, } from "./deltas.js"; import type { StreamMessage, StreamDelta } from "./validators.js"; import { omit } from "convex-helpers"; -import type { Tool, ToolUIPart, TypedToolResult } from "ai"; +import type { Tool, ToolUIPart, TypedToolResult, UIMessageChunk } from "ai"; describe("UIMessageChunks", () => { it("updates a UIMessage with a tool call and follow up", async () => { @@ -577,6 +579,189 @@ describe("mergeDeltas", () => { ]); }); + it("incremental processing of tool-input-delta chunks is O(N) not O(N²)", async () => { + const N = 500; + const streamId = "s-perf"; + const toolCallId = "tool-0"; + const streamMessage = { + streamId, + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "agent1", + }; + + // One StreamDelta with preamble, then N deltas each with one tool-input-delta + const allDeltas: StreamDelta[] = [ + { + streamId, + start: 0, + end: 1, + parts: [ + { type: "start" }, + { type: "start-step" }, + { type: "tool-input-start", toolCallId, toolName: "myTool" }, + ] as UIMessageChunk[], + }, + ...Array.from({ length: N }, (_, i) => ({ + streamId, + start: i + 1, + end: i + 2, + parts: [ + { + type: "tool-input-delta", + toolCallId, + inputTextDelta: "x", + } as UIMessageChunk, + ], + })), + ]; + + // Simulate the hook: process one delta at a time, tracking cursor + prior message + let cursor = 0; + let uiMessage = blankUIMessage(streamMessage, "thread-perf"); + let totalPartsProcessed = 0; + + for (let i = 0; i <= N; i++) { + const available = allDeltas.slice(0, i + 1); + const { parts: newParts, cursor: newCursor } = getParts( + available, + cursor, + ); + if (newParts.length > 0) { + totalPartsProcessed += newParts.length; + const base = structuredClone(uiMessage); + uiMessage = + cursor === 0 + ? await updateFromUIMessageChunks(base, newParts) + : applyUIMessageChunksIncremental(base, newParts); + cursor = newCursor; + } + } + + // O(N): each delta part processed exactly once (N tool-input-deltas + 3 preamble parts) + expect(totalPartsProcessed).toBe(N + 3); + + // Correctness: accumulated tool input should be "x" repeated N times + const toolPart = uiMessage.parts.find( + (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === toolCallId, + ); + expect(toolPart).toBeDefined(); + const inputStr = + typeof toolPart!.input === "string" + ? toolPart!.input + : JSON.stringify(toolPart!.input); + expect(inputStr.length).toBe(N); + }); + + it("applyUIMessageChunksIncremental: text-delta accumulation across calls", async () => { + const streamMessage = { + streamId: "s-text", + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "a", + }; + let msg = blankUIMessage(streamMessage, "thread-text"); + msg = await updateFromUIMessageChunks(msg, [ + { type: "start" }, + { type: "start-step" }, + { type: "text-start", id: "t0" }, + { type: "text-delta", id: "t0", delta: "Hello " }, + ] as UIMessageChunk[]); + msg = applyUIMessageChunksIncremental(msg, [ + { type: "text-delta", id: "t0", delta: "world" }, + ] as UIMessageChunk[]); + msg = applyUIMessageChunksIncremental(msg, [ + { type: "text-delta", id: "t0", delta: "!" }, + { type: "text-end", id: "t0" }, + ] as UIMessageChunk[]); + + const textPart = msg.parts.find((p) => p.type === "text") as + | { text: string; state: string } + | undefined; + expect(textPart?.text).toBe("Hello world!"); + expect(textPart?.state).toBe("done"); + expect(msg.text).toBe("Hello world!"); + }); + + it("applyUIMessageChunksIncremental: tool-output-available preserves input and sets fields", async () => { + const streamMessage = { + streamId: "s-tool-out", + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "a", + }; + let msg = blankUIMessage(streamMessage, "thread-tool-out"); + msg = await updateFromUIMessageChunks(msg, [ + { type: "start" }, + { type: "start-step" }, + { type: "tool-input-start", toolCallId: "c1", toolName: "myTool" }, + { + type: "tool-input-available", + toolCallId: "c1", + toolName: "myTool", + input: { q: "hi" }, + }, + ] as UIMessageChunk[]); + msg = applyUIMessageChunksIncremental(msg, [ + { + type: "tool-output-available", + toolCallId: "c1", + output: { result: "ok" }, + preliminary: true, + providerExecuted: true, + }, + ] as UIMessageChunk[]); + + const toolPart = msg.parts.find( + (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c1", + ); + expect(toolPart?.state).toBe("output-available"); + expect(toolPart?.input).toEqual({ q: "hi" }); + expect((toolPart as { output?: unknown }).output).toEqual({ result: "ok" }); + expect((toolPart as { preliminary?: boolean }).preliminary).toBe(true); + expect((toolPart as { providerExecuted?: boolean }).providerExecuted).toBe(true); + }); + + it("applyUIMessageChunksIncremental: tool-input-error sets rawInput and clears input for static tools", async () => { + const streamMessage = { + streamId: "s-tool-err", + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "a", + }; + let msg = blankUIMessage(streamMessage, "thread-tool-err"); + msg = await updateFromUIMessageChunks(msg, [ + { type: "start" }, + { type: "start-step" }, + { type: "tool-input-start", toolCallId: "c2", toolName: "myTool" }, + ] as UIMessageChunk[]); + msg = applyUIMessageChunksIncremental(msg, [ + { + type: "tool-input-error", + toolCallId: "c2", + toolName: "myTool", + input: { bad: "args" }, + errorText: "validation failed", + }, + ] as UIMessageChunk[]); + + const toolPart = msg.parts.find( + (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c2", + ); + expect(toolPart?.state).toBe("output-error"); + expect((toolPart as { errorText?: string }).errorText).toBe("validation failed"); + expect(toolPart?.input).toBeUndefined(); + expect((toolPart as { rawInput?: unknown }).rawInput).toEqual({ bad: "args" }); + }); + it("handles streaming tool-approval-request and updates tool state", () => { const streamId = "s10"; const deltas = [ diff --git a/src/deltas.ts b/src/deltas.ts index 0e816d04..dcb79627 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -57,6 +57,9 @@ export async function updateFromUIMessageChunks( uiMessage: UIMessage, parts: UIMessageChunk[], ) { + if (parts.length === 0) { + return uiMessage; + } const partsStream = new ReadableStream({ start(controller) { for (const part of parts) { @@ -72,11 +75,7 @@ export async function updateFromUIMessageChunks( stream: partsStream, onError: (e) => { const errorMessage = e instanceof Error ? e.message : String(e); - // Tool invocation errors can be safely ignored when streaming continuation - // after tool approval - the stored messages have the complete tool context if (errorMessage.toLowerCase().includes("no tool invocation found")) { - // Silently suppress - this is expected after tool approval when the - // continuation stream has tool-result without the original tool-call suppressError = true; return; } @@ -95,8 +94,6 @@ export async function updateFromUIMessageChunks( message = messagePart; } } catch (e) { - // If we've already handled this error in onError and marked it as suppressed, - // don't rethrow - the stored messages provide the fallback if (!suppressError) { throw e; } @@ -108,6 +105,256 @@ export async function updateFromUIMessageChunks( return message; } +type ToolPart = ToolUIPart | DynamicToolUIPart; + +function transitionToolPart( + part: ToolPart, + updates: { state: S } & Partial>, +): void { + Object.assign(part, updates); +} + +export function applyUIMessageChunksIncremental( + uiMessage: UIMessage, + newParts: UIMessageChunk[], +): UIMessage { + const message: UIMessage = structuredClone(uiMessage); + + const toolPartsById = new Map( + message.parts + .filter( + (p): p is ToolPart => + p.type.startsWith("tool-") || p.type === "dynamic-tool", + ) + .map((p) => [p.toolCallId, p]), + ); + + let activeTextPart: TextUIPart | undefined = message.parts + .filter((p): p is TextUIPart => p.type === "text" && p.state === "streaming") + .at(-1); + let activeReasoningPart: ReasoningUIPart | undefined = message.parts + .filter( + (p): p is ReasoningUIPart => + p.type === "reasoning" && p.state === "streaming", + ) + .at(-1); + + for (const part of newParts) { + switch (part.type) { + case "text-start": { + const newPart: TextUIPart = { + type: "text", + text: "", + state: "streaming", + providerMetadata: part.providerMetadata, + }; + activeTextPart = newPart; + message.parts.push(newPart); + break; + } + case "text-delta": { + if (activeTextPart) { + activeTextPart.text += part.delta; + activeTextPart.providerMetadata = mergeProviderMetadata( + activeTextPart.providerMetadata, + part.providerMetadata, + ); + } + break; + } + case "text-end": { + if (activeTextPart) { + activeTextPart.state = "done"; + activeTextPart.providerMetadata = mergeProviderMetadata( + activeTextPart.providerMetadata, + part.providerMetadata, + ); + activeTextPart = undefined; + } + break; + } + case "reasoning-start": { + const newPart: ReasoningUIPart = { + type: "reasoning", + text: "", + state: "streaming", + providerMetadata: part.providerMetadata, + }; + activeReasoningPart = newPart; + message.parts.push(newPart); + break; + } + case "reasoning-delta": { + if (activeReasoningPart) { + activeReasoningPart.text += part.delta; + activeReasoningPart.providerMetadata = mergeProviderMetadata( + activeReasoningPart.providerMetadata, + part.providerMetadata, + ); + } + break; + } + case "reasoning-end": { + if (activeReasoningPart) { + activeReasoningPart.state = "done"; + activeReasoningPart.providerMetadata = mergeProviderMetadata( + activeReasoningPart.providerMetadata, + part.providerMetadata, + ); + activeReasoningPart = undefined; + } + break; + } + case "tool-input-start": { + const newToolPart: ToolUIPart | DynamicToolUIPart = part.dynamic + ? ({ + type: "dynamic-tool", + toolCallId: part.toolCallId, + toolName: part.toolName, + state: "input-streaming", + input: "", + } satisfies DynamicToolUIPart) + : ({ + type: `tool-${part.toolName}`, + toolCallId: part.toolCallId, + state: "input-streaming", + input: "", + providerExecuted: part.providerExecuted, + } satisfies ToolUIPart); + toolPartsById.set(part.toolCallId, newToolPart); + message.parts.push(newToolPart); + break; + } + case "tool-input-delta": { + const toolPart = toolPartsById.get(part.toolCallId); + if (!toolPart) { + console.warn(`tool-input-delta for unknown toolCallId ${part.toolCallId}`); + break; + } + const raw = typeof toolPart.input === "string" ? toolPart.input : ""; + const accumulated = raw + part.inputTextDelta; + try { + toolPart.input = JSON.parse(accumulated); + } catch { + toolPart.input = accumulated; + } + break; + } + case "tool-input-available": { + const toolPart = toolPartsById.get(part.toolCallId); + if (toolPart) { + transitionToolPart(toolPart, { + state: "input-available", + input: part.input, + callProviderMetadata: mergeProviderMetadata( + (toolPart as { callProviderMetadata?: ProviderMetadata }).callProviderMetadata, + part.providerMetadata, + ), + }); + } + break; + } + case "tool-input-error": { + const toolPart = toolPartsById.get(part.toolCallId); + if (toolPart) { + transitionToolPart(toolPart, { + state: "output-error", + errorText: part.errorText, + providerExecuted: part.providerExecuted, + ...(toolPart.type === "dynamic-tool" + ? { input: part.input } + : { input: undefined, rawInput: part.input }), + callProviderMetadata: mergeProviderMetadata( + (toolPart as { callProviderMetadata?: ProviderMetadata }).callProviderMetadata, + part.providerMetadata, + ), + }); + } + break; + } + case "tool-output-available": { + const toolPart = toolPartsById.get(part.toolCallId); + if (toolPart) { + transitionToolPart(toolPart, { + state: "output-available", + output: part.output, + preliminary: part.preliminary, + providerExecuted: part.providerExecuted, + }); + } + break; + } + case "tool-output-error": { + const toolPart = toolPartsById.get(part.toolCallId); + if (toolPart) { + transitionToolPart(toolPart, { + state: "output-error", + errorText: part.errorText, + providerExecuted: part.providerExecuted, + }); + } + break; + } + case "tool-output-denied": { + const toolPart = toolPartsById.get(part.toolCallId); + if (toolPart) { + transitionToolPart(toolPart, { state: "output-denied" }); + } + break; + } + case "tool-approval-request": { + const toolPart = toolPartsById.get(part.toolCallId); + if (toolPart) { + transitionToolPart(toolPart, { + state: "approval-requested", + approval: { id: part.approvalId }, + }); + } + break; + } + case "source-url": + message.parts.push({ + type: "source-url", + url: part.url, + sourceId: part.sourceId, + title: part.title, + providerMetadata: part.providerMetadata, + }); + break; + case "source-document": + message.parts.push({ + type: "source-document", + mediaType: part.mediaType, + sourceId: part.sourceId, + title: part.title, + filename: part.filename, + providerMetadata: part.providerMetadata, + }); + break; + case "start-step": + message.parts.push({ type: "step-start" }); + break; + case "finish-step": + if (activeTextPart) { activeTextPart.state = "done"; activeTextPart = undefined; } + if (activeReasoningPart) { activeReasoningPart.state = "done"; activeReasoningPart = undefined; } + break; + case "abort": + case "error": + message.status = "failed"; + break; + case "start": + case "finish": + case "file": + break; + default: + break; + } + } + + message.text = joinText(message.parts); + return message; +} + export async function deriveUIMessagesFromDeltas( threadId: string, streamMessages: StreamMessage[], @@ -138,10 +385,6 @@ export async function deriveUIMessagesFromDeltas( return sorted(messages); } -/** - * - */ - export function deriveUIMessagesFromTextStreamParts( threadId: string, streamMessages: StreamMessage[], @@ -161,7 +404,6 @@ export function deriveUIMessagesFromTextStreamParts( cursor: number; message: UIMessage; }> = []; - // Seed the existing chunks let changed = false; for (const streamMessage of streamMessages) { const deltas = allDeltas.filter( @@ -220,12 +462,6 @@ export function getParts( return { parts, cursor }; } -/** - * This is historically from when we would use the onChunk callback instead of - * consuming the full UIMessageStream. - */ - -// exported for testing export function updateFromTextStreamParts( threadId: string, streamMessage: StreamMessage, @@ -515,19 +751,13 @@ export function updateFromTextStreamParts( case "raw": case "start-step": case "start": - // ignore break; default: { - // Exhaustiveness check disabled intentionally for forwards compatibility. - // New TextStreamPart types from future AI SDK versions will trigger a - // runtime warning rather than a compile error, allowing graceful degradation. - // const _: never = part; console.warn(`Received unexpected part: ${JSON.stringify(part)}`); break; } } } - // Consider reasoning done once something else happens for (let i = 0; i < message.parts.length - 1; i++) { const part = message.parts[i]; if (part.type === "reasoning") { diff --git a/src/react/useStreamingUIMessages.ts b/src/react/useStreamingUIMessages.ts index e8a4b003..60514c0b 100644 --- a/src/react/useStreamingUIMessages.ts +++ b/src/react/useStreamingUIMessages.ts @@ -4,8 +4,10 @@ import { type UIDataTypes, type UIMessageChunk, type UITools } from "ai"; import type { StreamQuery, StreamQueryArgs } from "./types.js"; import { type UIMessage } from "../UIMessages.js"; import { + applyUIMessageChunksIncremental, blankUIMessage, getParts, + statusFromStreamStatus, updateFromUIMessageChunks, deriveUIMessagesFromTextStreamParts, } from "../deltas.js"; @@ -63,11 +65,11 @@ export function useStreamingUIMessages< useEffect(() => { if (!streams) return; - // return if there are no new deltas beyond the cursors let noNewDeltas = true; for (const stream of streams) { + const existingStreamState = messageState[stream.streamMessage.streamId]; const lastDelta = stream.deltas.at(-1); - const cursor = messageState[stream.streamMessage.streamId]?.cursor; + const cursor = existingStreamState?.cursor; if (!cursor) { noNewDeltas = false; break; @@ -76,6 +78,14 @@ export function useStreamingUIMessages< noNewDeltas = false; break; } + if ( + existingStreamState && + existingStreamState.uiMessage.status !== + statusFromStreamStatus(stream.streamMessage.status) + ) { + noNewDeltas = false; + break; + } } if (noNewDeltas) { return; @@ -91,36 +101,42 @@ export function useStreamingUIMessages< > = Object.fromEntries( await Promise.all( streams.map(async ({ deltas, streamMessage }) => { - const { parts, cursor } = getParts(deltas, 0); - if (streamMessage.format === "UIMessageChunk") { - // Unfortunately this can't handle resuming from a UIMessage and - // adding more chunks, so we re-create it from scratch each time. - const uiMessage = await updateFromUIMessageChunks( - blankUIMessage(streamMessage, threadId), - parts, - ); - return [ - streamMessage.streamId, - { - uiMessage, - cursor, - }, - ]; - } else { - const [uiMessages] = deriveUIMessagesFromTextStreamParts( - threadId, + const streamId = streamMessage.streamId; + const existing = messageState[streamId]; + const fromCursor = existing?.cursor ?? 0; + const status = statusFromStreamStatus(streamMessage.status); + + if (streamMessage.format !== "UIMessageChunk") { + const existingStreams = existing + ? [{ streamId, cursor: existing.cursor, message: existing.uiMessage as UIMessage }] + : []; + const [uiMessages, newStreams] = deriveUIMessagesFromTextStreamParts( + threadId as string, [streamMessage], - [], + existingStreams, deltas, ); - return [ - streamMessage.streamId, - { - uiMessage: uiMessages[0], - cursor, - }, - ]; + return [streamId, { + uiMessage: (uiMessages[0] ?? existing?.uiMessage) as UIMessage, + cursor: newStreams[0]?.cursor ?? fromCursor, + }]; } + + const { parts: newParts, cursor } = getParts(deltas, fromCursor); + + if (newParts.length === 0) { + if (existing && existing.uiMessage.status !== status) { + return [streamId, { uiMessage: { ...existing.uiMessage, status }, cursor: existing.cursor }]; + } + return [streamId, existing ?? { uiMessage: blankUIMessage(streamMessage, threadId as string), cursor: 0 }]; + } + + const base = existing?.uiMessage ?? blankUIMessage(streamMessage, threadId as string); + const uiMessage = fromCursor === 0 + ? await updateFromUIMessageChunks(base as UIMessage, newParts) + : applyUIMessageChunksIncremental(base as UIMessage, newParts); + uiMessage.status = status; + return [streamId, { uiMessage: uiMessage as UIMessage, cursor }]; }), ), ); From c7929d83d87e1870088a7f4312f226fb6398734a Mon Sep 17 00:00:00 2001 From: Robel Estifanos Date: Sun, 24 May 2026 14:34:08 -0400 Subject: [PATCH 2/6] refactor(react): single incremental path for streaming UIMessages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Collapse the dual SDK/incremental processing into one path. The first batch previously went through readUIMessageStream (parsePartialJson → partial object input) while later batches used a hand-rolled incremental function that stored the raw accumulator in `input` and parsed with strict JSON.parse. The two diverged across the batch boundary, corrupting tool input for long (multi-flush) streams — exactly the case this PR targets. applyUIMessageChunksIncremental now drives every batch: - persists ephemeral stream state (active text/reasoning indices, raw tool input text) that the UIMessage can't hold, so it resumes mid-part - keeps the raw tool-input accumulator separate from `input` and uses parsePartialJson, matching the SDK's partial-object streaming - handles file / message-metadata / data-* chunks in later batches - tracks text/reasoning parts by chunk id - matches the SDK on finish-step (clear active maps, don't force "done") - leaves message status to the caller; warns on unknown chunk types An equivalence test pins the incremental port to the SDK output. --- src/deltas.test.ts | 333 +++++++++++++++++++++++----- src/deltas.ts | 229 +++++++++++++------ src/react/useStreamingUIMessages.ts | 89 ++++++-- 3 files changed, 498 insertions(+), 153 deletions(-) diff --git a/src/deltas.test.ts b/src/deltas.test.ts index af1d65a8..d6f76206 100644 --- a/src/deltas.test.ts +++ b/src/deltas.test.ts @@ -3,6 +3,7 @@ import { applyUIMessageChunksIncremental, blankUIMessage, deriveUIMessagesFromTextStreamParts, + emptyIncrementalStreamState, getParts, updateFromTextStreamParts, updateFromUIMessageChunks, @@ -621,6 +622,7 @@ describe("mergeDeltas", () => { // Simulate the hook: process one delta at a time, tracking cursor + prior message let cursor = 0; let uiMessage = blankUIMessage(streamMessage, "thread-perf"); + let streamState = emptyIncrementalStreamState(); let totalPartsProcessed = 0; for (let i = 0; i <= N; i++) { @@ -631,11 +633,12 @@ describe("mergeDeltas", () => { ); if (newParts.length > 0) { totalPartsProcessed += newParts.length; - const base = structuredClone(uiMessage); - uiMessage = - cursor === 0 - ? await updateFromUIMessageChunks(base, newParts) - : applyUIMessageChunksIncremental(base, newParts); + ({ message: uiMessage, streamState } = + await applyUIMessageChunksIncremental( + structuredClone(uiMessage), + newParts, + streamState, + )); cursor = newCursor; } } @@ -643,16 +646,12 @@ describe("mergeDeltas", () => { // O(N): each delta part processed exactly once (N tool-input-deltas + 3 preamble parts) expect(totalPartsProcessed).toBe(N + 3); - // Correctness: accumulated tool input should be "x" repeated N times + // Correctness: the raw accumulator holds "x" repeated N times across batches + expect(streamState.toolInputText[toolCallId]).toBe("x".repeat(N)); const toolPart = uiMessage.parts.find( (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === toolCallId, ); expect(toolPart).toBeDefined(); - const inputStr = - typeof toolPart!.input === "string" - ? toolPart!.input - : JSON.stringify(toolPart!.input); - expect(inputStr.length).toBe(N); }); it("applyUIMessageChunksIncremental: text-delta accumulation across calls", async () => { @@ -665,19 +664,30 @@ describe("mergeDeltas", () => { agentName: "a", }; let msg = blankUIMessage(streamMessage, "thread-text"); - msg = await updateFromUIMessageChunks(msg, [ - { type: "start" }, - { type: "start-step" }, - { type: "text-start", id: "t0" }, - { type: "text-delta", id: "t0", delta: "Hello " }, - ] as UIMessageChunk[]); - msg = applyUIMessageChunksIncremental(msg, [ - { type: "text-delta", id: "t0", delta: "world" }, - ] as UIMessageChunk[]); - msg = applyUIMessageChunksIncremental(msg, [ - { type: "text-delta", id: "t0", delta: "!" }, - { type: "text-end", id: "t0" }, - ] as UIMessageChunk[]); + let state = emptyIncrementalStreamState(); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "start" }, + { type: "start-step" }, + { type: "text-start", id: "t0" }, + { type: "text-delta", id: "t0", delta: "Hello " }, + ] as UIMessageChunk[], + state, + )); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [{ type: "text-delta", id: "t0", delta: "world" }] as UIMessageChunk[], + state, + )); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "text-delta", id: "t0", delta: "!" }, + { type: "text-end", id: "t0" }, + ] as UIMessageChunk[], + state, + )); const textPart = msg.parts.find((p) => p.type === "text") as | { text: string; state: string } @@ -697,26 +707,35 @@ describe("mergeDeltas", () => { agentName: "a", }; let msg = blankUIMessage(streamMessage, "thread-tool-out"); - msg = await updateFromUIMessageChunks(msg, [ - { type: "start" }, - { type: "start-step" }, - { type: "tool-input-start", toolCallId: "c1", toolName: "myTool" }, - { - type: "tool-input-available", - toolCallId: "c1", - toolName: "myTool", - input: { q: "hi" }, - }, - ] as UIMessageChunk[]); - msg = applyUIMessageChunksIncremental(msg, [ - { - type: "tool-output-available", - toolCallId: "c1", - output: { result: "ok" }, - preliminary: true, - providerExecuted: true, - }, - ] as UIMessageChunk[]); + let state = emptyIncrementalStreamState(); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "start" }, + { type: "start-step" }, + { type: "tool-input-start", toolCallId: "c1", toolName: "myTool" }, + { + type: "tool-input-available", + toolCallId: "c1", + toolName: "myTool", + input: { q: "hi" }, + }, + ] as UIMessageChunk[], + state, + )); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { + type: "tool-output-available", + toolCallId: "c1", + output: { result: "ok" }, + preliminary: true, + providerExecuted: true, + }, + ] as UIMessageChunk[], + state, + )); const toolPart = msg.parts.find( (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c1", @@ -738,28 +757,222 @@ describe("mergeDeltas", () => { agentName: "a", }; let msg = blankUIMessage(streamMessage, "thread-tool-err"); - msg = await updateFromUIMessageChunks(msg, [ - { type: "start" }, - { type: "start-step" }, - { type: "tool-input-start", toolCallId: "c2", toolName: "myTool" }, - ] as UIMessageChunk[]); - msg = applyUIMessageChunksIncremental(msg, [ - { - type: "tool-input-error", - toolCallId: "c2", - toolName: "myTool", - input: { bad: "args" }, - errorText: "validation failed", - }, - ] as UIMessageChunk[]); + let state = emptyIncrementalStreamState(); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "start" }, + { type: "start-step" }, + { type: "tool-input-start", toolCallId: "c2", toolName: "myTool" }, + ] as UIMessageChunk[], + state, + )); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { + type: "tool-input-error", + toolCallId: "c2", + toolName: "myTool", + input: { bad: "args" }, + errorText: "validation failed", + }, + ] as UIMessageChunk[], + state, + )); const toolPart = msg.parts.find( (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c2", ); expect(toolPart?.state).toBe("output-error"); - expect((toolPart as { errorText?: string }).errorText).toBe("validation failed"); + expect((toolPart as { errorText?: string }).errorText).toBe( + "validation failed", + ); expect(toolPart?.input).toBeUndefined(); - expect((toolPart as { rawInput?: unknown }).rawInput).toEqual({ bad: "args" }); + expect((toolPart as { rawInput?: unknown }).rawInput).toEqual({ + bad: "args", + }); + }); + + it("accumulates tool input across a batch boundary (parsePartialJson)", async () => { + const streamMessage = { + streamId: "s-tool-split", + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "a", + }; + let msg = blankUIMessage(streamMessage, "thread-tool-split"); + let state = emptyIncrementalStreamState(); + + // Batch A: preamble + the first half of the JSON input. + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "start" }, + { type: "start-step" }, + { type: "tool-input-start", toolCallId: "c1", toolName: "myTool" }, + { type: "tool-input-delta", toolCallId: "c1", inputTextDelta: '{"a":1' }, + ] as UIMessageChunk[], + state, + )); + const afterA = msg.parts.find( + (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c1", + ); + // Mid-stream input is a partial structured object, not a raw string. + expect(afterA?.input).toEqual({ a: 1 }); + + // Batch B: the remainder of the JSON input. + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "tool-input-delta", toolCallId: "c1", inputTextDelta: ',"b":2}' }, + ] as UIMessageChunk[], + state, + )); + const afterB = msg.parts.find( + (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c1", + ); + // The batch-A accumulation is preserved, not dropped. + expect(afterB?.input).toEqual({ a: 1, b: 2 }); + expect(state.toolInputText["c1"]).toBe('{"a":1,"b":2}'); + }); + + it("pushes file parts and merges message metadata in later batches", async () => { + const streamMessage = { + streamId: "s-file-meta", + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "a", + }; + let msg = blankUIMessage(streamMessage, "thread-file-meta"); + let state = emptyIncrementalStreamState(); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "start" }, + { type: "start-step" }, + ] as UIMessageChunk[], + state, + )); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { + type: "file", + mediaType: "image/png", + url: "https://example.com/a.png", + }, + { type: "message-metadata", messageMetadata: { foo: "bar" } }, + ] as UIMessageChunk[], + state, + )); + + const filePart = msg.parts.find((p) => p.type === "file") as + | { mediaType: string; url: string } + | undefined; + expect(filePart?.mediaType).toBe("image/png"); + expect(filePart?.url).toBe("https://example.com/a.png"); + expect(msg.metadata).toEqual({ foo: "bar" }); + }); + + it("tracks concurrent text parts by id across batches", async () => { + const streamMessage = { + streamId: "s-multi-text", + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "a", + }; + let msg = blankUIMessage(streamMessage, "thread-multi-text"); + let state = emptyIncrementalStreamState(); + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "start" }, + { type: "start-step" }, + { type: "text-start", id: "t0" }, + { type: "text-start", id: "t1" }, + { type: "text-delta", id: "t0", delta: "A" }, + ] as UIMessageChunk[], + state, + )); + // Deltas in a later batch must land on the part matching their id. + ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + msg, + [ + { type: "text-delta", id: "t1", delta: "B" }, + { type: "text-delta", id: "t0", delta: "C" }, + ] as UIMessageChunk[], + state, + )); + + const textParts = msg.parts.filter((p) => p.type === "text") as Array<{ + text: string; + }>; + expect(textParts.map((p) => p.text)).toEqual(["AC", "B"]); + }); + + it("incremental batches match the SDK processing the full stream", async () => { + const streamMessage = { + streamId: "s-equiv", + status: "streaming" as const, + order: 0, + stepOrder: 0, + format: "UIMessageChunk" as const, + agentName: "a", + }; + const batches: UIMessageChunk[][] = [ + [ + { type: "start" }, + { type: "start-step" }, + { type: "text-start", id: "t0" }, + { type: "text-delta", id: "t0", delta: "Hello " }, + ] as UIMessageChunk[], + [ + { type: "text-delta", id: "t0", delta: "world" }, + { type: "text-end", id: "t0" }, + { type: "tool-input-start", toolCallId: "c1", toolName: "myTool" }, + { type: "tool-input-delta", toolCallId: "c1", inputTextDelta: '{"q":' }, + ] as UIMessageChunk[], + [ + { type: "tool-input-delta", toolCallId: "c1", inputTextDelta: '"hi"}' }, + { + type: "tool-input-available", + toolCallId: "c1", + toolName: "myTool", + input: { q: "hi" }, + }, + { + type: "tool-output-available", + toolCallId: "c1", + output: { ok: true }, + }, + { type: "finish-step" }, + { type: "finish" }, + ] as UIMessageChunk[], + ]; + + // SDK: process the entire stream at once. + const sdkMsg = await updateFromUIMessageChunks( + blankUIMessage(streamMessage, "thread-equiv"), + batches.flat(), + ); + + // Incremental: process batch by batch, threading state. + let incMsg = blankUIMessage(streamMessage, "thread-equiv"); + let state = emptyIncrementalStreamState(); + for (const batch of batches) { + ({ message: incMsg, streamState: state } = + await applyUIMessageChunksIncremental(incMsg, batch, state)); + } + + expect(incMsg.parts).toEqual(sdkMsg.parts); + expect(incMsg.text).toBe(sdkMsg.text); }); it("handles streaming tool-approval-request and updates tool state", () => { diff --git a/src/deltas.ts b/src/deltas.ts index dcb79627..767fa979 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -1,4 +1,5 @@ import { + parsePartialJson, readUIMessageStream, type DynamicToolUIPart, type ProviderMetadata, @@ -114,30 +115,58 @@ function transitionToolPart( Object.assign(part, updates); } -export function applyUIMessageChunksIncremental( +export type IncrementalStreamState = { + // chunk id -> index of the streaming text part in message.parts + activeText: Record; + // chunk id -> index of the streaming reasoning part in message.parts + activeReasoning: Record; + // toolCallId -> raw accumulated input JSON text (kept separate from the + // parsed `input` so partial JSON can be repair-parsed each batch) + toolInputText: Record; +}; + +export function emptyIncrementalStreamState(): IncrementalStreamState { + return { activeText: {}, activeReasoning: {}, toolInputText: {} }; +} + +/** + * Apply a batch of new UIMessageChunks to an existing UIMessage without + * replaying prior chunks. `prev` carries the ephemeral stream state that the + * UIMessage itself can't hold (which text/reasoning parts are still streaming, + * and the raw accumulated tool input text). Parts are append-only, so part + * indices stay stable across the structuredClone between batches. Behavior + * mirrors the AI SDK's processUIMessageStream. + */ +export async function applyUIMessageChunksIncremental( uiMessage: UIMessage, newParts: UIMessageChunk[], -): UIMessage { + prev: IncrementalStreamState, +): Promise<{ message: UIMessage; streamState: IncrementalStreamState }> { const message: UIMessage = structuredClone(uiMessage); + const activeText: Record = { ...prev.activeText }; + const activeReasoning: Record = { ...prev.activeReasoning }; + const toolInputText: Record = { ...prev.toolInputText }; + const touchedTools = new Set(); - const toolPartsById = new Map( - message.parts - .filter( - (p): p is ToolPart => - p.type.startsWith("tool-") || p.type === "dynamic-tool", - ) - .map((p) => [p.toolCallId, p]), - ); - - let activeTextPart: TextUIPart | undefined = message.parts - .filter((p): p is TextUIPart => p.type === "text" && p.state === "streaming") - .at(-1); - let activeReasoningPart: ReasoningUIPart | undefined = message.parts - .filter( - (p): p is ReasoningUIPart => - p.type === "reasoning" && p.state === "streaming", - ) - .at(-1); + const toolIndexById = new Map(); + message.parts.forEach((p, i) => { + if ("toolCallId" in p && (p.type.startsWith("tool-") || p.type === "dynamic-tool")) { + toolIndexById.set((p as ToolPart).toolCallId, i); + } + }); + const toolPartAt = (toolCallId: string): ToolPart | undefined => { + const idx = toolIndexById.get(toolCallId); + return idx === undefined ? undefined : (message.parts[idx] as ToolPart); + }; + const mergeMetadata = (metadata: unknown) => { + if (metadata == null) { + return; + } + message.metadata = { + ...(message.metadata as Record | undefined), + ...(metadata as Record), + } as typeof message.metadata; + }; for (const part of newParts) { switch (part.type) { @@ -148,28 +177,32 @@ export function applyUIMessageChunksIncremental( state: "streaming", providerMetadata: part.providerMetadata, }; - activeTextPart = newPart; message.parts.push(newPart); + activeText[part.id] = message.parts.length - 1; break; } case "text-delta": { - if (activeTextPart) { - activeTextPart.text += part.delta; - activeTextPart.providerMetadata = mergeProviderMetadata( - activeTextPart.providerMetadata, + const idx = activeText[part.id]; + if (idx !== undefined) { + const textPart = message.parts[idx] as TextUIPart; + textPart.text += part.delta; + textPart.providerMetadata = mergeProviderMetadata( + textPart.providerMetadata, part.providerMetadata, ); } break; } case "text-end": { - if (activeTextPart) { - activeTextPart.state = "done"; - activeTextPart.providerMetadata = mergeProviderMetadata( - activeTextPart.providerMetadata, + const idx = activeText[part.id]; + if (idx !== undefined) { + const textPart = message.parts[idx] as TextUIPart; + textPart.state = "done"; + textPart.providerMetadata = mergeProviderMetadata( + textPart.providerMetadata, part.providerMetadata, ); - activeTextPart = undefined; + delete activeText[part.id]; } break; } @@ -180,28 +213,32 @@ export function applyUIMessageChunksIncremental( state: "streaming", providerMetadata: part.providerMetadata, }; - activeReasoningPart = newPart; message.parts.push(newPart); + activeReasoning[part.id] = message.parts.length - 1; break; } case "reasoning-delta": { - if (activeReasoningPart) { - activeReasoningPart.text += part.delta; - activeReasoningPart.providerMetadata = mergeProviderMetadata( - activeReasoningPart.providerMetadata, + const idx = activeReasoning[part.id]; + if (idx !== undefined) { + const reasoningPart = message.parts[idx] as ReasoningUIPart; + reasoningPart.text += part.delta; + reasoningPart.providerMetadata = mergeProviderMetadata( + reasoningPart.providerMetadata, part.providerMetadata, ); } break; } case "reasoning-end": { - if (activeReasoningPart) { - activeReasoningPart.state = "done"; - activeReasoningPart.providerMetadata = mergeProviderMetadata( - activeReasoningPart.providerMetadata, + const idx = activeReasoning[part.id]; + if (idx !== undefined) { + const reasoningPart = message.parts[idx] as ReasoningUIPart; + reasoningPart.state = "done"; + reasoningPart.providerMetadata = mergeProviderMetadata( + reasoningPart.providerMetadata, part.providerMetadata, ); - activeReasoningPart = undefined; + delete activeReasoning[part.id]; } break; } @@ -212,50 +249,50 @@ export function applyUIMessageChunksIncremental( toolCallId: part.toolCallId, toolName: part.toolName, state: "input-streaming", - input: "", + input: undefined, } satisfies DynamicToolUIPart) : ({ type: `tool-${part.toolName}`, toolCallId: part.toolCallId, state: "input-streaming", - input: "", + input: undefined, providerExecuted: part.providerExecuted, } satisfies ToolUIPart); - toolPartsById.set(part.toolCallId, newToolPart); message.parts.push(newToolPart); + toolIndexById.set(part.toolCallId, message.parts.length - 1); + toolInputText[part.toolCallId] = ""; break; } case "tool-input-delta": { - const toolPart = toolPartsById.get(part.toolCallId); - if (!toolPart) { - console.warn(`tool-input-delta for unknown toolCallId ${part.toolCallId}`); - break; - } - const raw = typeof toolPart.input === "string" ? toolPart.input : ""; - const accumulated = raw + part.inputTextDelta; - try { - toolPart.input = JSON.parse(accumulated); - } catch { - toolPart.input = accumulated; + if (toolIndexById.has(part.toolCallId)) { + toolInputText[part.toolCallId] = + (toolInputText[part.toolCallId] ?? "") + part.inputTextDelta; + touchedTools.add(part.toolCallId); + } else { + console.warn( + `tool-input-delta for unknown toolCallId ${part.toolCallId}`, + ); } break; } case "tool-input-available": { - const toolPart = toolPartsById.get(part.toolCallId); + const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "input-available", input: part.input, callProviderMetadata: mergeProviderMetadata( - (toolPart as { callProviderMetadata?: ProviderMetadata }).callProviderMetadata, + (toolPart as { callProviderMetadata?: ProviderMetadata }) + .callProviderMetadata, part.providerMetadata, ), }); } + touchedTools.delete(part.toolCallId); break; } case "tool-input-error": { - const toolPart = toolPartsById.get(part.toolCallId); + const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "output-error", @@ -265,15 +302,17 @@ export function applyUIMessageChunksIncremental( ? { input: part.input } : { input: undefined, rawInput: part.input }), callProviderMetadata: mergeProviderMetadata( - (toolPart as { callProviderMetadata?: ProviderMetadata }).callProviderMetadata, + (toolPart as { callProviderMetadata?: ProviderMetadata }) + .callProviderMetadata, part.providerMetadata, ), }); } + touchedTools.delete(part.toolCallId); break; } case "tool-output-available": { - const toolPart = toolPartsById.get(part.toolCallId); + const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "output-available", @@ -285,7 +324,7 @@ export function applyUIMessageChunksIncremental( break; } case "tool-output-error": { - const toolPart = toolPartsById.get(part.toolCallId); + const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "output-error", @@ -296,14 +335,14 @@ export function applyUIMessageChunksIncremental( break; } case "tool-output-denied": { - const toolPart = toolPartsById.get(part.toolCallId); + const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "output-denied" }); } break; } case "tool-approval-request": { - const toolPart = toolPartsById.get(part.toolCallId); + const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "approval-requested", @@ -331,28 +370,76 @@ export function applyUIMessageChunksIncremental( providerMetadata: part.providerMetadata, }); break; + case "file": + message.parts.push({ + type: "file", + mediaType: part.mediaType, + url: part.url, + }); + break; case "start-step": message.parts.push({ type: "step-start" }); break; case "finish-step": - if (activeTextPart) { activeTextPart.state = "done"; activeTextPart = undefined; } - if (activeReasoningPart) { activeReasoningPart.state = "done"; activeReasoningPart = undefined; } - break; - case "abort": - case "error": - message.status = "failed"; + // Match the SDK: a new step starts fresh streaming parts; the prior + // parts keep their state rather than being forced to "done". + for (const id of Object.keys(activeText)) delete activeText[id]; + for (const id of Object.keys(activeReasoning)) delete activeReasoning[id]; break; case "start": case "finish": - case "file": + case "message-metadata": + mergeMetadata(part.messageMetadata); break; - default: + case "abort": + case "error": + // The stream-level status (statusFromStreamStatus) is authoritative and + // is applied by the caller; nothing to mutate on the message here. break; + default: { + if (typeof part.type === "string" && part.type.startsWith("data-")) { + const dataPart = part as Extract< + UIMessageChunk, + { type: `data-${string}` } + >; + const existingIdx = + dataPart.id != null + ? message.parts.findIndex( + (p) => + p.type === dataPart.type && + (p as { id?: string }).id === dataPart.id, + ) + : -1; + if (existingIdx >= 0) { + (message.parts[existingIdx] as { data?: unknown }).data = + dataPart.data; + } else { + message.parts.push( + dataPart as unknown as UIMessage["parts"][number], + ); + } + } else { + console.warn( + `applyUIMessageChunksIncremental: unhandled chunk type ${String(part.type)}`, + ); + } + break; + } + } + } + + // Repair-parse the accumulated input once per touched, still-streaming tool + // so `input` is a partial structured object during streaming, like the SDK. + for (const toolCallId of touchedTools) { + const toolPart = toolPartAt(toolCallId); + if (toolPart && toolPart.state === "input-streaming") { + const { value } = await parsePartialJson(toolInputText[toolCallId] ?? ""); + toolPart.input = value; } } message.text = joinText(message.parts); - return message; + return { message, streamState: { activeText, activeReasoning, toolInputText } }; } export async function deriveUIMessagesFromDeltas( diff --git a/src/react/useStreamingUIMessages.ts b/src/react/useStreamingUIMessages.ts index 60514c0b..d20e5122 100644 --- a/src/react/useStreamingUIMessages.ts +++ b/src/react/useStreamingUIMessages.ts @@ -6,10 +6,11 @@ import { type UIMessage } from "../UIMessages.js"; import { applyUIMessageChunksIncremental, blankUIMessage, + emptyIncrementalStreamState, getParts, statusFromStreamStatus, - updateFromUIMessageChunks, deriveUIMessagesFromTextStreamParts, + type IncrementalStreamState, } from "../deltas.js"; import { useDeltaStreams } from "./useDeltaStreams.js"; @@ -55,6 +56,7 @@ export function useStreamingUIMessages< { uiMessage: UIMessage; cursor: number; + streamState: IncrementalStreamState; } > >({}); @@ -70,7 +72,7 @@ export function useStreamingUIMessages< const existingStreamState = messageState[stream.streamMessage.streamId]; const lastDelta = stream.deltas.at(-1); const cursor = existingStreamState?.cursor; - if (!cursor) { + if (existingStreamState === undefined || cursor === undefined) { noNewDeltas = false; break; } @@ -97,6 +99,7 @@ export function useStreamingUIMessages< { uiMessage: UIMessage; cursor: number; + streamState: IncrementalStreamState; } > = Object.fromEntries( await Promise.all( @@ -105,38 +108,80 @@ export function useStreamingUIMessages< const existing = messageState[streamId]; const fromCursor = existing?.cursor ?? 0; const status = statusFromStreamStatus(streamMessage.status); + const prevState = + existing?.streamState ?? emptyIncrementalStreamState(); if (streamMessage.format !== "UIMessageChunk") { const existingStreams = existing - ? [{ streamId, cursor: existing.cursor, message: existing.uiMessage as UIMessage }] + ? [ + { + streamId, + cursor: existing.cursor, + message: existing.uiMessage as UIMessage, + }, + ] : []; - const [uiMessages, newStreams] = deriveUIMessagesFromTextStreamParts( - threadId as string, - [streamMessage], - existingStreams, - deltas, - ); - return [streamId, { - uiMessage: (uiMessages[0] ?? existing?.uiMessage) as UIMessage, - cursor: newStreams[0]?.cursor ?? fromCursor, - }]; + const [uiMessages, newStreams] = + deriveUIMessagesFromTextStreamParts( + threadId as string, + [streamMessage], + existingStreams, + deltas, + ); + return [ + streamId, + { + uiMessage: (uiMessages[0] ?? existing?.uiMessage) as UIMessage< + METADATA, + DATA_PARTS, + TOOLS + >, + cursor: newStreams[0]?.cursor ?? fromCursor, + streamState: prevState, + }, + ]; } - const { parts: newParts, cursor } = getParts(deltas, fromCursor); + const { parts: newParts, cursor } = getParts( + deltas, + fromCursor, + ); + + const base = + existing?.uiMessage ?? + blankUIMessage(streamMessage, threadId as string); if (newParts.length === 0) { if (existing && existing.uiMessage.status !== status) { - return [streamId, { uiMessage: { ...existing.uiMessage, status }, cursor: existing.cursor }]; + return [ + streamId, + { + uiMessage: { ...existing.uiMessage, status }, + cursor: existing.cursor, + streamState: prevState, + }, + ]; } - return [streamId, existing ?? { uiMessage: blankUIMessage(streamMessage, threadId as string), cursor: 0 }]; + return [ + streamId, + existing ?? { uiMessage: base, cursor: 0, streamState: prevState }, + ]; } - const base = existing?.uiMessage ?? blankUIMessage(streamMessage, threadId as string); - const uiMessage = fromCursor === 0 - ? await updateFromUIMessageChunks(base as UIMessage, newParts) - : applyUIMessageChunksIncremental(base as UIMessage, newParts); - uiMessage.status = status; - return [streamId, { uiMessage: uiMessage as UIMessage, cursor }]; + const { message, streamState } = await applyUIMessageChunksIncremental( + base as UIMessage, + newParts, + prevState, + ); + message.status = status; + return [ + streamId, + { + uiMessage: message as UIMessage, + cursor, + streamState, + }, + ]; }), ), ); From c11d91b511055599214f1f7d5133482f9cca960f Mon Sep 17 00:00:00 2001 From: Robel Estifanos Date: Wed, 27 May 2026 13:39:37 -0400 Subject: [PATCH 3/6] review: address CodeRabbit feedback (drop stale tool buffers, rename test) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - deltas.ts: drop `toolInputText[id]` from incremental state when a tool transitions to input-available / tool-input-error. The raw JSON is no longer needed and was being carried through every later batch on the hot path. - deltas.test.ts: rename the "O(N) not O(N²)" test to reflect what it actually proves (cursor slicing — each part handed to applyUIMessageChunksIncremental exactly once); document that the algorithmic claim is proven by the PR's 21,000 ms → 73 ms benchmark, not by the unit test. Skipped CodeRabbit's third nit ("parse-on-complete" assertion for the partial-input test): that would contradict the deliberate partial-JSON parse during streaming (deltas.ts:435-443), which mirrors the AI SDK's streamObject behavior and is documented inline. --- src/deltas.test.ts | 8 ++++++-- src/deltas.ts | 4 ++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/deltas.test.ts b/src/deltas.test.ts index d6f76206..d7f6c78c 100644 --- a/src/deltas.test.ts +++ b/src/deltas.test.ts @@ -580,7 +580,7 @@ describe("mergeDeltas", () => { ]); }); - it("incremental processing of tool-input-delta chunks is O(N) not O(N²)", async () => { + it("incremental apply only consumes parts past the cursor (no re-processing)", async () => { const N = 500; const streamId = "s-perf"; const toolCallId = "tool-0"; @@ -643,7 +643,11 @@ describe("mergeDeltas", () => { } } - // O(N): each delta part processed exactly once (N tool-input-deltas + 3 preamble parts) + // Each delta part is handed to applyUIMessageChunksIncremental exactly + // once across all batches (cursor slicing — no re-processing of prior + // parts). N tool-input-deltas + 3 preamble parts. The end-to-end O(N) + // claim is proven by the PR's 21,000 ms → 73 ms benchmark, not by this + // unit test. expect(totalPartsProcessed).toBe(N + 3); // Correctness: the raw accumulator holds "x" repeated N times across batches diff --git a/src/deltas.ts b/src/deltas.ts index 767fa979..f3d25baf 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -289,6 +289,9 @@ export async function applyUIMessageChunksIncremental( }); } touchedTools.delete(part.toolCallId); + // The raw JSON buffer is no longer needed; drop it so it doesn't get + // carried through every later batch on the hot path. + delete toolInputText[part.toolCallId]; break; } case "tool-input-error": { @@ -309,6 +312,7 @@ export async function applyUIMessageChunksIncremental( }); } touchedTools.delete(part.toolCallId); + delete toolInputText[part.toolCallId]; break; } case "tool-output-available": { From ff22c69ab7dbe49b7a3d0f130d8fd1d330f3dde7 Mon Sep 17 00:00:00 2001 From: Robel Estifanos Date: Wed, 27 May 2026 13:17:48 -0400 Subject: [PATCH 4/6] fix(deltas): drop async from applyUIMessageChunksIncremental MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit parsePartialJson was the only await — replace with JSON.parse + try/catch. JSON.parse only materializes input when JSON is complete; parsePartialJson repair-parsed partial JSON eagerly, which could corrupt the accumulator if input was used before parsing finished. Fixes the test asserting old eager-parse behavior and removes the parsePartialJson import that was added but not needed. --- src/deltas.test.ts | 38 ++++++++++++++--------------- src/deltas.ts | 14 +++++------ src/react/useStreamingUIMessages.ts | 2 +- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/deltas.test.ts b/src/deltas.test.ts index d7f6c78c..23a21329 100644 --- a/src/deltas.test.ts +++ b/src/deltas.test.ts @@ -634,7 +634,7 @@ describe("mergeDeltas", () => { if (newParts.length > 0) { totalPartsProcessed += newParts.length; ({ message: uiMessage, streamState } = - await applyUIMessageChunksIncremental( + applyUIMessageChunksIncremental( structuredClone(uiMessage), newParts, streamState, @@ -669,7 +669,7 @@ describe("mergeDeltas", () => { }; let msg = blankUIMessage(streamMessage, "thread-text"); let state = emptyIncrementalStreamState(); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "start" }, @@ -679,12 +679,12 @@ describe("mergeDeltas", () => { ] as UIMessageChunk[], state, )); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [{ type: "text-delta", id: "t0", delta: "world" }] as UIMessageChunk[], state, )); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "text-delta", id: "t0", delta: "!" }, @@ -712,7 +712,7 @@ describe("mergeDeltas", () => { }; let msg = blankUIMessage(streamMessage, "thread-tool-out"); let state = emptyIncrementalStreamState(); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "start" }, @@ -727,7 +727,7 @@ describe("mergeDeltas", () => { ] as UIMessageChunk[], state, )); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { @@ -762,7 +762,7 @@ describe("mergeDeltas", () => { }; let msg = blankUIMessage(streamMessage, "thread-tool-err"); let state = emptyIncrementalStreamState(); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "start" }, @@ -771,7 +771,7 @@ describe("mergeDeltas", () => { ] as UIMessageChunk[], state, )); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { @@ -798,7 +798,7 @@ describe("mergeDeltas", () => { }); }); - it("accumulates tool input across a batch boundary (parsePartialJson)", async () => { + it("accumulates tool input across a batch boundary", async () => { const streamMessage = { streamId: "s-tool-split", status: "streaming" as const, @@ -811,7 +811,7 @@ describe("mergeDeltas", () => { let state = emptyIncrementalStreamState(); // Batch A: preamble + the first half of the JSON input. - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "start" }, @@ -824,11 +824,11 @@ describe("mergeDeltas", () => { const afterA = msg.parts.find( (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c1", ); - // Mid-stream input is a partial structured object, not a raw string. - expect(afterA?.input).toEqual({ a: 1 }); + // Mid-stream: JSON is incomplete, input stays unset. + expect(afterA?.input).toBeUndefined(); // Batch B: the remainder of the JSON input. - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "tool-input-delta", toolCallId: "c1", inputTextDelta: ',"b":2}' }, @@ -838,7 +838,7 @@ describe("mergeDeltas", () => { const afterB = msg.parts.find( (p): p is ToolUIPart => "toolCallId" in p && p.toolCallId === "c1", ); - // The batch-A accumulation is preserved, not dropped. + // Complete JSON is parsed once the accumulator is valid. expect(afterB?.input).toEqual({ a: 1, b: 2 }); expect(state.toolInputText["c1"]).toBe('{"a":1,"b":2}'); }); @@ -854,7 +854,7 @@ describe("mergeDeltas", () => { }; let msg = blankUIMessage(streamMessage, "thread-file-meta"); let state = emptyIncrementalStreamState(); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "start" }, @@ -862,7 +862,7 @@ describe("mergeDeltas", () => { ] as UIMessageChunk[], state, )); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { @@ -894,7 +894,7 @@ describe("mergeDeltas", () => { }; let msg = blankUIMessage(streamMessage, "thread-multi-text"); let state = emptyIncrementalStreamState(); - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "start" }, @@ -906,7 +906,7 @@ describe("mergeDeltas", () => { state, )); // Deltas in a later batch must land on the part matching their id. - ({ message: msg, streamState: state } = await applyUIMessageChunksIncremental( + ({ message: msg, streamState: state } = applyUIMessageChunksIncremental( msg, [ { type: "text-delta", id: "t1", delta: "B" }, @@ -972,7 +972,7 @@ describe("mergeDeltas", () => { let state = emptyIncrementalStreamState(); for (const batch of batches) { ({ message: incMsg, streamState: state } = - await applyUIMessageChunksIncremental(incMsg, batch, state)); + applyUIMessageChunksIncremental(incMsg, batch, state)); } expect(incMsg.parts).toEqual(sdkMsg.parts); diff --git a/src/deltas.ts b/src/deltas.ts index f3d25baf..15fde2c2 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -1,5 +1,4 @@ import { - parsePartialJson, readUIMessageStream, type DynamicToolUIPart, type ProviderMetadata, @@ -137,11 +136,11 @@ export function emptyIncrementalStreamState(): IncrementalStreamState { * indices stay stable across the structuredClone between batches. Behavior * mirrors the AI SDK's processUIMessageStream. */ -export async function applyUIMessageChunksIncremental( +export function applyUIMessageChunksIncremental( uiMessage: UIMessage, newParts: UIMessageChunk[], prev: IncrementalStreamState, -): Promise<{ message: UIMessage; streamState: IncrementalStreamState }> { +): { message: UIMessage; streamState: IncrementalStreamState } { const message: UIMessage = structuredClone(uiMessage); const activeText: Record = { ...prev.activeText }; const activeReasoning: Record = { ...prev.activeReasoning }; @@ -432,13 +431,14 @@ export async function applyUIMessageChunksIncremental( } } - // Repair-parse the accumulated input once per touched, still-streaming tool - // so `input` is a partial structured object during streaming, like the SDK. for (const toolCallId of touchedTools) { const toolPart = toolPartAt(toolCallId); if (toolPart && toolPart.state === "input-streaming") { - const { value } = await parsePartialJson(toolInputText[toolCallId] ?? ""); - toolPart.input = value; + try { + toolPart.input = JSON.parse(toolInputText[toolCallId] ?? ""); + } catch { + // partial JSON — leave input unset until complete + } } } diff --git a/src/react/useStreamingUIMessages.ts b/src/react/useStreamingUIMessages.ts index d20e5122..203ab7e6 100644 --- a/src/react/useStreamingUIMessages.ts +++ b/src/react/useStreamingUIMessages.ts @@ -168,7 +168,7 @@ export function useStreamingUIMessages< ]; } - const { message, streamState } = await applyUIMessageChunksIncremental( + const { message, streamState } = applyUIMessageChunksIncremental( base as UIMessage, newParts, prevState, From 00c129c164caea493c2180242805407be87b7072 Mon Sep 17 00:00:00 2001 From: Robel Estifanos Date: Wed, 27 May 2026 13:28:05 -0400 Subject: [PATCH 5/6] chore(deltas): delete legacy TextStreamPart streaming path All agent streams now write UIMessageChunk. The text-format path (updateFromTextStreamParts, deriveUIMessagesFromTextStreamParts) has been dead code since streaming.ts switched to DeltaStreamer. Removes ~900 lines including tests. --- src/client/streaming.integration.test.ts | 72 ---- src/deltas.test.ts | 429 +---------------------- src/deltas.ts | 387 +------------------- src/react/useStreamingUIMessages.ts | 32 -- 4 files changed, 9 insertions(+), 911 deletions(-) diff --git a/src/client/streaming.integration.test.ts b/src/client/streaming.integration.test.ts index 45fbc6fd..1b5bf4a3 100644 --- a/src/client/streaming.integration.test.ts +++ b/src/client/streaming.integration.test.ts @@ -12,7 +12,6 @@ import { import { getParts, deriveUIMessagesFromDeltas, - deriveUIMessagesFromTextStreamParts, } from "../deltas.js"; import type { TestConvex } from "convex-test"; import type { StreamDelta, StreamMessage } from "../validators.js"; @@ -677,77 +676,6 @@ describe("Delta Stream Consumption", () => { expect((parts[0] as { type: string }).type).toBe("new"); expect(cursor).toBe(6); }); - - test("TextStreamPart format delta reconstruction with tool calls", () => { - const streamId = "s1"; - const streamMessage: StreamMessage = { - streamId, - order: 1, - stepOrder: 0, - status: "streaming", - }; - const deltas: StreamDelta[] = [ - { - streamId, - start: 0, - end: 1, - parts: [{ type: "text-delta", id: "txt-0", text: "Let me call a tool. " }], - }, - { - streamId, - start: 1, - end: 2, - parts: [ - { - type: "tool-call", - toolCallId: "tc1", - toolName: "search", - input: { query: "hello" }, - }, - ], - }, - { - streamId, - start: 2, - end: 3, - parts: [ - { - type: "tool-result", - toolCallId: "tc1", - toolName: "search", - output: "Found 3 results", - }, - ], - }, - { - streamId, - start: 3, - end: 4, - parts: [ - { type: "text-delta", id: "txt-1", text: "Here are the results." }, - ], - }, - ]; - - const [messages, , changed] = deriveUIMessagesFromTextStreamParts( - "thread1", - [streamMessage], - [], - deltas, - ); - - expect(messages).toHaveLength(1); - expect(changed).toBe(true); - - const msg = messages[0]; - expect(msg.text).toContain("Let me call a tool."); - expect(msg.text).toContain("Here are the results."); - - const toolParts = msg.parts.filter((p: any) => - p.type.startsWith("tool-"), - ); - expect(toolParts.length).toBeGreaterThan(0); - }); }); // ============================================================================ diff --git a/src/deltas.test.ts b/src/deltas.test.ts index 23a21329..13e940c2 100644 --- a/src/deltas.test.ts +++ b/src/deltas.test.ts @@ -2,15 +2,12 @@ import { describe, it, expect } from "vitest"; import { applyUIMessageChunksIncremental, blankUIMessage, - deriveUIMessagesFromTextStreamParts, emptyIncrementalStreamState, getParts, - updateFromTextStreamParts, updateFromUIMessageChunks, } from "./deltas.js"; import type { StreamMessage, StreamDelta } from "./validators.js"; -import { omit } from "convex-helpers"; -import type { Tool, ToolUIPart, TypedToolResult, UIMessageChunk } from "ai"; +import type { ToolUIPart, UIMessageChunk } from "ai"; describe("UIMessageChunks", () => { it("updates a UIMessage with a tool call and follow up", async () => { @@ -203,383 +200,6 @@ describe("UIMessageChunks - continuation stream", () => { }); describe("mergeDeltas", () => { - it("merges a single text-delta into a message", () => { - const streamId = "s1"; - const deltas = [ - { - streamId, - start: 0, - end: 5, - parts: [{ type: "text-delta", id: "1", text: "Hello" }], - } satisfies StreamDelta, - ]; - const [messages, newStreams, changed] = deriveUIMessagesFromTextStreamParts( - "thread1", - [{ streamId, order: 1, stepOrder: 0, status: "streaming" }], - [], - deltas, - ); - expect(messages).toHaveLength(1); - expect(messages[0].text).toBe("Hello"); - expect(messages[0].role).toBe("assistant"); - expect(changed).toBe(true); - expect(newStreams[0].cursor).toBe(5); - }); - - it("merges multiple deltas for the same stream", () => { - const streamId = "s1"; - const deltas = [ - { - streamId, - start: 0, - end: 5, - parts: [{ type: "text-delta", id: "1", text: "Hello" }], - }, - { - streamId, - start: 5, - end: 11, - parts: [{ type: "text-delta", id: "2", text: " World!" }], - }, - ]; - const [messages, newStreams, changed] = deriveUIMessagesFromTextStreamParts( - "thread1", - [{ streamId, order: 1, stepOrder: 0, status: "streaming" }], - [], - deltas, - ); - expect(messages).toHaveLength(1); - expect(messages[0].text).toBe("Hello World!"); - expect(changed).toBe(true); - expect(newStreams[0].cursor).toBe(11); - }); - - it("handles tool-call and tool-result parts", () => { - const streamId = "s2"; - const deltas = [ - { - streamId, - start: 0, - end: 1, - parts: [ - { - type: "tool-call", - toolCallId: "call1", - toolName: "myTool", - input: "What's the meaning of life?", - }, - ], - } satisfies StreamDelta, - { - streamId, - start: 1, - end: 2, - parts: [ - { - type: "tool-result", - toolCallId: "call1", - toolName: "myTool", - input: undefined, - output: "42", - } satisfies TypedToolResult<{ myTool: Tool }>, - ], - } satisfies StreamDelta, - ]; - const [[message], _, changed] = deriveUIMessagesFromTextStreamParts( - "thread1", - [{ streamId, order: 2, stepOrder: 0, status: "streaming" }], - [], - deltas, - ); - expect(message).toBeDefined(); - expect(message.role).toBe("assistant"); - const content = message.parts; - expect(content).toEqual([ - { - type: "tool-myTool", - toolCallId: "call1", - input: "What's the meaning of life?", - output: "42", - state: "output-available", - } satisfies ToolUIPart, - ]); - expect(changed).toBe(true); - }); - - it("returns changed=false if no new deltas", () => { - const streamId = "s3"; - const deltas: StreamDelta[] = []; - const [, newStreams, changed] = deriveUIMessagesFromTextStreamParts( - "thread1", - [{ streamId, order: 3, stepOrder: 0, status: "streaming" }], - [], - deltas, - ); - expect(changed).toBe(false); - expect(newStreams[0].cursor).toBe(0); - }); - - it("handles multiple streams and sorts by order/stepOrder", () => { - const deltas = [ - { - streamId: "s2", - start: 0, - end: 3, - parts: [{ type: "text-delta", id: "1", text: "B" }], - } satisfies StreamDelta, - { - streamId: "s1", - start: 0, - end: 3, - parts: [{ type: "text-delta", id: "2", text: "A" }], - } satisfies StreamDelta, - ]; - const [messages, _, changed] = deriveUIMessagesFromTextStreamParts( - "thread1", - [ - { streamId: "s1", order: 1, stepOrder: 0, status: "streaming" }, - { streamId: "s2", order: 2, stepOrder: 0, status: "streaming" }, - ], - [], - deltas, - ); - expect(messages).toHaveLength(2); - expect(messages[0].text).toBe("A"); - expect(messages[1].text).toBe("B"); - expect(changed).toBe(true); - // Sorted by order - expect(messages[0].order).toBe(1); - expect(messages[1].order).toBe(2); - }); - - it("does not duplicate text content when merging sequential text-deltas", () => { - const streamId = "s4"; - const deltas = [ - { - streamId, - start: 0, - end: 5, - parts: [{ type: "text-delta", id: "1", text: "Hello" }], - }, - { - streamId, - start: 5, - end: 11, - parts: [{ type: "text-delta", id: "2", text: " World!" }], - }, - { - streamId, - start: 11, - end: 12, - parts: [{ type: "text-delta", id: "3", text: "!" }], - }, - ] satisfies StreamDelta[]; - const [messages] = deriveUIMessagesFromTextStreamParts( - "thread1", - [{ streamId, order: 4, stepOrder: 0, status: "streaming" }], - [], - deltas, - ); - expect(messages).toHaveLength(1); - expect(messages[0].text).toBe("Hello World!!"); - // There should only be one text part per message - const content = messages[0].parts; - if (Array.isArray(content)) { - const textParts = content.filter((p) => p.type === "text"); - expect(textParts).toHaveLength(1); - expect(textParts[0].text).toBe("Hello World!!"); - } - }); - - it("does not duplicate reasoning parts", () => { - const streamId = "s6"; - const deltas = [ - { - streamId, - start: 0, - end: 1, - parts: [ - { type: "reasoning-start", id: "1" }, - { type: "reasoning-delta", id: "1", text: "I'm thinking..." }, - ], - }, - { - streamId, - start: 1, - end: 2, - parts: [ - { type: "reasoning-delta", id: "1", text: " Still thinking..." }, - ], - }, - { - streamId, - start: 2, - end: 3, - parts: [{ type: "reasoning-end", id: "1" }], - }, - ]; - const [messages] = deriveUIMessagesFromTextStreamParts( - "thread1", - [{ streamId, order: 6, stepOrder: 0, status: "streaming" }], - [], - deltas, - ); - expect(messages).toHaveLength(1); - if (Array.isArray(messages[0].parts)) { - const reasoningParts = messages[0].parts.filter( - (p) => p.type === "reasoning", - ); - expect(reasoningParts).toHaveLength(1); - expect(reasoningParts[0].text).toBe("I'm thinking... Still thinking..."); - expect(reasoningParts[0].state).toBe("done"); - } - }); - - it("applyDeltasToStreamMessage is idempotent and does not duplicate content", () => { - const streamId = "s7"; - const streamMessage = { - streamId, - order: 7, - stepOrder: 0, - status: "streaming", - } satisfies StreamMessage; - const deltas = [ - { - streamId, - start: 0, - end: 5, - parts: [{ type: "text-delta", id: "1", text: "Hello" }], - }, - { - streamId, - start: 5, - end: 11, - parts: [{ type: "text-delta", id: "2", text: " World!" }], - }, - ]; - // First call: apply both deltas - let [result, changed] = updateFromTextStreamParts( - "thread1", - streamMessage, - undefined, - deltas, - ); - expect(result.message.text).toBe("Hello World!"); - // Second call: re-apply the same deltas (should not duplicate) - [result, changed] = updateFromTextStreamParts( - "thread1", - streamMessage, - result, - deltas, - ); - expect(result.message.text).toBe("Hello World!"); - // Third call: add a new delta - const moreDeltas = [ - ...deltas, - { - streamId, - start: 11, - end: 12, - parts: [{ type: "text-delta", id: "3", text: "!" }], - }, - ]; - [result, changed] = updateFromTextStreamParts( - "thread1", - streamMessage, - result, - moreDeltas, - ); - expect(changed).toBe(true); - expect(result.message.text).toBe("Hello World!!"); - // Re-apply all deltas again (should still not duplicate) - [result, changed] = updateFromTextStreamParts( - "thread1", - streamMessage, - result, - moreDeltas, - ); - expect(changed).toBe(false); - expect(result.message.text).toBe("Hello World!!"); - }); - - it("mergeDeltas is pure and does not mutate inputs", () => { - const streamId = "s8"; - const streamMessages = [ - { streamId, order: 8, stepOrder: 0, status: "streaming" }, - ] satisfies StreamMessage[]; - const deltas = [ - { - streamId, - start: 0, - end: 5, - parts: [{ type: "text-delta", id: "1", text: "Hello" }], - }, - { - streamId, - start: 5, - end: 11, - parts: [{ type: "text-delta", id: "2", text: " World!" }], - }, - ]; - // Deep freeze inputs to catch mutation - function deepFreeze(obj: unknown): unknown { - if (obj && typeof obj === "object" && !Object.isFrozen(obj)) { - Object.freeze(obj); - for (const key of Object.keys(obj)) { - deepFreeze((obj as Record)[key]); - } - } - return obj; - } - deepFreeze(streamMessages); - deepFreeze(deltas); - const [messages1, streams1, changed1] = deriveUIMessagesFromTextStreamParts( - "thread1", - streamMessages, - [], - deltas, - ); - const [messages2, streams2, changed2] = deriveUIMessagesFromTextStreamParts( - "thread1", - streamMessages, - [], - deltas, - ); - expect(messages1.map((m) => omit(m, ["_creationTime"]))).toEqual( - messages2.map((m) => omit(m, ["_creationTime"])), - ); - expect( - streams1.map((s) => ({ - ...s, - message: omit(s.message, ["_creationTime"]), - })), - ).toEqual( - streams2.map((s) => ({ - ...s, - message: omit(s.message, ["_creationTime"]), - })), - ); - expect(changed1).toBe(changed2); - // Inputs should remain unchanged - expect(streamMessages).toMatchObject([ - { streamId, order: 8, stepOrder: 0, status: "streaming" }, - ]); - expect(deltas).toEqual([ - { - streamId, - start: 0, - end: 5, - parts: [{ type: "text-delta", id: "1", text: "Hello" }], - }, - { - streamId, - start: 5, - end: 11, - parts: [{ type: "text-delta", id: "2", text: " World!" }], - }, - ]); - }); - it("incremental apply only consumes parts past the cursor (no re-processing)", async () => { const N = 500; const streamId = "s-perf"; @@ -978,51 +598,4 @@ describe("mergeDeltas", () => { expect(incMsg.parts).toEqual(sdkMsg.parts); expect(incMsg.text).toBe(sdkMsg.text); }); - - it("handles streaming tool-approval-request and updates tool state", () => { - const streamId = "s10"; - const deltas = [ - { - streamId, - start: 0, - end: 1, - parts: [ - { - type: "tool-call", - toolCallId: "call1", - toolName: "dangerousTool", - input: { action: "delete" }, - }, - ], - } satisfies StreamDelta, - { - streamId, - start: 1, - end: 2, - parts: [ - { - type: "tool-approval-request", - toolCallId: "call1", - approvalId: "approval1", - }, - ], - } satisfies StreamDelta, - ]; - const [[message], _, changed] = deriveUIMessagesFromTextStreamParts( - "thread1", - [{ streamId, order: 10, stepOrder: 0, status: "streaming" }], - [], - deltas, - ); - expect(message).toBeDefined(); - expect(message.role).toBe("assistant"); - expect(changed).toBe(true); - - const toolPart = message.parts.find( - (p) => p.type === "tool-dangerousTool", - ) as any; - expect(toolPart).toBeDefined(); - expect(toolPart.state).toBe("approval-requested"); - expect(toolPart.approval).toEqual({ id: "approval1" }); - }); }); diff --git a/src/deltas.ts b/src/deltas.ts index 15fde2c2..fdd61ec6 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -3,9 +3,7 @@ import { type DynamicToolUIPart, type ProviderMetadata, type ReasoningUIPart, - type TextStreamPart, type TextUIPart, - type ToolSet, type ToolUIPart, type UIMessageChunk, } from "ai"; @@ -453,73 +451,17 @@ export async function deriveUIMessagesFromDeltas( ): Promise { const messages: UIMessage[] = []; for (const streamMessage of streamMessages) { - if (streamMessage.format === "UIMessageChunk") { - const { parts } = getParts( - allDeltas.filter((d) => d.streamId === streamMessage.streamId), - 0, - ); - const uiMessage = await updateFromUIMessageChunks( - blankUIMessage(streamMessage, threadId), - parts, - ); - messages.push(uiMessage); - } else { - const [uiMessages] = deriveUIMessagesFromTextStreamParts( - threadId, - [streamMessage], - [], - allDeltas, - ); - messages.push(...uiMessages); - } - } - return sorted(messages); -} - -export function deriveUIMessagesFromTextStreamParts( - threadId: string, - streamMessages: StreamMessage[], - existingStreams: Array<{ - streamId: string; - cursor: number; - message: UIMessage; - }>, - allDeltas: StreamDelta[], -): [ - UIMessage[], - Array<{ streamId: string; cursor: number; message: UIMessage }>, - boolean, -] { - const newStreams: Array<{ - streamId: string; - cursor: number; - message: UIMessage; - }> = []; - let changed = false; - for (const streamMessage of streamMessages) { - const deltas = allDeltas.filter( - (d) => d.streamId === streamMessage.streamId, + const { parts } = getParts( + allDeltas.filter((d) => d.streamId === streamMessage.streamId), + 0, ); - const existing = existingStreams.find( - (s) => s.streamId === streamMessage.streamId, + const uiMessage = await updateFromUIMessageChunks( + blankUIMessage(streamMessage, threadId), + parts, ); - const [newStream, messageChanged] = updateFromTextStreamParts( - threadId, - streamMessage, - existing, - deltas, - ); - newStreams.push(newStream); - if (messageChanged) changed = true; - } - for (const { streamId } of existingStreams) { - if (!newStreams.find((s) => s.streamId === streamId)) { - // There's a stream that's no longer active. - changed = true; - } + messages.push(uiMessage); } - const messages = sorted(newStreams.map((s) => s.message)); - return [messages, newStreams, changed]; + return sorted(messages); } export function getParts( @@ -553,319 +495,6 @@ export function getParts( return { parts, cursor }; } -export function updateFromTextStreamParts( - threadId: string, - streamMessage: StreamMessage, - existing: - | { streamId: string; cursor: number; message: UIMessage } - | undefined, - deltas: StreamDelta[], -): [{ streamId: string; cursor: number; message: UIMessage }, boolean] { - const { cursor, parts } = getParts>( - deltas, - existing?.cursor, - ); - const changed = - parts.length > 0 || - (existing && - statusFromStreamStatus(streamMessage.status) !== existing.message.status); - const existingMessage = - existing?.message ?? blankUIMessage(streamMessage, threadId); - if (!changed) { - return [ - existing ?? { - streamId: streamMessage.streamId, - cursor, - message: existingMessage, - }, - false, - ]; - } - - const message: UIMessage = structuredClone(existingMessage); - message.status = statusFromStreamStatus(streamMessage.status); - - const textPartsById = new Map(); - const toolPartsById = new Map( - message.parts - .filter( - (p): p is ToolUIPart | DynamicToolUIPart => - p.type.startsWith("tool-") || p.type === "dynamic-tool", - ) - .map((p) => [p.toolCallId, p]), - ); - const reasoningPartsById = new Map(); - - for (const part of parts) { - switch (part.type) { - case "text-start": - case "text-delta": { - if (!textPartsById.has(part.id)) { - const lastPart = message.parts.at(-1); - if (lastPart?.type === "text") { - textPartsById.set(part.id, lastPart); - } else { - const newPart = { - type: "text", - text: "", - providerMetadata: part.providerMetadata, - } satisfies TextUIPart; - textPartsById.set(part.id, newPart); - message.parts.push(newPart); - } - } - if (part.type === "text-delta") { - const textPart = textPartsById.get(part.id)!; - textPart.text += part.text; - textPart.providerMetadata = mergeProviderMetadata( - textPart.providerMetadata, - part.providerMetadata, - ); - } - break; - } - case "tool-input-start": { - let newPart: ToolUIPart | DynamicToolUIPart; - if (part.dynamic) { - newPart = { - type: "dynamic-tool", - toolCallId: part.id, - toolName: part.toolName, - state: "input-streaming", - input: "", - } satisfies DynamicToolUIPart; - } else { - newPart = { - type: `tool-${part.toolName}`, - toolCallId: part.id, - state: "input-streaming", - input: "", - providerExecuted: part.providerExecuted, - } satisfies ToolUIPart; - } - toolPartsById.set(part.id, newPart); - message.parts.push(newPart); - break; - } - case "tool-input-delta": - { - const toUpdate = toolPartsById.get(part.id); - assert( - toUpdate, - `Expected to find tool call part ${part.id} to update`, - ); - toUpdate.input = (toUpdate.input ?? "") + part.delta; - } - break; - case "tool-input-end": - { - const toUpdate = toolPartsById.get(part.id); - assert( - toUpdate, - `Expected to find tool call part ${part.id} to update`, - ); - toUpdate.state = "input-available"; - if (part.providerMetadata) { - const updatable = toUpdate as Extract< - ToolUIPart | DynamicToolUIPart, - { state: "input-available" } - >; - updatable.callProviderMetadata = mergeProviderMetadata( - updatable.callProviderMetadata, - part.providerMetadata, - ); - } - } - break; - case "tool-call": { - let newPart: ToolUIPart | DynamicToolUIPart; - if (part.dynamic) { - newPart = { - type: "dynamic-tool", - toolCallId: part.toolCallId, - toolName: part.toolName, - input: part.input, - state: "input-available", - }; - } else { - newPart = { - type: `tool-${part.toolName}`, - toolCallId: part.toolCallId, - input: part.input, - state: "input-available", - }; - if (part.providerExecuted) { - newPart.providerExecuted = part.providerExecuted; - } - } - if (part.providerMetadata) { - newPart.callProviderMetadata = part.providerMetadata; - } - if (toolPartsById.has(part.toolCallId)) { - const toUpdate = toolPartsById.get(part.toolCallId)!; - Object.assign(toUpdate, newPart); - } else { - toolPartsById.set(part.toolCallId, newPart); - message.parts.push(newPart); - } - break; - } - case "tool-result": { - const toolCall = toolPartsById.get(part.toolCallId); - assert( - toolCall, - `Expected to find tool call part ${part.toolCallId} to update with result`, - ); - let newPart: ToolUIPart | DynamicToolUIPart; - if (toolCall.type === "dynamic-tool") { - newPart = { - ...toolCall, - state: "output-available", - input: part.input ?? toolCall.input, - output: part.output ?? toolCall.output, - ...pick(part, ["preliminary"]), - } as DynamicToolUIPart; - } else { - newPart = { - ...toolCall, - state: "output-available", - input: part.input ?? toolCall.input, - output: part.output ?? toolCall.output, - preliminary: part.preliminary, - } as ToolUIPart; - } - Object.assign(toolCall, newPart); - break; - } - case "reasoning-start": - case "reasoning-delta": { - if (!reasoningPartsById.has(part.id)) { - const lastPart = message.parts.at(-1); - if (lastPart?.type === "reasoning") { - reasoningPartsById.set(part.id, lastPart); - } else { - const newPart = { - type: "reasoning", - state: "streaming", - text: "", - providerMetadata: part.providerMetadata, - } satisfies ReasoningUIPart; - reasoningPartsById.set(part.id, newPart); - message.parts.push(newPart); - } - } - const reasoningPart = reasoningPartsById.get(part.id)!; - if (part.type === "reasoning-delta") { - reasoningPart.text += part.text; - reasoningPart.providerMetadata = mergeProviderMetadata( - reasoningPart.providerMetadata, - part.providerMetadata, - ); - } - break; - } - case "reasoning-end": { - const reasoningPart = - reasoningPartsById.get(part.id) ?? - message.parts.find( - (p): p is ReasoningUIPart => - p.type === "reasoning" && p.state === "streaming", - )!; - if (reasoningPart) { - reasoningPart.state = "done"; - } else { - console.warn( - `Expected to find reasoning part ${part.id} to finish, but found none`, - ); - } - break; - } - case "source": - if (part.sourceType === "url") { - message.parts.push({ - type: "source-url", - url: part.url, - sourceId: part.id, - providerMetadata: part.providerMetadata, - title: part.title, - }); - } else if (part.sourceType === "document") { - message.parts.push({ - type: "source-document", - mediaType: part.mediaType, - sourceId: part.id, - title: part.title, - filename: part.filename, - providerMetadata: part.providerMetadata, - }); - } else { - console.warn("Got source part with unknown source type", part); - } - break; - case "abort": - message.status = "failed"; - break; - case "error": - message.status = "failed"; - console.warn("Generation failed with error", part.error); - break; - case "tool-error": { - const toolPart = toolPartsById.get(part.toolCallId); - if (toolPart) { - toolPart.errorText = getErrorMessage(part.error); - } - break; - } - case "tool-approval-request": { - const typedPart = part as unknown as { - type: "tool-approval-request"; - toolCallId: string; - approvalId: string; - }; - const toolPart = toolPartsById.get(typedPart.toolCallId); - if (toolPart) { - toolPart.state = "approval-requested"; - (toolPart as ToolUIPart & { approval?: object }).approval = { - id: typedPart.approvalId, - }; - } else { - console.warn( - `Expected tool call part ${typedPart.toolCallId} for approval request`, - ); - } - break; - } - case "file": - case "text-end": - case "finish-step": - case "finish": - case "raw": - case "start-step": - case "start": - break; - default: { - console.warn(`Received unexpected part: ${JSON.stringify(part)}`); - break; - } - } - } - for (let i = 0; i < message.parts.length - 1; i++) { - const part = message.parts[i]; - if (part.type === "reasoning") { - part.state = "done"; - } - } - message.text = joinText(message.parts); - return [ - { - streamId: streamMessage.streamId, - cursor, - message, - }, - true, - ]; -} - function mergeProviderMetadata( existing: ProviderMetadata | undefined, part: ProviderMetadata | undefined, diff --git a/src/react/useStreamingUIMessages.ts b/src/react/useStreamingUIMessages.ts index 203ab7e6..3786d8fb 100644 --- a/src/react/useStreamingUIMessages.ts +++ b/src/react/useStreamingUIMessages.ts @@ -9,7 +9,6 @@ import { emptyIncrementalStreamState, getParts, statusFromStreamStatus, - deriveUIMessagesFromTextStreamParts, type IncrementalStreamState, } from "../deltas.js"; import { useDeltaStreams } from "./useDeltaStreams.js"; @@ -111,37 +110,6 @@ export function useStreamingUIMessages< const prevState = existing?.streamState ?? emptyIncrementalStreamState(); - if (streamMessage.format !== "UIMessageChunk") { - const existingStreams = existing - ? [ - { - streamId, - cursor: existing.cursor, - message: existing.uiMessage as UIMessage, - }, - ] - : []; - const [uiMessages, newStreams] = - deriveUIMessagesFromTextStreamParts( - threadId as string, - [streamMessage], - existingStreams, - deltas, - ); - return [ - streamId, - { - uiMessage: (uiMessages[0] ?? existing?.uiMessage) as UIMessage< - METADATA, - DATA_PARTS, - TOOLS - >, - cursor: newStreams[0]?.cursor ?? fromCursor, - streamState: prevState, - }, - ]; - } - const { parts: newParts, cursor } = getParts( deltas, fromCursor, From 995691a79fe80a51529aa326c63e0a4cc2ed0d7f Mon Sep 17 00:00:00 2001 From: Robel Estifanos Date: Wed, 27 May 2026 13:48:14 -0400 Subject: [PATCH 6/6] fix(deltas): address CodeRabbit feedback on TextStreamPart deletion - deriveUIMessagesFromDeltas: add format guard to fail fast on non-UIMessageChunk streams rather than silently processing them through the wrong path - useStreamingUIMessages: replace .at(-1) fast-path with delta.end > cursor scan so unsorted delta arrays don't cause spurious noNewDeltas early returns - deltas.test.ts: drop unnecessary async from tests that don't await - streaming.integration.test.ts: add format: UIMessageChunk to StreamMessage fixtures used by deriveUIMessagesFromDeltas status-mapping test --- src/client/streaming.integration.test.ts | 3 +++ src/deltas.test.ts | 4 ++-- src/deltas.ts | 5 +++++ src/react/useStreamingUIMessages.ts | 3 +-- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/client/streaming.integration.test.ts b/src/client/streaming.integration.test.ts index 1b5bf4a3..68bfbca2 100644 --- a/src/client/streaming.integration.test.ts +++ b/src/client/streaming.integration.test.ts @@ -807,18 +807,21 @@ describe("Fallback Behavior", () => { order: 0, stepOrder: 0, status: "streaming", + format: "UIMessageChunk", }; const finishedMsg: StreamMessage = { streamId: "s2", order: 1, stepOrder: 0, status: "finished", + format: "UIMessageChunk", }; const abortedMsg: StreamMessage = { streamId: "s3", order: 2, stepOrder: 0, status: "aborted", + format: "UIMessageChunk", }; const msgs = await deriveUIMessagesFromDeltas( diff --git a/src/deltas.test.ts b/src/deltas.test.ts index 13e940c2..98c09e48 100644 --- a/src/deltas.test.ts +++ b/src/deltas.test.ts @@ -200,7 +200,7 @@ describe("UIMessageChunks - continuation stream", () => { }); describe("mergeDeltas", () => { - it("incremental apply only consumes parts past the cursor (no re-processing)", async () => { + it("incremental apply only consumes parts past the cursor (no re-processing)", () => { const N = 500; const streamId = "s-perf"; const toolCallId = "tool-0"; @@ -278,7 +278,7 @@ describe("mergeDeltas", () => { expect(toolPart).toBeDefined(); }); - it("applyUIMessageChunksIncremental: text-delta accumulation across calls", async () => { + it("applyUIMessageChunksIncremental: text-delta accumulation across calls", () => { const streamMessage = { streamId: "s-text", status: "streaming" as const, diff --git a/src/deltas.ts b/src/deltas.ts index fdd61ec6..ff5a21ec 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -451,6 +451,11 @@ export async function deriveUIMessagesFromDeltas( ): Promise { const messages: UIMessage[] = []; for (const streamMessage of streamMessages) { + if (streamMessage.format !== "UIMessageChunk") { + throw new Error( + `deriveUIMessagesFromDeltas: unsupported stream format "${streamMessage.format ?? "text"}" for stream ${streamMessage.streamId}`, + ); + } const { parts } = getParts( allDeltas.filter((d) => d.streamId === streamMessage.streamId), 0, diff --git a/src/react/useStreamingUIMessages.ts b/src/react/useStreamingUIMessages.ts index 3786d8fb..540b4f53 100644 --- a/src/react/useStreamingUIMessages.ts +++ b/src/react/useStreamingUIMessages.ts @@ -69,13 +69,12 @@ export function useStreamingUIMessages< let noNewDeltas = true; for (const stream of streams) { const existingStreamState = messageState[stream.streamMessage.streamId]; - const lastDelta = stream.deltas.at(-1); const cursor = existingStreamState?.cursor; if (existingStreamState === undefined || cursor === undefined) { noNewDeltas = false; break; } - if (lastDelta && lastDelta.start >= cursor) { + if (stream.deltas.some((d) => d.parts.length > 0 && d.end > cursor)) { noNewDeltas = false; break; }