diff --git a/.changeset/fix-non-delta-transcription-final-text.md b/.changeset/fix-non-delta-transcription-final-text.md new file mode 100644 index 000000000..33121a710 --- /dev/null +++ b/.changeset/fix-non-delta-transcription-final-text.md @@ -0,0 +1,20 @@ +--- +'@livekit/agents': patch +--- + +Fix two races in `ParticipantTranscriptionOutput` non-delta final-stream +publishing that surfaced with Deepgram-style mid-utterance final bursts. + +1. `handleFlush()` previously read `this.latestText` from inside + `flushTaskImpl`, so a `captureText()` for the next segment landing + before the flush task ran would overwrite the field and cause segment + A's `lk.transcription_final: "true"` stream to publish segment B's + text. The text and segment id to flush are now snapshotted when the task + is scheduled and passed in as parameters. + +2. When the first event for a fresh segment was already `is_final`, + `handleCaptureText` called `resetState()` after `captureText` had set + `latestText`, clearing it back to `""`. The subsequent final stream + then published an empty string. `latestText` is now restored from the + captured payload immediately after `resetState()` so the same-tick + final preserves the captured text. 
diff --git a/agents/src/voice/room_io/_output.test.ts b/agents/src/voice/room_io/_output.test.ts index 24b03ba8d..8dd3643e7 100644 --- a/agents/src/voice/room_io/_output.test.ts +++ b/agents/src/voice/room_io/_output.test.ts @@ -2,8 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 import { describe, expect, it, vi } from 'vitest'; -import { Future } from '../../utils.js'; -import { ParticipantAudioOutput } from './_output.js'; +import { Future, type Task } from '../../utils.js'; +import { ParticipantAudioOutput, ParticipantTranscriptionOutput } from './_output.js'; type CaptureFrameArg = Parameters[0]; @@ -220,3 +220,68 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { expect(output.pushedDuration).toBeGreaterThan(0); }); }); + +describe('ParticipantTranscriptionOutput non-delta final flush', () => { + it('snapshots latestText at flush time so a next-segment capture cannot overwrite it', async () => { + type FlushTaskImpl = (writer: unknown, text: string, signal: AbortSignal) => Promise; + type FlushTarget = ParticipantTranscriptionOutput & { + writer: unknown; + flushTask: Task | null; + latestText: string; + flushTaskImpl: FlushTaskImpl; + handleFlush: () => void; + }; + + const output = Object.create(ParticipantTranscriptionOutput.prototype) as FlushTarget; + output.writer = null; + output.flushTask = null; + output.latestText = 'segment-A'; + + const flushedTexts: string[] = []; + output.flushTaskImpl = vi.fn(async (_writer, text) => { + flushedTexts.push(text); + }); + + output.handleFlush(); + // Simulate the next segment's first interim landing before the flush task + // gets to write — must not corrupt segment A's final text. 
+ output.latestText = 'segment-B-interim'; + + await output.flushTask!.result; + + expect(flushedTexts).toEqual(['segment-A']); + }); + + it('preserves latestText through resetState() on a fresh-segment capture (final-only burst)', async () => { + type CaptureTarget = ParticipantTranscriptionOutput & { + participantIdentity: string | null; + capturing: boolean; + latestText: string; + currentId: string; + flushTask: Task | null; + writer: unknown; + jsonFormat: boolean; + room: { isConnected: boolean }; + logger: { error: () => void }; + handleCaptureText: (text: string) => Promise; + captureText: (text: string) => Promise; + }; + + const output = Object.create(ParticipantTranscriptionOutput.prototype) as CaptureTarget; + output.participantIdentity = 'user-1'; + output.capturing = false; + output.latestText = ''; + output.currentId = 'SG_initial'; + output.flushTask = null; + output.writer = null; + output.jsonFormat = false; + output.room = { isConnected: false }; + output.logger = { error: vi.fn() }; + + // First (and only) event for this segment is already a final. + await output.captureText('hello world'); + + expect(output.capturing).toBe(true); + expect(output.latestText).toBe('hello world'); + }); +}); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 11dd8eb71..7c34849a5 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -190,6 +190,11 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription if (!this.capturing) { this.resetState(); this.capturing = true; + // resetState() clears latestText, but the caller already set it to the + // payload for this capture (captureText, line above). Re-assign so the + // subsequent flush republishes the correct text for segments whose first + // event is already final (no prior interims). 
+ this.latestText = text; } try { @@ -214,7 +219,16 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription protected handleFlush() { const currWriter = this.writer; this.writer = null; - this.flushTask = Task.from((controller) => this.flushTaskImpl(currWriter, controller.signal)); + // Snapshot latestText and currentId now. `setParticipant()` calls + // `flush()` then `resetState()` synchronously, so without the snapshot + // the in-flight flushTaskImpl would read the next segment's + // `this.currentId` when it builds its attributes — publishing the prior + // turn's text under the new segment id. + const textToFlush = this.latestText; + const segmentIdToFlush = this.currentId; + this.flushTask = Task.from((controller) => + this.flushTaskImpl(currWriter, textToFlush, segmentIdToFlush, controller.signal), + ); } private async createTextWriter(attributes?: Record): Promise { @@ -234,7 +248,12 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = this.trackId; } } - attributes[ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID] = this.currentId; + // Honor a caller-provided SEGMENT_ID — flushTaskImpl snapshots it at + // handleFlush time so a racing resetState() can't shift it. Default + // (live captures) still reads the current id. 
+ if (!attributes[ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID]) { + attributes[ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID] = this.currentId; + } return await this.room.localParticipant.streamText({ topic: TOPIC_TRANSCRIPTION, @@ -243,9 +262,15 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription }); } - private async flushTaskImpl(writer: TextStreamWriter | null, signal: AbortSignal): Promise { + private async flushTaskImpl( + writer: TextStreamWriter | null, + text: string, + segmentId: string, + signal: AbortSignal, + ): Promise { const attributes: Record = { [ATTRIBUTE_TRANSCRIPTION_FINAL]: 'true', + [ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID]: segmentId, }; if (this.trackId) { attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = this.trackId; @@ -266,7 +291,7 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription if (signal.aborted || !tmpWriter) { return; } - await Promise.race([tmpWriter.write(this.latestText), abortPromise]); + await Promise.race([tmpWriter.write(text), abortPromise]); if (signal.aborted) { return; }