chore(deltas): delete legacy TextStreamPart streaming path#275

Open
robelest wants to merge 6 commits into get-convex:main from robelest:fix/drop-text-stream-format

@robelest
Collaborator

Stacks on #270.

Problem

deltas.ts carries a ~400-line TextStreamPart state machine (updateFromTextStreamParts, deriveUIMessagesFromTextStreamParts) that predates the UIMessageChunk format — the inline comment on updateFromTextStreamParts names it: "historically from when we would use the onChunk callback."

All new agent-generated streams write format: "UIMessageChunk":

  • streamText.ts → DeltaStreamer<UIMessageChunk>
  • streamObject.ts → DeltaStreamer<UIMessageChunk>
  • engine.ts → DeltaStreamer<UIMessageChunk>

The Stream primitive writes format: "text" but is consumed by useStream / getStreamBody, never through useStreamingUIMessages. The text path in useStreamingUIMessages is dead for all new writes.

Fix

Delete updateFromTextStreamParts, deriveUIMessagesFromTextStreamParts, and the format !== "UIMessageChunk" branch in useStreamingUIMessages. Net: ~-900 lines including tests.

Note on upcoming AI SDK v7

When this repo upgrades to AI SDK v7, a follow-up PR will also clean up:

  • Remove the previousResponseMessageCount watermark from serializeNewMessagesInStep / agent.saveStep — v7 makes step.response.messages per-step, not cumulative (the watermark is correct but redundant in v7)
  • Rename stepCountIs → isStepCount (user-facing export)
  • Update result.request → result.finalStep.request in streamObject.ts
  • Rename system: → instructions: in generateText/streamText calls
  • Migrate needsApproval → call-level toolApproval in createTool.ts

robelest added 2 commits May 22, 2026 13:53
useStreamingUIMessages was replaying the entire accumulated delta
history through readUIMessageStream on every incoming chunk —
fromCursor was hardcoded to 0 and the base UIMessage was always
blankUIMessage(). For a 12k-char tool argument (~1.2k chunks) that
is ~720,000 stream-part processings, dropping the UI to ~1 FPS.
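
The ~720,000 figure is consistent with a simple triangular sum. The sketch below is an illustration, not code from this PR. It assumes one chunk arrives per update and that each update replays every part received so far; `replayCost` and `incrementalCost` are hypothetical names.

```typescript
// Count stream-part processings when every incoming chunk triggers a full
// replay from cursor 0 (assumption: exactly one new chunk per update).
function replayCost(chunkCount: number): number {
  let processed = 0;
  for (let received = 1; received <= chunkCount; received++) {
    processed += received; // replays all parts received so far
  }
  return processed;
}

// Count processings when each part is handled exactly once.
function incrementalCost(chunkCount: number): number {
  return chunkCount;
}

console.log(replayCost(1200), incrementalCost(1200)); // prints "720600 1200"
```

In practice a delta may carry several chunks, so the real count depends on batching; the quadratic growth is what matters here.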

readUIMessageStream cannot resume incrementally from an existing
UIMessage — it relies on internal mutable state (partialToolCalls,
activeTextParts) that lives inside the async generator and is not
stored in the UIMessage it returns. Attempting to feed only the new
chunks throws "tool-input-delta for missing tool call".

The fix splits processing into two paths:

  - First batch (cursor === 0): still uses readUIMessageStream so the
    framing chunks (start, start-step, tool-input-start) initialize
    a correctly shaped UIMessage.
  - Subsequent batches (cursor > 0): use a new sync function
    applyUIMessageChunksIncremental that applies new chunks directly
    onto the existing UIMessage without replay.

applyUIMessageChunksIncremental mirrors what processUIMessageStream
does for each chunk type but works against a passed-in message. For
tool-input-delta accumulation it uses JSON.parse with try/catch
rather than the SDK's parsePartialJson — JSON.parse is sync, throws
on incomplete JSON, and only returns on complete JSON. That matches
the "successful-parse" semantic we want and avoids parsePartialJson's
"repaired-parse" foot-gun where a partial parse shadows the raw
accumulator and corrupts subsequent deltas via
"[object Object]" + nextDelta.

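The corruption described above can be sketched as follows. This is a simplified illustration, not the code in deltas.ts: `repairParseStandIn` is a hypothetical stand-in for a repairing parser (it is not the SDK's parsePartialJson), and `ToolInputState`, `buggyAppend`, and `safeAppend` are assumed names.

```typescript
// Hypothetical stand-in for a "repairing" parser: it always returns a value,
// even when the JSON text is incomplete.
function repairParseStandIn(text: string): unknown {
  try {
    return JSON.parse(text);
  } catch {
    return {};
  }
}

// Foot-gun: `input` doubles as raw accumulator and parsed value. Once it holds
// an object, String(input) is "[object Object]" and the raw text is lost.
function buggyAppend(input: unknown, delta: string): unknown {
  return repairParseStandIn(String(input) + delta);
}

// Safer shape: keep the raw text separately and only replace `input`
// when JSON.parse succeeds on the complete accumulator.
interface ToolInputState {
  rawText: string;
  input: unknown;
}

function safeAppend(state: ToolInputState, delta: string): ToolInputState {
  const rawText = state.rawText + delta;
  try {
    return { rawText, input: JSON.parse(rawText) };
  } catch {
    return { rawText, input: state.input }; // incomplete JSON: keep prior input
  }
}

const deltas = ['{"city":"Par', 'is"}'];
const buggy = deltas.reduce<unknown>(buggyAppend, "");
const safe = deltas.reduce(safeAppend, { rawText: "", input: undefined });
console.log(JSON.stringify(buggy), JSON.stringify(safe.input));
```
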
Two related fixes bundled in:

  - The early-exit cursor check now also detects status-only
    transitions (streaming -> finished/aborted), since a stream can
    finish without emitting more delta parts.
  - The TextStreamPart path had the same O(n²) bug — it passed an
    empty existingStreams array to deriveUIMessagesFromTextStreamParts.
    Now it forwards the prior stream state so that path is also O(n).

Tooling:

  - updateFromUIMessageChunks early-returns when parts is empty (the
    for-await loop never ran, so the function then mutated the
    caller's message via joinText).
  - transitionToolPart<S> helper provides a single typed mutation
    point for tool-part state changes. The updates argument is
    Partial<Extract<ToolPart, { state: S }>> so typos in state names,
    missing fields, or wrong field types fail at compile time.
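
A minimal sketch of how such a helper could be typed is shown below. The `ToolPart` union here is a simplified assumption, not the AI SDK's actual type, and the body (in-place mutation via Object.assign) is a guess at the shape rather than the PR's implementation.

```typescript
// Simplified, assumed ToolPart union keyed by `state`.
type ToolPart =
  | { type: "tool"; toolCallId: string; state: "input-streaming"; input?: unknown }
  | { type: "tool"; toolCallId: string; state: "input-available"; input: unknown }
  | { type: "tool"; toolCallId: string; state: "output-available"; input: unknown; output: unknown }
  | { type: "tool"; toolCallId: string; state: "output-error"; input: unknown; errorText: string };

type ToolState = ToolPart["state"];

// Single mutation point: `state` must be a known state name, and `updates`
// may only contain fields that exist on the variant for that state.
function transitionToolPart<S extends ToolState>(
  part: ToolPart,
  state: S,
  updates: Partial<Extract<ToolPart, { state: S }>>,
): Extract<ToolPart, { state: S }> {
  Object.assign(part, updates, { state });
  return part as unknown as Extract<ToolPart, { state: S }>;
}

const part: ToolPart = {
  type: "tool",
  toolCallId: "call-1",
  state: "input-available",
  input: { city: "Paris" },
};

const finished = transitionToolPart(part, "output-available", { output: "sunny" });
console.log(finished.state, finished.output);

// A call such as transitionToolPart(part, "done", {}) would be rejected by
// the compiler because "done" is not a member of ToolState.
```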

Verification (agent-190-repro hook benchmark):

              4k chars / 409 chunks    12k chars / 1223 chunks
  Before      1,341 ms                 ~21,000 ms
  After          34 ms                       73 ms

268 unit tests pass, build clean.

Closes get-convex#190

Collapse the dual SDK/incremental processing into one path. The first
batch previously went through readUIMessageStream (parsePartialJson →
partial object input) while later batches used a hand-rolled incremental
function that stored the raw accumulator in `input` and parsed with strict
JSON.parse. The two diverged across the batch boundary, corrupting tool
input for long (multi-flush) streams — exactly the case this PR targets.

applyUIMessageChunksIncremental now drives every batch:
- persists ephemeral stream state (active text/reasoning indices, raw tool
  input text) that the UIMessage can't hold, so it resumes mid-part
- keeps the raw tool-input accumulator separate from `input` and uses
  parsePartialJson, matching the SDK's partial-object streaming
- handles file / message-metadata / data-* chunks in later batches
- tracks text/reasoning parts by chunk id
- matches the SDK on finish-step (clear active maps, don't force "done")
- leaves message status to the caller; warns on unknown chunk types
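
The idea of persisting ephemeral state across batches and tracking text parts by chunk id can be sketched as below. This is a reduced illustration covering only text chunks; `TextChunk`, `MessageSketch`, `StreamStateSketch`, and `applyBatch` are assumed names, not the PR's types.

```typescript
// Simplified chunk shapes (assumption: only the text lifecycle is modeled).
type TextChunk =
  | { type: "text-start"; id: string }
  | { type: "text-delta"; id: string; delta: string }
  | { type: "text-end"; id: string };

interface MessageSketch {
  parts: { type: "text"; text: string }[];
}

// Ephemeral state the message itself cannot hold: which part index each
// still-open text id writes to. It must survive between batches.
interface StreamStateSketch {
  activeText: Map<string, number>;
}

function applyBatch(
  message: MessageSketch,
  state: StreamStateSketch,
  chunks: TextChunk[],
): void {
  for (const chunk of chunks) {
    if (chunk.type === "text-start") {
      state.activeText.set(chunk.id, message.parts.length);
      message.parts.push({ type: "text", text: "" });
    } else if (chunk.type === "text-delta") {
      const index = state.activeText.get(chunk.id);
      const part = index === undefined ? undefined : message.parts[index];
      if (!part) throw new Error(`text-delta for missing text part ${chunk.id}`);
      part.text += chunk.delta;
    } else {
      state.activeText.delete(chunk.id); // text-end closes the part
    }
  }
}

const message: MessageSketch = { parts: [] };
const state: StreamStateSketch = { activeText: new Map() };

// Two interleaved text parts split across a batch boundary.
applyBatch(message, state, [
  { type: "text-start", id: "a" },
  { type: "text-delta", id: "a", delta: "Hel" },
  { type: "text-start", id: "b" },
  { type: "text-delta", id: "b", delta: "Wor" },
]);
applyBatch(message, state, [
  { type: "text-delta", id: "a", delta: "lo" },
  { type: "text-delta", id: "b", delta: "ld" },
  { type: "text-end", id: "a" },
  { type: "text-end", id: "b" },
]);
console.log(message.parts.map((p) => p.text));
```

Without the persisted map, the second batch would have no way to know which part the deltas for "a" and "b" belong to.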

An equivalence test pins the incremental port to the SDK output.
@coderabbitai

coderabbitai Bot commented May 27, 2026

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: d02cd3e9-8dc4-4bc1-bd4e-6246f5fe5ca9

📥 Commits

Reviewing files that changed from the base of the PR and between 641c83c and 995691a.

📒 Files selected for processing (4)
  • src/client/streaming.integration.test.ts
  • src/deltas.test.ts
  • src/deltas.ts
  • src/react/useStreamingUIMessages.ts
📝 Walkthrough

Walkthrough

This PR replaces full-rebuild streaming logic with cursor-based incremental state tracking. A new applyUIMessageChunksIncremental() function applies batches of UIMessageChunk updates to an existing UIMessage, tracking the cursor position in IncrementalStreamState to avoid reprocessing earlier parts. The React hook in useStreamingUIMessages now stores and resumes this state per stream, constructing messages incrementally instead of from scratch. Tool input JSON is buffered across batch boundaries until an available/error event, and text/reasoning parts are tracked by id to handle concurrent streaming. Tests validate cursor correctness, part accumulation, tool lifecycle transitions, and equivalence between incremental and full-stream processing.

Suggested reviewers

  • ianmacartney
🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title accurately describes the main change: deletion of the legacy TextStreamPart streaming path from the codebase, which is the primary objective of this PR.
Description check | ✅ Passed | The description clearly explains the problem (legacy TextStreamPart state machine is dead code), the fix (delete unused functions), and provides helpful context about future work and stacking dependency.
Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/deltas.test.ts (1)

203-203: ⚡ Quick win

Remove unnecessary async from tests that don't await.

These tests only call the synchronous applyUIMessageChunksIncremental function and don't use await, so the async keyword is unnecessary.

🧹 Proposed cleanup
-  it("incremental apply only consumes parts past the cursor (no re-processing)", async () => {
+  it("incremental apply only consumes parts past the cursor (no re-processing)", () => {
-  it("applyUIMessageChunksIncremental: text-delta accumulation across calls", async () => {
+  it("applyUIMessageChunksIncremental: text-delta accumulation across calls", () => {
-  it("applyUIMessageChunksIncremental: tool-output-available preserves input and sets fields", async () => {
+  it("applyUIMessageChunksIncremental: tool-output-available preserves input and sets fields", () => {
-  it("applyUIMessageChunksIncremental: tool-input-error sets rawInput and clears input for static tools", async () => {
+  it("applyUIMessageChunksIncremental: tool-input-error sets rawInput and clears input for static tools", () => {
-  it("accumulates tool input across a batch boundary", async () => {
+  it("accumulates tool input across a batch boundary", () => {
-  it("pushes file parts and merges message metadata in later batches", async () => {
+  it("pushes file parts and merges message metadata in later batches", () => {
-  it("tracks concurrent text parts by id across batches", async () => {
+  it("tracks concurrent text parts by id across batches", () => {

Also applies to: 281-281, 324-324, 374-374, 421-421, 466-466, 506-506

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/deltas.test.ts` at line 203, The test cases that call the synchronous
function applyUIMessageChunksIncremental (e.g., the "incremental apply only
consumes parts past the cursor (no re-processing)" it block and the other
mentioned it blocks with matching descriptions) incorrectly include the async
keyword despite not awaiting anything; remove the unnecessary async modifier
from the test function declarations (e.g., change it("...", async () => { ... })
to it("...", () => { ... })) for applyUIMessageChunksIncremental tests so they
are plain synchronous tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/deltas.ts`:
- Around line 453-461: deriveUIMessagesFromDeltas currently assumes every
StreamMessage is a UIMessageChunk and unconditionally casts through
getParts<UIMessageChunk> → updateFromUIMessageChunks; add a guard to fail fast:
inside the loop over streamMessages check streamMessage.format ===
"UIMessageChunk" (or equivalent discriminator) before calling
getParts/updateFromUIMessageChunks and if not, throw a clear error (or
return/skip) indicating unsupported stream format for
deriveUIMessagesFromDeltas; alternatively, narrow the function signature to only
accept StreamMessage<"UIMessageChunk"> so callers must pass the correct type.
Ensure references to getParts, updateFromUIMessageChunks, blankUIMessage and
deriveUIMessagesFromDeltas are updated accordingly.

In `@src/react/useStreamingUIMessages.ts`:
- Around line 72-80: The fast-path incorrectly assumes stream.deltas.at(-1) is
the newest delta (in useStreamingUIMessages.ts): instead of taking lastDelta,
scan stream.deltas for any delta with content newer than the saved cursor (e.g.,
check delta.parts.length > 0 && delta.end > cursor) and set noNewDeltas = false
only if none match; update the checks that reference lastDelta and the cursor to
use this scan so unsorted incoming deltas aren't ignored.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: ea1e7f94-a61a-4e3b-8507-7e93a6608335

📥 Commits

Reviewing files that changed from the base of the PR and between 3cf5064 and 641c83c.

📒 Files selected for processing (4)
  • src/client/streaming.integration.test.ts
  • src/deltas.test.ts
  • src/deltas.ts
  • src/react/useStreamingUIMessages.ts
💤 Files with no reviewable changes (1)
  • src/client/streaming.integration.test.ts

Comment thread src/deltas.ts
Comment thread src/react/useStreamingUIMessages.ts Outdated
robelest added 4 commits May 27, 2026 13:39
…test)

- deltas.ts: drop `toolInputText[id]` from incremental state when a tool
  transitions to input-available / tool-input-error. The raw JSON is no longer
  needed and was being carried through every later batch on the hot path.
- deltas.test.ts: rename the "O(N) not O(N²)" test to reflect what it actually
  proves (cursor slicing — each part handed to applyUIMessageChunksIncremental
  exactly once); document that the algorithmic claim is proven by the PR's
  21,000 ms → 73 ms benchmark, not by the unit test.

Skipped CodeRabbit's third nit ("parse-on-complete" assertion for the
partial-input test): that would contradict the deliberate partial-JSON parse
during streaming (deltas.ts:435-443), which mirrors the AI SDK's streamObject
behavior and is documented inline.

parsePartialJson was the only await; replace it with JSON.parse + try/catch.
JSON.parse only materializes input when JSON is complete; parsePartialJson
repair-parsed partial JSON eagerly, which could corrupt the accumulator
if input was used before parsing finished.

Fixes the test asserting old eager-parse behavior and removes the
parsePartialJson import that was added but not needed.

All agent streams now write UIMessageChunk. The text-format path
(updateFromTextStreamParts, deriveUIMessagesFromTextStreamParts) has been
dead code since streaming.ts switched to DeltaStreamer<UIMessageChunk>.
Removes ~900 lines including tests.

- deriveUIMessagesFromDeltas: add format guard to fail fast on non-UIMessageChunk
  streams rather than silently processing them through the wrong path
- useStreamingUIMessages: replace .at(-1) fast-path with delta.end > cursor scan
  so unsorted delta arrays don't cause spurious noNewDeltas early returns
- deltas.test.ts: drop unnecessary async from tests that don't await
- streaming.integration.test.ts: add format: UIMessageChunk to StreamMessage
  fixtures used by deriveUIMessagesFromDeltas status-mapping test
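
The difference between the last-element fast path and the `delta.end > cursor` scan can be sketched as below. `DeltaSketch` and both function names are assumptions for illustration; the real hook also tracks stream status, which is omitted here.

```typescript
// Simplified delta shape (assumption): a half-open [start, end) range of parts.
interface DeltaSketch {
  start: number;
  end: number;
  parts: unknown[];
}

// Fast path that only inspects the last array element. It gives the wrong
// answer when the array is not sorted by `end`.
function hasNewDeltasLastOnly(deltas: DeltaSketch[], cursor: number): boolean {
  const last = deltas[deltas.length - 1];
  return last !== undefined && last.parts.length > 0 && last.end > cursor;
}

// Scan every delta so ordering does not matter.
function hasNewDeltasScan(deltas: DeltaSketch[], cursor: number): boolean {
  return deltas.some((delta) => delta.parts.length > 0 && delta.end > cursor);
}

// Unsorted input: the newer delta (4..8) arrives before the older one (0..4).
const unsorted: DeltaSketch[] = [
  { start: 4, end: 8, parts: ["e", "f", "g", "h"] },
  { start: 0, end: 4, parts: ["a", "b", "c", "d"] },
];

console.log(hasNewDeltasLastOnly(unsorted, 4), hasNewDeltasScan(unsorted, 4));
```

The scan costs one pass over the deltas per update, which is small compared with the replay cost the PR removes.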
@robelest robelest force-pushed the fix/drop-text-stream-format branch from 641c83c to 995691a Compare May 27, 2026 17:48
@pkg-pr-new

pkg-pr-new Bot commented May 27, 2026

npm i https://pkg.pr.new/@convex-dev/agent@275

commit: 995691a

await Promise.all(
  streams.map(async ({ deltas, streamMessage }) => {
    const { parts, cursor } = getParts<UIMessageChunk>(deltas, 0);
    if (streamMessage.format === "UIMessageChunk") {
Member

I think we should still do some check for this. The intent was to support multiple formats, so we should at least throw if we see a format we don't understand, or if callers were using code that relied on the simpler text stream.
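
A fail-fast check of the kind requested here might look like the sketch below. `StreamMessageSketch` and `assertUIMessageChunkFormat` are hypothetical names, and the error wording is an assumption; only the `format` discriminator and the "UIMessageChunk" value come from the PR.

```typescript
// Simplified stream metadata (assumption): only the fields the guard needs.
interface StreamMessageSketch {
  streamId: string;
  format?: string;
}

// Throw on any format this code path does not understand, rather than
// silently feeding the deltas through the UIMessageChunk processor.
function assertUIMessageChunkFormat(streamMessage: StreamMessageSketch): void {
  if (streamMessage.format !== "UIMessageChunk") {
    throw new Error(
      `Unsupported stream format "${String(streamMessage.format)}" for stream ` +
        `${streamMessage.streamId}; expected "UIMessageChunk".`,
    );
  }
}

assertUIMessageChunkFormat({ streamId: "s1", format: "UIMessageChunk" }); // passes

try {
  assertUIMessageChunkFormat({ streamId: "s2", format: "TextStreamPart" });
} catch (error) {
  console.log((error as Error).message);
}
```

Whether an unknown format should throw or be skipped is a design choice for the maintainers; throwing makes stale text-stream callers visible immediately.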
