fix(react): O(n²) lag streaming long tool-input deltas (#190)#270
fix(react): O(n²) lag streaming long tool-input deltas (#190)#270robelest wants to merge 4 commits into
Conversation
useStreamingUIMessages was replaying the entire accumulated delta
history through readUIMessageStream on every incoming chunk —
fromCursor was hardcoded to 0 and the base UIMessage was always
blankUIMessage(). For a 12k-char tool argument (~1.2k chunks) that
is ~720,000 stream-part processings, dropping the UI to ~1 FPS.
readUIMessageStream cannot resume incrementally from an existing
UIMessage — it relies on internal mutable state (partialToolCalls,
activeTextParts) that lives inside the async generator and is not
stored in the UIMessage it returns. Attempting to feed only the new
chunks throws "tool-input-delta for missing tool call".
The fix splits processing into two paths:
- First batch (cursor === 0): still uses readUIMessageStream so the
framing chunks (start, start-step, tool-input-start) initialize
a correctly shaped UIMessage.
- Subsequent batches (cursor > 0): use a new sync function
applyUIMessageChunksIncremental that applies new chunks directly
onto the existing UIMessage without replay.
applyUIMessageChunksIncremental mirrors what processUIMessageStream
does for each chunk type but works against a passed-in message. For
tool-input-delta accumulation it uses JSON.parse with try/catch
rather than the SDK's parsePartialJson — JSON.parse is sync, throws
on incomplete JSON, and only returns on complete JSON. That matches
the "successful-parse" semantic we want and avoids parsePartialJson's
"repaired-parse" foot-gun where a partial parse shadows the raw
accumulator and corrupts subsequent deltas via
"[object Object]" + nextDelta.
Two related fixes bundled in:
- The early-exit cursor check now also detects status-only
transitions (streaming -> finished/aborted), since a stream can
finish without emitting more delta parts.
- The TextStreamPart path had the same O(n²) bug — it passed an
empty existingStreams array to deriveUIMessagesFromTextStreamParts.
Now it forwards the prior stream state so that path is also O(n).
Tooling:
- updateFromUIMessageChunks early-returns when parts is empty (the
for-await loop never ran, so the function then mutated the
caller's message via joinText).
- transitionToolPart<S> helper provides a single typed mutation
point for tool-part state changes. The updates argument is
Partial<Extract<ToolPart, { state: S }>> so typos in state names,
missing fields, or wrong field types fail at compile time.
Verification (agent-190-repro hook benchmark):
4k chars / 409 chunks 12k chars / 1223 chunks
Before 1,341 ms ~21,000 ms
After 34 ms 73 ms
268 unit tests pass, build clean.
Closes get-convex#190
|
Warning Review limit reached
More reviews will be available in 11 minutes and 25 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Repository: get-convex/coderabbit/.coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthroughAdds an incremental streaming API (IncrementalStreamState, emptyIncrementalStreamState, applyUIMessageChunksIncremental) that applies UIMessageChunk batches onto an existing UIMessage without replaying prior chunks, tracks per-chunk active indices and per-tool input accumulation (with JSON parse when complete), and recomputes message text. useStreamingUIMessages now stores per-stream IncrementalStreamState, resumes from saved cursors, normalizes status changes, and uses the incremental applier when parts arrive. Tests validate incremental semantics and equivalence to full-stream processing. Sequence DiagramsequenceDiagram
participant Hook as useStreamingUIMessages
participant Stream as Stream.getParts
participant Incremental as applyUIMessageChunksIncremental
participant Parser as JSON.parse
Hook->>Stream: getParts(fromCursor)
alt no parts
Hook->>Hook: maybe update status only
else parts exist
Hook->>Incremental: apply incremental parts + prevStreamState
Incremental->>Parser: try parse accumulated tool-input text
Parser-->>Incremental: parsed object or parse error
Incremental->>Hook: return updated message + streamState
end
Possibly related issues
Suggested reviewers
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Collapse the dual SDK/incremental processing into one path. The first batch previously went through readUIMessageStream (parsePartialJson → partial object input) while later batches used a hand-rolled incremental function that stored the raw accumulator in `input` and parsed with strict JSON.parse. The two diverged across the batch boundary, corrupting tool input for long (multi-flush) streams — exactly the case this PR targets. applyUIMessageChunksIncremental now drives every batch: - persists ephemeral stream state (active text/reasoning indices, raw tool input text) that the UIMessage can't hold, so it resumes mid-part - keeps the raw tool-input accumulator separate from `input` and uses parsePartialJson, matching the SDK's partial-object streaming - handles file / message-metadata / data-* chunks in later batches - tracks text/reasoning parts by chunk id - matches the SDK on finish-step (clear active maps, don't force "done") - leaves message status to the caller; warns on unknown chunk types An equivalence test pins the incremental port to the SDK output.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/deltas.test.ts (1)
583-587:⚠️ Potential issue | 🟠 Major | ⚡ Quick winThe “O(N) not O(N²)” assertion is not validating
applyUIMessageChunksIncrementalcomplexity.
totalPartsProcessedonly countsgetParts(...)output size, so this still passes even ifapplyUIMessageChunksIncrementaldoes quadratic internal work. The test currently proves cursor slicing behavior, not algorithmic complexity of incremental application.Please either (a) narrow the test name/assertion to cursor semantics, or (b) add instrumentation that measures work inside incremental apply.
Also applies to: 635-647
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/deltas.test.ts` around lines 583 - 587, The test's assertion name is misleading because totalPartsProcessed only measures getParts(...) output, not work done inside applyUIMessageChunksIncremental; either rename the test to reflect cursor/chunk slicing semantics or add instrumentation to actually measure internal work of applyUIMessageChunksIncremental: e.g., inject or monkey-patch a lightweight counter/spy into the incremental function (or its internal helper functions used to merge/append parts) to increment on each processed part/iteration, run the test with N=500 and assert the counter grows O(N) (≈N) rather than O(N²); reference applyUIMessageChunksIncremental, totalPartsProcessed, and getParts so you can locate where to add the counter or rename the test.
🧹 Nitpick comments (1)
src/deltas.ts (1)
278-292: ⚡ Quick winDrop completed tool buffers from incremental state.
Once a tool leaves
input-streaming, its raw JSON is no longer needed, but it stays intoolInputTextand gets carried through every later batch. That keeps stale payloads alive on the exact hot path this PR is optimizing.Suggested fix
case "tool-input-available": { const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "input-available", input: part.input, callProviderMetadata: mergeProviderMetadata( (toolPart as { callProviderMetadata?: ProviderMetadata }) .callProviderMetadata, part.providerMetadata, ), }); } touchedTools.delete(part.toolCallId); + delete toolInputText[part.toolCallId]; break; } case "tool-input-error": { const toolPart = toolPartAt(part.toolCallId); if (toolPart) { transitionToolPart(toolPart, { state: "output-error", errorText: part.errorText, providerExecuted: part.providerExecuted, ...(toolPart.type === "dynamic-tool" ? { input: part.input } : { input: undefined, rawInput: part.input }), callProviderMetadata: mergeProviderMetadata( (toolPart as { callProviderMetadata?: ProviderMetadata }) .callProviderMetadata, part.providerMetadata, ), }); } touchedTools.delete(part.toolCallId); + delete toolInputText[part.toolCallId]; break; }Also applies to: 294-312
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/deltas.ts` around lines 278 - 292, The tool's raw JSON buffer is left in incremental state (toolInputText) after a tool transitions out of input-streaming, keeping stale payloads alive; in the "tool-input-available" handling (use toolPartAt and transitionToolPart) remove the tool's incremental buffer for part.toolCallId from toolInputText (and likewise in the later "tool-completed"/related cases around 294-312) immediately after transitioning the part so the raw input is dropped and not carried into later batches.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/deltas.test.ts`:
- Around line 797-825: The test currently expects partial JSON to be parsed
mid-stream (old parsePartialJson behavior); change it to assert raw accumulation
after batch A and only assert a structured object after batch B completes valid
JSON. Specifically, in the "accumulates tool input..." case that uses
blankUIMessage, emptyIncrementalStreamState and applyUIMessageChunksIncremental,
replace the assertion on afterA?.input to expect the raw accumulated string
(e.g. '{"a":1') and move/keep the structured expect ({ a: 1 }) to after the
subsequent batch completion where applyUIMessageChunksIncremental receives the
remainder and the streamState yields a complete parse; apply the same change to
the similar assertion around lines 837-839.
---
Outside diff comments:
In `@src/deltas.test.ts`:
- Around line 583-587: The test's assertion name is misleading because
totalPartsProcessed only measures getParts(...) output, not work done inside
applyUIMessageChunksIncremental; either rename the test to reflect cursor/chunk
slicing semantics or add instrumentation to actually measure internal work of
applyUIMessageChunksIncremental: e.g., inject or monkey-patch a lightweight
counter/spy into the incremental function (or its internal helper functions used
to merge/append parts) to increment on each processed part/iteration, run the
test with N=500 and assert the counter grows O(N) (≈N) rather than O(N²);
reference applyUIMessageChunksIncremental, totalPartsProcessed, and getParts so
you can locate where to add the counter or rename the test.
---
Nitpick comments:
In `@src/deltas.ts`:
- Around line 278-292: The tool's raw JSON buffer is left in incremental state
(toolInputText) after a tool transitions out of input-streaming, keeping stale
payloads alive; in the "tool-input-available" handling (use toolPartAt and
transitionToolPart) remove the tool's incremental buffer for part.toolCallId
from toolInputText (and likewise in the later "tool-completed"/related cases
around 294-312) immediately after transitioning the part so the raw input is
dropped and not carried into later batches.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 47191215-a510-470c-ba0e-c80f0ac2b585
📒 Files selected for processing (3)
src/deltas.test.tssrc/deltas.tssrc/react/useStreamingUIMessages.ts
| // Unfortunately this can't handle resuming from a UIMessage and | ||
| // adding more chunks, so we re-create it from scratch each time. |
There was a problem hiding this comment.
!cursor is falsy for cursor === 0, so a stream in its first batch — state initialized, cursor still at 0 — would reset unnecessarily. The explicit check only resets when there is no prior state at all. cursor === 0 falls through to the inner fromCursor === 0 branch, which routes to updateFromUIMessageChunks.
…test)
- deltas.ts: drop `toolInputText[id]` from incremental state when a tool
transitions to input-available / tool-input-error. The raw JSON is no longer
needed and was being carried through every later batch on the hot path.
- deltas.test.ts: rename the "O(N) not O(N²)" test to reflect what it actually
proves (cursor slicing — each part handed to applyUIMessageChunksIncremental
exactly once); document that the algorithmic claim is proven by the PR's
21,000 ms → 73 ms benchmark, not by the unit test.
Skipped CodeRabbit's third nit ("parse-on-complete" assertion for the
partial-input test): that would contradict the deliberate partial-JSON parse
during streaming (deltas.ts:435-443), which mirrors the AI SDK's streamObject
behavior and is documented inline.
parsePartialJson was the only await — replace with JSON.parse + try/catch. JSON.parse only materializes input when JSON is complete; parsePartialJson repair-parsed partial JSON eagerly, which could corrupt the accumulator if input was used before parsing finished. Fixes the test asserting old eager-parse behavior and removes the parsePartialJson import that was added but not needed.
e77ec40 to
ff22c69
Compare
commit: |
Problem
Streaming a long tool-call argument (e.g. a 12k-char JSON) dropped
useUIMessages/useThreadMessagesto ≈1 FPS. Plain text at the same volume streamed smoothly. Fixes #190.The hook replayed every accumulated delta through
readUIMessageStreamon each new chunk.fromCursorwas hardcoded to0and the base was alwaysblankUIMessage(). 1.2k chunks meant ≈720k stream-part processings: O(n²).readUIMessageStreammust start from a freshUIMessage. Its mutable state (partialToolCalls,activeTextParts) lives inside the async generator, not in theUIMessageit returns. Feeding it only the new chunks throwstool-input-delta for missing tool call.Fix
Split processing into two paths in
useStreamingUIMessages:cursor === 0): usereadUIMessageStream, so the framing chunks (start,start-step,tool-input-start) initialize theUIMessage.applyUIMessageChunksIncrementalmutates the existingUIMessage. No replay.For
tool-input-deltaaccumulation,JSON.parsewith try/catch replacesparsePartialJson.parsePartialJsonrepair-parses partial JSON eagerly, turningtoolPart.inputinto an object mid-stream. The next delta concatenates with that object and corrupts the accumulator ("[object Object]" + nextDelta).JSON.parsethrows on incomplete input and returns only when the JSON is complete.Other fixes:
streamingtofinished/aborted). A stream can complete without new delta parts.TextStreamPartpath had the same O(n²) bug. It passed[]toderiveUIMessagesFromTextStreamParts; now it forwards prior state.updateFromUIMessageChunksearly-returns on emptyparts. Thefor-awaitloop never ran, sojoinTextthen mutated the caller's message.transitionToolPart<S>typechecks tool-part state changes.updatesisPartial<Extract<ToolPart, { state: S }>>, so typos and missing or wrong-typed fields fail at compile time.Numbers
Headless benchmark from https://github.com/brandon-julio-t/agent-190-repro:
268 tests pass.
Follow-up
applyUIMessageChunksIncrementalopens withstructuredClone(uiMessage), which is O(message-size) per call. The benchmark hides this becausestructuredCloneis fast on these payloads, but total work is still O(n²) for very large streams (100k+ chars). The fix is structural sharing or clone-on-write per handler. Will land separately.