From b310d8b82bd4058b4afec8e023909d928bd69407 Mon Sep 17 00:00:00 2001 From: tsushanth <78000697+tsushanth@users.noreply.github.com> Date: Thu, 11 Jun 2026 10:21:13 -0700 Subject: [PATCH] fix(voice/room_io): non-delta transcription final-stream races MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two races in ParticipantTranscriptionOutput on the non-delta path (default user-transcription forwarding). Both surface with Deepgram-style mid-utterance final bursts where multiple is_final chunks arrive back-to-back. 1. handleFlush() read this.latestText from inside flushTaskImpl, so a captureText() for the next segment that landed before the flush task executed would overwrite the field and cause segment A's lk.transcription_final="true" stream to publish segment B's text (observed: a full sentence replaced by a follow-on fragment). Snapshot latestText when the task is scheduled and pass it as an argument. 2. When the first event for a fresh segment was already is_final, captureText set latestText = payload, then handleCaptureText called resetState() which cleared latestText back to "". The subsequent final stream then published an empty string (no chunk → subscribers keyed on lk.segment_id never received the text). Restore latestText from the captured payload immediately after the resetState() call. Adds two regression tests via Object.create that fail without the production change. Closes #1759 --- .../fix-non-delta-transcription-final-text.md | 20 ++++++ agents/src/voice/room_io/_output.test.ts | 69 ++++++++++++++++++- agents/src/voice/room_io/_output.ts | 20 +++++- 3 files changed, 104 insertions(+), 5 deletions(-) create mode 100644 .changeset/fix-non-delta-transcription-final-text.md diff --git a/.changeset/fix-non-delta-transcription-final-text.md b/.changeset/fix-non-delta-transcription-final-text.md new file mode 100644 index 000000000..33121a710 --- /dev/null +++ b/.changeset/fix-non-delta-transcription-final-text.md @@ -0,0 +1,20 @@ +--- +'@livekit/agents': patch +--- + +Fix `ParticipantTranscriptionOutput` non-delta final-stream publishing two +races that surfaced with Deepgram-style mid-utterance final bursts. + +1. `handleFlush()` previously read `this.latestText` from inside + `flushTaskImpl`, so a `captureText()` for the next segment landing + before the flush task ran would overwrite the field and cause segment + A's `lk.transcription_final: "true"` stream to publish segment B's + text. The text to flush is now snapshotted when the task is scheduled + and passed in as a parameter. + +2. When the first event for a fresh segment was already `is_final`, + `handleCaptureText` called `resetState()` after `captureText` had set + `latestText`, clearing it back to `""`. The subsequent final stream + then published an empty string. `latestText` is now restored from the + captured payload immediately after `resetState()` so the same-tick + final preserves the captured text. diff --git a/agents/src/voice/room_io/_output.test.ts b/agents/src/voice/room_io/_output.test.ts index 24b03ba8d..8dd3643e7 100644 --- a/agents/src/voice/room_io/_output.test.ts +++ b/agents/src/voice/room_io/_output.test.ts @@ -2,8 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 import { describe, expect, it, vi } from 'vitest'; -import { Future } from '../../utils.js'; -import { ParticipantAudioOutput } from './_output.js'; +import { Future, type Task } from '../../utils.js'; +import { ParticipantAudioOutput, ParticipantTranscriptionOutput } from './_output.js'; type CaptureFrameArg = Parameters[0]; @@ -220,3 +220,68 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { expect(output.pushedDuration).toBeGreaterThan(0); }); }); + +describe('ParticipantTranscriptionOutput non-delta final flush', () => { + it('snapshots latestText at flush time so a next-segment capture cannot overwrite it', async () => { + type FlushTaskImpl = (writer: unknown, text: string, signal: AbortSignal) => Promise; + type FlushTarget = ParticipantTranscriptionOutput & { + writer: unknown; + flushTask: Task | null; + latestText: string; + flushTaskImpl: FlushTaskImpl; + handleFlush: () => void; + }; + + const output = Object.create(ParticipantTranscriptionOutput.prototype) as FlushTarget; + output.writer = null; + output.flushTask = null; + output.latestText = 'segment-A'; + + const flushedTexts: string[] = []; + output.flushTaskImpl = vi.fn(async (_writer, text) => { + flushedTexts.push(text); + }); + + output.handleFlush(); + // Simulate the next segment's first interim landing before the flush task + // gets to write — must not corrupt segment A's final text. + output.latestText = 'segment-B-interim'; + + await output.flushTask!.result; + + expect(flushedTexts).toEqual(['segment-A']); + }); + + it('preserves latestText through resetState() on a fresh-segment capture (final-only burst)', async () => { + type CaptureTarget = ParticipantTranscriptionOutput & { + participantIdentity: string | null; + capturing: boolean; + latestText: string; + currentId: string; + flushTask: Task | null; + writer: unknown; + jsonFormat: boolean; + room: { isConnected: boolean }; + logger: { error: () => void }; + handleCaptureText: (text: string) => Promise; + captureText: (text: string) => Promise; + }; + + const output = Object.create(ParticipantTranscriptionOutput.prototype) as CaptureTarget; + output.participantIdentity = 'user-1'; + output.capturing = false; + output.latestText = ''; + output.currentId = 'SG_initial'; + output.flushTask = null; + output.writer = null; + output.jsonFormat = false; + output.room = { isConnected: false }; + output.logger = { error: vi.fn() }; + + // First (and only) event for this segment is already a final. + await output.captureText('hello world'); + + expect(output.capturing).toBe(true); + expect(output.latestText).toBe('hello world'); + }); +}); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 11dd8eb71..05fa79dd7 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -190,6 +190,11 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription if (!this.capturing) { this.resetState(); this.capturing = true; + // resetState() clears latestText, but the caller already set it to the + // payload for this capture (captureText, line above). Re-assign so the + // subsequent flush republishes the correct text for segments whose first + // event is already final (no prior interims). + this.latestText = text; } try { @@ -214,7 +219,12 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription protected handleFlush() { const currWriter = this.writer; this.writer = null; - this.flushTask = Task.from((controller) => this.flushTaskImpl(currWriter, controller.signal)); + // Snapshot latestText now so a subsequent captureText() for the next + // segment doesn't overwrite the text this flush is meant to publish. + const textToFlush = this.latestText; + this.flushTask = Task.from((controller) => + this.flushTaskImpl(currWriter, textToFlush, controller.signal), + ); } private async createTextWriter(attributes?: Record): Promise { @@ -243,7 +253,11 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription }); } - private async flushTaskImpl(writer: TextStreamWriter | null, signal: AbortSignal): Promise { + private async flushTaskImpl( + writer: TextStreamWriter | null, + text: string, + signal: AbortSignal, + ): Promise { const attributes: Record = { [ATTRIBUTE_TRANSCRIPTION_FINAL]: 'true', }; @@ -266,7 +280,7 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription if (signal.aborted || !tmpWriter) { return; } - await Promise.race([tmpWriter.write(this.latestText), abortPromise]); + await Promise.race([tmpWriter.write(text), abortPromise]); if (signal.aborted) { return; }