Skip to content

fix(react): O(n²) lag streaming long tool-input deltas (#190)#270

Open
robelest wants to merge 4 commits into
get-convex:mainfrom
robelest:fix/issue-190-tool-input-streaming-on2
Open

fix(react): O(n²) lag streaming long tool-input deltas (#190)#270
robelest wants to merge 4 commits into
get-convex:mainfrom
robelest:fix/issue-190-tool-input-streaming-on2

Conversation

@robelest
Copy link
Copy Markdown
Collaborator

@robelest robelest commented May 22, 2026

Problem

Streaming a long tool-call argument (e.g. a 12k-char JSON) dropped useUIMessages / useThreadMessages to ≈1 FPS. Plain text at the same volume streamed smoothly. Fixes #190.

The hook replayed every accumulated delta through readUIMessageStream on each new chunk. fromCursor was hardcoded to 0 and the base was always blankUIMessage(). 1.2k chunks meant ≈720k stream-part processings: O(n²).

readUIMessageStream must start from a fresh UIMessage. Its mutable state (partialToolCalls, activeTextParts) lives inside the async generator, not in the UIMessage it returns. Feeding it only the new chunks throws tool-input-delta for missing tool call.

Fix

Split processing into two paths in useStreamingUIMessages:

  • First batch (cursor === 0): use readUIMessageStream, so the framing chunks (start, start-step, tool-input-start) initialize the UIMessage.
  • Later batches: a new sync applyUIMessageChunksIncremental mutates the existing UIMessage. No replay.

For tool-input-delta accumulation, JSON.parse with try/catch replaces parsePartialJson. parsePartialJson repair-parses partial JSON eagerly, turning toolPart.input into an object mid-stream. The next delta concatenates with that object and corrupts the accumulator ("[object Object]" + nextDelta). JSON.parse throws on incomplete input and returns only when the JSON is complete.

Other fixes:

  • Early-exit detects status-only transitions (streaming to finished/aborted). A stream can complete without new delta parts.
  • The TextStreamPart path had the same O(n²) bug. It passed [] to deriveUIMessagesFromTextStreamParts; now it forwards prior state.
  • updateFromUIMessageChunks early-returns on empty parts. The for-await loop never ran, so joinText then mutated the caller's message.
  • transitionToolPart<S> typechecks tool-part state changes. updates is Partial<Extract<ToolPart, { state: S }>>, so typos and missing or wrong-typed fields fail at compile time.

Numbers

Headless benchmark from https://github.com/brandon-julio-t/agent-190-repro:

Workload Before After
4k chars / 409 chunks 1,341 ms 34 ms
12k chars / 1,223 chunks ≈21,000 ms 73 ms

268 tests pass.

Follow-up

applyUIMessageChunksIncremental opens with structuredClone(uiMessage), which is O(message-size) per call. The benchmark hides this because structuredClone is fast on these payloads, but total work is still O(n²) for very large streams (100k+ chars). The fix is structural sharing or clone-on-write per handler. Will land separately.

useStreamingUIMessages was replaying the entire accumulated delta
history through readUIMessageStream on every incoming chunk —
fromCursor was hardcoded to 0 and the base UIMessage was always
blankUIMessage(). For a 12k-char tool argument (~1.2k chunks) that
is ~720,000 stream-part processings, dropping the UI to ~1 FPS.

readUIMessageStream cannot resume incrementally from an existing
UIMessage — it relies on internal mutable state (partialToolCalls,
activeTextParts) that lives inside the async generator and is not
stored in the UIMessage it returns. Attempting to feed only the new
chunks throws "tool-input-delta for missing tool call".

The fix splits processing into two paths:

  - First batch (cursor === 0): still uses readUIMessageStream so the
    framing chunks (start, start-step, tool-input-start) initialize
    a correctly shaped UIMessage.
  - Subsequent batches (cursor > 0): use a new sync function
    applyUIMessageChunksIncremental that applies new chunks directly
    onto the existing UIMessage without replay.

applyUIMessageChunksIncremental mirrors what processUIMessageStream
does for each chunk type but works against a passed-in message. For
tool-input-delta accumulation it uses JSON.parse with try/catch
rather than the SDK's parsePartialJson — JSON.parse is sync, throws
on incomplete JSON, and only returns on complete JSON. That matches
the "successful-parse" semantic we want and avoids parsePartialJson's
"repaired-parse" foot-gun where a partial parse shadows the raw
accumulator and corrupts subsequent deltas via
"[object Object]" + nextDelta.

Two related fixes bundled in:

  - The early-exit cursor check now also detects status-only
    transitions (streaming -> finished/aborted), since a stream can
    finish without emitting more delta parts.
  - The TextStreamPart path had the same O(n²) bug — it passed an
    empty existingStreams array to deriveUIMessagesFromTextStreamParts.
    Now it forwards the prior stream state so that path is also O(n).

Tooling:

  - updateFromUIMessageChunks early-returns when parts is empty (the
    for-await loop never ran, so the function then mutated the
    caller's message via joinText).
  - transitionToolPart<S> helper provides a single typed mutation
    point for tool-part state changes. The updates argument is
    Partial<Extract<ToolPart, { state: S }>> so typos in state names,
    missing fields, or wrong field types fail at compile time.

Verification (agent-190-repro hook benchmark):

              4k chars / 409 chunks    12k chars / 1223 chunks
  Before      1,341 ms                 ~21,000 ms
  After          34 ms                       73 ms

268 unit tests pass, build clean.

Closes get-convex#190
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 22, 2026

Review Change Stack

Warning

Review limit reached

@robelest, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 11 minutes and 25 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: c0059ad3-68ac-4bcb-a693-195cd560a5ca

📥 Commits

Reviewing files that changed from the base of the PR and between e77ec40 and ff22c69.

📒 Files selected for processing (3)
  • src/deltas.test.ts
  • src/deltas.ts
  • src/react/useStreamingUIMessages.ts
📝 Walkthrough

Walkthrough

Adds an incremental streaming API (IncrementalStreamState, emptyIncrementalStreamState, applyUIMessageChunksIncremental) that applies UIMessageChunk batches onto an existing UIMessage without replaying prior chunks, tracks per-chunk active indices and per-tool input accumulation (with JSON parse when complete), and recomputes message text. useStreamingUIMessages now stores per-stream IncrementalStreamState, resumes from saved cursors, normalizes status changes, and uses the incremental applier when parts arrive. Tests validate incremental semantics and equivalence to full-stream processing.

Sequence Diagram

sequenceDiagram
  participant Hook as useStreamingUIMessages
  participant Stream as Stream.getParts
  participant Incremental as applyUIMessageChunksIncremental
  participant Parser as JSON.parse
  Hook->>Stream: getParts(fromCursor)
  alt no parts
    Hook->>Hook: maybe update status only
  else parts exist
    Hook->>Incremental: apply incremental parts + prevStreamState
    Incremental->>Parser: try parse accumulated tool-input text
    Parser-->>Incremental: parsed object or parse error
    Incremental->>Hook: return updated message + streamState
  end
Loading

Possibly related issues

Suggested reviewers

  • ianmacartney
🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title directly matches the main change: fixing O(n²) performance lag when streaming long tool-input deltas, which is the core objective of this PR.
Description check ✅ Passed The PR description thoroughly explains the problem, the fix, the specific changes made, and performance benchmarks, all directly related to the changeset.
Linked Issues check ✅ Passed The PR fully addresses issue #190 by eliminating O(n²) replay behavior, replacing parsePartialJson with proper JSON.parse, and enabling incremental processing for long tool-input deltas without lag.
Out of Scope Changes check ✅ Passed All changes directly support the O(n²) performance fix: applyUIMessageChunksIncremental incremental processing, status-only transition detection, prior-state forwarding, and transitionToolPart typing all address the linked issue.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Collapse the dual SDK/incremental processing into one path. The first
batch previously went through readUIMessageStream (parsePartialJson →
partial object input) while later batches used a hand-rolled incremental
function that stored the raw accumulator in `input` and parsed with strict
JSON.parse. The two diverged across the batch boundary, corrupting tool
input for long (multi-flush) streams — exactly the case this PR targets.

applyUIMessageChunksIncremental now drives every batch:
- persists ephemeral stream state (active text/reasoning indices, raw tool
  input text) that the UIMessage can't hold, so it resumes mid-part
- keeps the raw tool-input accumulator separate from `input` and uses
  parsePartialJson, matching the SDK's partial-object streaming
- handles file / message-metadata / data-* chunks in later batches
- tracks text/reasoning parts by chunk id
- matches the SDK on finish-step (clear active maps, don't force "done")
- leaves message status to the caller; warns on unknown chunk types

An equivalence test pins the incremental port to the SDK output.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/deltas.test.ts (1)

583-587: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

The “O(N) not O(N²)” assertion is not validating applyUIMessageChunksIncremental complexity.

totalPartsProcessed only counts getParts(...) output size, so this still passes even if applyUIMessageChunksIncremental does quadratic internal work. The test currently proves cursor slicing behavior, not algorithmic complexity of incremental application.

Please either (a) narrow the test name/assertion to cursor semantics, or (b) add instrumentation that measures work inside incremental apply.

Also applies to: 635-647

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/deltas.test.ts` around lines 583 - 587, The test's assertion name is
misleading because totalPartsProcessed only measures getParts(...) output, not
work done inside applyUIMessageChunksIncremental; either rename the test to
reflect cursor/chunk slicing semantics or add instrumentation to actually
measure internal work of applyUIMessageChunksIncremental: e.g., inject or
monkey-patch a lightweight counter/spy into the incremental function (or its
internal helper functions used to merge/append parts) to increment on each
processed part/iteration, run the test with N=500 and assert the counter grows
O(N) (≈N) rather than O(N²); reference applyUIMessageChunksIncremental,
totalPartsProcessed, and getParts so you can locate where to add the counter or
rename the test.
🧹 Nitpick comments (1)
src/deltas.ts (1)

278-292: ⚡ Quick win

Drop completed tool buffers from incremental state.

Once a tool leaves input-streaming, its raw JSON is no longer needed, but it stays in toolInputText and gets carried through every later batch. That keeps stale payloads alive on the exact hot path this PR is optimizing.

Suggested fix
       case "tool-input-available": {
         const toolPart = toolPartAt(part.toolCallId);
         if (toolPart) {
           transitionToolPart(toolPart, {
             state: "input-available",
             input: part.input,
             callProviderMetadata: mergeProviderMetadata(
               (toolPart as { callProviderMetadata?: ProviderMetadata })
                 .callProviderMetadata,
               part.providerMetadata,
             ),
           });
         }
         touchedTools.delete(part.toolCallId);
+        delete toolInputText[part.toolCallId];
         break;
       }
       case "tool-input-error": {
         const toolPart = toolPartAt(part.toolCallId);
         if (toolPart) {
           transitionToolPart(toolPart, {
             state: "output-error",
             errorText: part.errorText,
             providerExecuted: part.providerExecuted,
             ...(toolPart.type === "dynamic-tool"
               ? { input: part.input }
               : { input: undefined, rawInput: part.input }),
             callProviderMetadata: mergeProviderMetadata(
               (toolPart as { callProviderMetadata?: ProviderMetadata })
                 .callProviderMetadata,
               part.providerMetadata,
             ),
           });
         }
         touchedTools.delete(part.toolCallId);
+        delete toolInputText[part.toolCallId];
         break;
       }

Also applies to: 294-312

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/deltas.ts` around lines 278 - 292, The tool's raw JSON buffer is left in
incremental state (toolInputText) after a tool transitions out of
input-streaming, keeping stale payloads alive; in the "tool-input-available"
handling (use toolPartAt and transitionToolPart) remove the tool's incremental
buffer for part.toolCallId from toolInputText (and likewise in the later
"tool-completed"/related cases around 294-312) immediately after transitioning
the part so the raw input is dropped and not carried into later batches.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/deltas.test.ts`:
- Around line 797-825: The test currently expects partial JSON to be parsed
mid-stream (old parsePartialJson behavior); change it to assert raw accumulation
after batch A and only assert a structured object after batch B completes valid
JSON. Specifically, in the "accumulates tool input..." case that uses
blankUIMessage, emptyIncrementalStreamState and applyUIMessageChunksIncremental,
replace the assertion on afterA?.input to expect the raw accumulated string
(e.g. '{"a":1') and move/keep the structured expect ({ a: 1 }) to after the
subsequent batch completion where applyUIMessageChunksIncremental receives the
remainder and the streamState yields a complete parse; apply the same change to
the similar assertion around lines 837-839.

---

Outside diff comments:
In `@src/deltas.test.ts`:
- Around line 583-587: The test's assertion name is misleading because
totalPartsProcessed only measures getParts(...) output, not work done inside
applyUIMessageChunksIncremental; either rename the test to reflect cursor/chunk
slicing semantics or add instrumentation to actually measure internal work of
applyUIMessageChunksIncremental: e.g., inject or monkey-patch a lightweight
counter/spy into the incremental function (or its internal helper functions used
to merge/append parts) to increment on each processed part/iteration, run the
test with N=500 and assert the counter grows O(N) (≈N) rather than O(N²);
reference applyUIMessageChunksIncremental, totalPartsProcessed, and getParts so
you can locate where to add the counter or rename the test.

---

Nitpick comments:
In `@src/deltas.ts`:
- Around line 278-292: The tool's raw JSON buffer is left in incremental state
(toolInputText) after a tool transitions out of input-streaming, keeping stale
payloads alive; in the "tool-input-available" handling (use toolPartAt and
transitionToolPart) remove the tool's incremental buffer for part.toolCallId
from toolInputText (and likewise in the later "tool-completed"/related cases
around 294-312) immediately after transitioning the part so the raw input is
dropped and not carried into later batches.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 47191215-a510-470c-ba0e-c80f0ac2b585

📥 Commits

Reviewing files that changed from the base of the PR and between ba2adc5 and c7929d8.

📒 Files selected for processing (3)
  • src/deltas.test.ts
  • src/deltas.ts
  • src/react/useStreamingUIMessages.ts

Comment thread src/deltas.test.ts Outdated
Comment on lines -96 to -97
// Unfortunately this can't handle resuming from a UIMessage and
// adding more chunks, so we re-create it from scratch each time.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what changed here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!cursor is falsy for cursor === 0, so a stream in its first batch — state initialized, cursor still at 0 — would reset unnecessarily. The explicit check only resets when there is no prior state at all. cursor === 0 falls through to the inner fromCursor === 0 branch, which routes to updateFromUIMessageChunks.

robelest added 2 commits May 27, 2026 13:39
…test)

- deltas.ts: drop `toolInputText[id]` from incremental state when a tool
  transitions to input-available / tool-input-error. The raw JSON is no longer
  needed and was being carried through every later batch on the hot path.
- deltas.test.ts: rename the "O(N) not O(N²)" test to reflect what it actually
  proves (cursor slicing — each part handed to applyUIMessageChunksIncremental
  exactly once); document that the algorithmic claim is proven by the PR's
  21,000 ms → 73 ms benchmark, not by the unit test.

Skipped CodeRabbit's third nit ("parse-on-complete" assertion for the
partial-input test): that would contradict the deliberate partial-JSON parse
during streaming (deltas.ts:435-443), which mirrors the AI SDK's streamObject
behavior and is documented inline.
parsePartialJson was the only await — replace with JSON.parse + try/catch.
JSON.parse only materializes input when JSON is complete; parsePartialJson
repair-parsed partial JSON eagerly, which could corrupt the accumulator
if input was used before parsing finished.

Fixes the test asserting old eager-parse behavior and removes the
parsePartialJson import that was added but not needed.
@robelest robelest force-pushed the fix/issue-190-tool-input-streaming-on2 branch from e77ec40 to ff22c69 Compare May 27, 2026 17:48
@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new Bot commented May 27, 2026

Open in StackBlitz

npm i https://pkg.pr.new/@convex-dev/agent@270

commit: ff22c69

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

lag when streaming long tool-input deltas (useUIMessages / useThreadMessages)

2 participants