chore(deltas): delete legacy TextStreamPart streaming path#275
chore(deltas): delete legacy TextStreamPart streaming path#275robelest wants to merge 6 commits into
Conversation
useStreamingUIMessages was replaying the entire accumulated delta
history through readUIMessageStream on every incoming chunk —
fromCursor was hardcoded to 0 and the base UIMessage was always
blankUIMessage(). For a 12k-char tool argument (~1.2k chunks) that
is ~720,000 stream-part processings, dropping the UI to ~1 FPS.
readUIMessageStream cannot resume incrementally from an existing
UIMessage — it relies on internal mutable state (partialToolCalls,
activeTextParts) that lives inside the async generator and is not
stored in the UIMessage it returns. Attempting to feed only the new
chunks throws "tool-input-delta for missing tool call".
The fix splits processing into two paths:
- First batch (cursor === 0): still uses readUIMessageStream so the
framing chunks (start, start-step, tool-input-start) initialize
a correctly shaped UIMessage.
- Subsequent batches (cursor > 0): use a new sync function
applyUIMessageChunksIncremental that applies new chunks directly
onto the existing UIMessage without replay.
applyUIMessageChunksIncremental mirrors what processUIMessageStream
does for each chunk type but works against a passed-in message. For
tool-input-delta accumulation it uses JSON.parse with try/catch
rather than the SDK's parsePartialJson — JSON.parse is sync, throws
on incomplete JSON, and only returns on complete JSON. That matches
the "successful-parse" semantic we want and avoids parsePartialJson's
"repaired-parse" foot-gun where a partial parse shadows the raw
accumulator and corrupts subsequent deltas via
"[object Object]" + nextDelta.
Two related fixes bundled in:
- The early-exit cursor check now also detects status-only
transitions (streaming -> finished/aborted), since a stream can
finish without emitting more delta parts.
- The TextStreamPart path had the same O(n²) bug — it passed an
empty existingStreams array to deriveUIMessagesFromTextStreamParts.
Now it forwards the prior stream state so that path is also O(n).
Tooling:
- updateFromUIMessageChunks early-returns when parts is empty (the
for-await loop never ran, so the function then mutated the
caller's message via joinText).
- transitionToolPart<S> helper provides a single typed mutation
point for tool-part state changes. The updates argument is
Partial<Extract<ToolPart, { state: S }>> so typos in state names,
missing fields, or wrong field types fail at compile time.
Verification (agent-190-repro hook benchmark):
4k chars / 409 chunks 12k chars / 1223 chunks
Before 1,341 ms ~21,000 ms
After 34 ms 73 ms
268 unit tests pass, build clean.
Closes get-convex#190
Collapse the dual SDK/incremental processing into one path. The first batch previously went through readUIMessageStream (parsePartialJson → partial object input) while later batches used a hand-rolled incremental function that stored the raw accumulator in `input` and parsed with strict JSON.parse. The two diverged across the batch boundary, corrupting tool input for long (multi-flush) streams — exactly the case this PR targets. applyUIMessageChunksIncremental now drives every batch: - persists ephemeral stream state (active text/reasoning indices, raw tool input text) that the UIMessage can't hold, so it resumes mid-part - keeps the raw tool-input accumulator separate from `input` and uses parsePartialJson, matching the SDK's partial-object streaming - handles file / message-metadata / data-* chunks in later batches - tracks text/reasoning parts by chunk id - matches the SDK on finish-step (clear active maps, don't force "done") - leaves message status to the caller; warns on unknown chunk types An equivalence test pins the incremental port to the SDK output.
|
Warning Review limit reached
More reviews will be available in 11 minutes and 25 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Repository: get-convex/coderabbit/.coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
📝 WalkthroughWalkthroughThis PR replaces full-rebuild streaming logic with cursor-based incremental state tracking. A new Suggested reviewers
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/deltas.test.ts (1)
203-203: ⚡ Quick winRemove unnecessary
asyncfrom tests that don't await.These tests only call the synchronous
applyUIMessageChunksIncrementalfunction and don't useawait, so theasynckeyword is unnecessary.🧹 Proposed cleanup
- it("incremental apply only consumes parts past the cursor (no re-processing)", async () => { + it("incremental apply only consumes parts past the cursor (no re-processing)", () => {- it("applyUIMessageChunksIncremental: text-delta accumulation across calls", async () => { + it("applyUIMessageChunksIncremental: text-delta accumulation across calls", () => {- it("applyUIMessageChunksIncremental: tool-output-available preserves input and sets fields", async () => { + it("applyUIMessageChunksIncremental: tool-output-available preserves input and sets fields", () => {- it("applyUIMessageChunksIncremental: tool-input-error sets rawInput and clears input for static tools", async () => { + it("applyUIMessageChunksIncremental: tool-input-error sets rawInput and clears input for static tools", () => {- it("accumulates tool input across a batch boundary", async () => { + it("accumulates tool input across a batch boundary", () => {- it("pushes file parts and merges message metadata in later batches", async () => { + it("pushes file parts and merges message metadata in later batches", () => {- it("tracks concurrent text parts by id across batches", async () => { + it("tracks concurrent text parts by id across batches", () => {Also applies to: 281-281, 324-324, 374-374, 421-421, 466-466, 506-506
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/deltas.test.ts` at line 203, The test cases that call the synchronous function applyUIMessageChunksIncremental (e.g., the "incremental apply only consumes parts past the cursor (no re-processing)" it block and the other mentioned it blocks with matching descriptions) incorrectly include the async keyword despite not awaiting anything; remove the unnecessary async modifier from the test function declarations (e.g., change it("...", async () => { ... }) to it("...", () => { ... })) for applyUIMessageChunksIncremental tests so they are plain synchronous tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/deltas.ts`:
- Around line 453-461: deriveUIMessagesFromDeltas currently assumes every
StreamMessage is a UIMessageChunk and unconditionally casts through
getParts<UIMessageChunk> → updateFromUIMessageChunks; add a guard to fail fast:
inside the loop over streamMessages check streamMessage.format ===
"UIMessageChunk" (or equivalent discriminator) before calling
getParts/updateFromUIMessageChunks and if not, throw a clear error (or
return/skip) indicating unsupported stream format for
deriveUIMessagesFromDeltas; alternatively, narrow the function signature to only
accept StreamMessage<"UIMessageChunk"> so callers must pass the correct type.
Ensure references to getParts, updateFromUIMessageChunks, blankUIMessage and
deriveUIMessagesFromDeltas are updated accordingly.
In `@src/react/useStreamingUIMessages.ts`:
- Around line 72-80: The fast-path incorrectly assumes stream.deltas.at(-1) is
the newest delta (in useStreamingUIMessages.ts): instead of taking lastDelta,
scan stream.deltas for any delta with content newer than the saved cursor (e.g.,
check delta.parts.length > 0 && delta.end > cursor) and set noNewDeltas = false
only if none match; update the checks that reference lastDelta and the cursor to
use this scan so unsorted incoming deltas aren't ignored.
---
Nitpick comments:
In `@src/deltas.test.ts`:
- Line 203: The test cases that call the synchronous function
applyUIMessageChunksIncremental (e.g., the "incremental apply only consumes
parts past the cursor (no re-processing)" it block and the other mentioned it
blocks with matching descriptions) incorrectly include the async keyword despite
not awaiting anything; remove the unnecessary async modifier from the test
function declarations (e.g., change it("...", async () => { ... }) to it("...",
() => { ... })) for applyUIMessageChunksIncremental tests so they are plain
synchronous tests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ea1e7f94-a61a-4e3b-8507-7e93a6608335
📒 Files selected for processing (4)
src/client/streaming.integration.test.tssrc/deltas.test.tssrc/deltas.tssrc/react/useStreamingUIMessages.ts
💤 Files with no reviewable changes (1)
- src/client/streaming.integration.test.ts
…test)
- deltas.ts: drop `toolInputText[id]` from incremental state when a tool
transitions to input-available / tool-input-error. The raw JSON is no longer
needed and was being carried through every later batch on the hot path.
- deltas.test.ts: rename the "O(N) not O(N²)" test to reflect what it actually
proves (cursor slicing — each part handed to applyUIMessageChunksIncremental
exactly once); document that the algorithmic claim is proven by the PR's
21,000 ms → 73 ms benchmark, not by the unit test.
Skipped CodeRabbit's third nit ("parse-on-complete" assertion for the
partial-input test): that would contradict the deliberate partial-JSON parse
during streaming (deltas.ts:435-443), which mirrors the AI SDK's streamObject
behavior and is documented inline.
parsePartialJson was the only await — replace with JSON.parse + try/catch. JSON.parse only materializes input when JSON is complete; parsePartialJson repair-parsed partial JSON eagerly, which could corrupt the accumulator if input was used before parsing finished. Fixes the test asserting old eager-parse behavior and removes the parsePartialJson import that was added but not needed.
All agent streams now write UIMessageChunk. The text-format path (updateFromTextStreamParts, deriveUIMessagesFromTextStreamParts) has been dead code since streaming.ts switched to DeltaStreamer<UIMessageChunk>. Removes ~900 lines including tests.
- deriveUIMessagesFromDeltas: add format guard to fail fast on non-UIMessageChunk streams rather than silently processing them through the wrong path - useStreamingUIMessages: replace .at(-1) fast-path with delta.end > cursor scan so unsorted delta arrays don't cause spurious noNewDeltas early returns - deltas.test.ts: drop unnecessary async from tests that don't await - streaming.integration.test.ts: add format: UIMessageChunk to StreamMessage fixtures used by deriveUIMessagesFromDeltas status-mapping test
641c83c to
995691a
Compare
commit: |
| await Promise.all( | ||
| streams.map(async ({ deltas, streamMessage }) => { | ||
| const { parts, cursor } = getParts<UIMessageChunk>(deltas, 0); | ||
| if (streamMessage.format === "UIMessageChunk") { |
There was a problem hiding this comment.
I think we should still do some check for this. the intent was to be able to have multiple formats supported - we should at least throw if we see a format we don't understand / if they were using code that was using the simpler text stream
Stacks on #270.
Problem
deltas.tscarries a ~400-line TextStreamPart state machine (updateFromTextStreamParts,deriveUIMessagesFromTextStreamParts) that predates the UIMessageChunk format — the inline comment onupdateFromTextStreamPartsnames it: "historically from when we would use the onChunk callback."All new agent-generated streams write
format: "UIMessageChunk":streamText.ts→DeltaStreamer<UIMessageChunk>streamObject.ts→DeltaStreamer<UIMessageChunk>engine.ts→DeltaStreamer<UIMessageChunk>The
Streamprimitive writesformat: "text"but is consumed byuseStream/getStreamBody, never throughuseStreamingUIMessages. The text path inuseStreamingUIMessagesis dead for all new writes.Fix
Delete
updateFromTextStreamParts,deriveUIMessagesFromTextStreamParts, and theformat !== "UIMessageChunk"branch inuseStreamingUIMessages. Net: ~-900 lines including tests.Note on upcoming AI SDK v7
When this repo upgrades to AI SDK v7, a follow-up PR will also clean up:
previousResponseMessageCountwatermark fromserializeNewMessagesInStep/agent.saveStep— v7 makesstep.response.messagesper-step, not cumulative (the watermark is correct but redundant in v7)stepCountIs→isStepCount(user-facing export)result.request→result.finalStep.requestinstreamObject.tssystem:→instructions:ingenerateText/streamTextcallsneedsApproval→ call-leveltoolApprovalincreateTool.ts