Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .changeset/fix-non-delta-transcription-final-text.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
'@livekit/agents': patch
---

Fix `ParticipantTranscriptionOutput` non-delta final-stream publishing two
races that surfaced with Deepgram-style mid-utterance final bursts.

1. `handleFlush()` previously read `this.latestText` from inside
`flushTaskImpl`, so a `captureText()` for the next segment landing
before the flush task ran would overwrite the field and cause segment
A's `lk.transcription_final: "true"` stream to publish segment B's
text. The text to flush is now snapshotted when the task is scheduled
and passed in as a parameter.

2. When the first event for a fresh segment was already `is_final`,
`handleCaptureText` called `resetState()` after `captureText` had set
`latestText`, clearing it back to `""`. The subsequent final stream
then published an empty string. `latestText` is now restored from the
captured payload immediately after `resetState()` so the same-tick
final preserves the captured text.
69 changes: 67 additions & 2 deletions agents/src/voice/room_io/_output.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0
import { describe, expect, it, vi } from 'vitest';
import { Future } from '../../utils.js';
import { ParticipantAudioOutput } from './_output.js';
import { Future, type Task } from '../../utils.js';
import { ParticipantAudioOutput, ParticipantTranscriptionOutput } from './_output.js';

type CaptureFrameArg = Parameters<ParticipantAudioOutput['captureFrame']>[0];

Expand Down Expand Up @@ -220,3 +220,68 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
expect(output.pushedDuration).toBeGreaterThan(0);
});
});

describe('ParticipantTranscriptionOutput non-delta final flush', () => {
it('snapshots latestText at flush time so a next-segment capture cannot overwrite it', async () => {
type FlushTaskImpl = (writer: unknown, text: string, signal: AbortSignal) => Promise<void>;
type FlushTarget = ParticipantTranscriptionOutput & {
writer: unknown;
flushTask: Task<void> | null;
latestText: string;
flushTaskImpl: FlushTaskImpl;
handleFlush: () => void;
};

const output = Object.create(ParticipantTranscriptionOutput.prototype) as FlushTarget;
output.writer = null;
output.flushTask = null;
output.latestText = 'segment-A';

const flushedTexts: string[] = [];
output.flushTaskImpl = vi.fn(async (_writer, text) => {
flushedTexts.push(text);
});

output.handleFlush();
// Simulate the next segment's first interim landing before the flush task
// gets to write — must not corrupt segment A's final text.
output.latestText = 'segment-B-interim';

await output.flushTask!.result;

expect(flushedTexts).toEqual(['segment-A']);
});

it('preserves latestText through resetState() on a fresh-segment capture (final-only burst)', async () => {
type CaptureTarget = ParticipantTranscriptionOutput & {
participantIdentity: string | null;
capturing: boolean;
latestText: string;
currentId: string;
flushTask: Task<void> | null;
writer: unknown;
jsonFormat: boolean;
room: { isConnected: boolean };
logger: { error: () => void };
handleCaptureText: (text: string) => Promise<void>;
captureText: (text: string) => Promise<void>;
};

const output = Object.create(ParticipantTranscriptionOutput.prototype) as CaptureTarget;
output.participantIdentity = 'user-1';
output.capturing = false;
output.latestText = '';
output.currentId = 'SG_initial';
output.flushTask = null;
output.writer = null;
output.jsonFormat = false;
output.room = { isConnected: false };
output.logger = { error: vi.fn() };

// First (and only) event for this segment is already a final.
await output.captureText('hello world');

expect(output.capturing).toBe(true);
expect(output.latestText).toBe('hello world');
});
});
20 changes: 17 additions & 3 deletions agents/src/voice/room_io/_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription
if (!this.capturing) {
this.resetState();
this.capturing = true;
// resetState() clears latestText, but the caller already set it to the
// payload for this capture (captureText, line above). Re-assign so the
// subsequent flush republishes the correct text for segments whose first
// event is already final (no prior interims).
this.latestText = text;
}

try {
Expand All @@ -214,7 +219,12 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription
protected handleFlush() {
const currWriter = this.writer;
this.writer = null;
this.flushTask = Task.from((controller) => this.flushTaskImpl(currWriter, controller.signal));
// Snapshot latestText now so a subsequent captureText() for the next
// segment doesn't overwrite the text this flush is meant to publish.
const textToFlush = this.latestText;
this.flushTask = Task.from((controller) =>
this.flushTaskImpl(currWriter, textToFlush, controller.signal),
);
}
Comment on lines 219 to 228

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 currentId is not snapshotted in handleFlush, relying on await ordering for correctness

The PR correctly snapshots latestText in handleFlush() at _output.ts:224, but currentId is still read from this.currentId lazily inside flushTaskImplcreateTextWriter at _output.ts:247. This is safe in the normal capture→flush flow because handleCaptureText at _output.ts:186-188 awaits the pending flush task before calling resetState() (which generates a new currentId). However, setParticipant() at _output.ts:68-69 calls flush() then resetState() synchronously without awaiting the flush task. If setParticipant is called while a flush task is in-flight, the flush could pick up the new segment ID. This is a pre-existing issue not introduced by this PR, but it's worth noting the asymmetry: latestText is now robustly snapshotted while currentId relies on caller ordering.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


private async createTextWriter(attributes?: Record<string, string>): Promise<TextStreamWriter> {
Expand Down Expand Up @@ -243,7 +253,11 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription
});
}

private async flushTaskImpl(writer: TextStreamWriter | null, signal: AbortSignal): Promise<void> {
private async flushTaskImpl(
writer: TextStreamWriter | null,
text: string,
signal: AbortSignal,
): Promise<void> {
const attributes: Record<string, string> = {
[ATTRIBUTE_TRANSCRIPTION_FINAL]: 'true',
};
Expand All @@ -266,7 +280,7 @@ export class ParticipantTranscriptionOutput extends BaseParticipantTranscription
if (signal.aborted || !tmpWriter) {
return;
}
await Promise.race([tmpWriter.write(this.latestText), abortPromise]);
await Promise.race([tmpWriter.write(text), abortPromise]);
if (signal.aborted) {
return;
}
Expand Down