fix(voice/room_io): non-delta transcription final-stream races#1765
fix(voice/room_io): non-delta transcription final-stream races#1765tsushanth wants to merge 1 commit into
Conversation
🦋 Changeset detectedLatest commit: b310d8b The changes in this PR will be included in the next version bump. This PR includes changesets to release 35 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
| protected handleFlush() { | ||
| const currWriter = this.writer; | ||
| this.writer = null; | ||
| this.flushTask = Task.from((controller) => this.flushTaskImpl(currWriter, controller.signal)); | ||
| // Snapshot latestText now so a subsequent captureText() for the next | ||
| // segment doesn't overwrite the text this flush is meant to publish. | ||
| const textToFlush = this.latestText; | ||
| this.flushTask = Task.from((controller) => | ||
| this.flushTaskImpl(currWriter, textToFlush, controller.signal), | ||
| ); | ||
| } |
There was a problem hiding this comment.
🚩 currentId is not snapshotted in handleFlush, relying on await ordering for correctness
The PR correctly snapshots latestText in handleFlush() at _output.ts:224, but currentId is still read from this.currentId lazily inside flushTaskImpl → createTextWriter at _output.ts:247. This is safe in the normal capture→flush flow because handleCaptureText at _output.ts:186-188 awaits the pending flush task before calling resetState() (which generates a new currentId). However, setParticipant() at _output.ts:68-69 calls flush() then resetState() synchronously without awaiting the flush task. If setParticipant is called while a flush task is in-flight, the flush could pick up the new segment ID. This is a pre-existing issue not introduced by this PR, but it's worth noting the asymmetry: latestText is now robustly snapshotted while currentId relies on caller ordering.
Was this helpful? React with 👍 or 👎 to provide feedback.
Two races in ParticipantTranscriptionOutput on the non-delta path (default user-transcription forwarding). Both surface with Deepgram-style mid-utterance final bursts where multiple is_final chunks arrive back-to-back. 1. handleFlush() read this.latestText from inside flushTaskImpl, so a captureText() for the next segment that landed before the flush task executed would overwrite the field and cause segment A's lk.transcription_final="true" stream to publish segment B's text (observed: a full sentence replaced by a follow-on fragment). Snapshot latestText when the task is scheduled and pass it as an argument. 2. When the first event for a fresh segment was already is_final, captureText set latestText = payload, then handleCaptureText called resetState() which cleared latestText back to "". The subsequent final stream then published an empty string (no chunk → subscribers keyed on lk.segment_id never received the text). Restore latestText from the captured payload immediately after the resetState() call. Adds two regression tests via Object.create that fail without the production change. Closes livekit#1759
f53612b to
b310d8b
Compare
Summary
ParticipantTranscriptionOutput(non-delta path, default user-transcription forwarding) had two race conditions that surfaced with Deepgram-style mid-utterance final bursts where multipleis_finalchunks arrive back-to-back.Bug 1 — next-segment capture overwrites the in-flight flush text
handleFlush()schedulesflushTaskImpl, which readsthis.latestTextwhen it later writes thelk.transcription_final: \"true\"stream. When the next segment's firstcaptureText()lands before that task runs, it overwriteslatestText, so segment A's final stream publishes segment B's text.Observed in the issue: the learner said "So, you made a big purchase with, a service. You tell me what exactly it was?" — segment A's final stream arrived carrying just "service." (the next chunk's fragment), and the client — keyed one entry per
lk.segment_id, last write wins — replaced the full sentence.Fix: snapshot
latestTextwhen the flush task is scheduled and pass it as a parameter toflushTaskImpl.protected handleFlush() { const currWriter = this.writer; this.writer = null; - this.flushTask = Task.from((controller) => this.flushTaskImpl(currWriter, controller.signal)); + const textToFlush = this.latestText; + this.flushTask = Task.from((controller) => + this.flushTaskImpl(currWriter, textToFlush, controller.signal), + ); }Bug 2 —
resetState()wipes the captured text on a final-only first eventcaptureText()setsthis.latestText = payload, thenhandleCaptureText()runsresetState()(which setslatestText = '') when a new segment starts. For any segment whose first event is alreadyis_final— common with multi-final bursts — the final stream publishes an empty string. An empty write produces no chunk, so subscribers keyed on the segment never receive the final text.Fix: restore
latestTextfrom the captured payload immediately after theresetState()call inhandleCaptureText. The legacy class (ParticipantLegacyTranscriptionOutput) is unaffected — it trackspushedTextseparately and resets that explicitly.if (!this.capturing) { this.resetState(); this.capturing = true; + this.latestText = text; }Test plan
main, both pass after the fixpnpm vitest run agents/src/voice/room_io/_output.test.ts— 5/5 pass (3 existing + 2 new)pnpm build:agents— cleanpnpm lint— no new warnings on touched filespnpm format:check— cleanParticipantLegacyTranscriptionOutputsemantics untouchedCloses #1759