From 4baec50ff5e0d8048e54a79984a43b9798df41d2 Mon Sep 17 00:00:00 2001 From: "rosetta-livekit-bot[bot]" <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Date: Thu, 11 Jun 2026 16:59:52 +0000 Subject: [PATCH] fix(sarvam): use provider speech timing for eos --- .changeset/fix-sarvam-eos-timing.md | 5 + plugins/sarvam/src/stt.ts | 136 ++++++++++++++++++++++++---- 2 files changed, 121 insertions(+), 20 deletions(-) create mode 100644 .changeset/fix-sarvam-eos-timing.md diff --git a/.changeset/fix-sarvam-eos-timing.md b/.changeset/fix-sarvam-eos-timing.md new file mode 100644 index 000000000..8d7e03fb9 --- /dev/null +++ b/.changeset/fix-sarvam-eos-timing.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents-plugin-sarvam': patch +--- + +Fix Sarvam streaming STT end-of-speech timing. diff --git a/plugins/sarvam/src/stt.ts b/plugins/sarvam/src/stt.ts index 48a6d7abc..6f40687cb 100644 --- a/plugins/sarvam/src/stt.ts +++ b/plugins/sarvam/src/stt.ts @@ -35,6 +35,7 @@ const SARVAM_STT_TRANSLATE_WS_URL = 'wss://api.sarvam.ai/speech-to-text-translat const SAMPLE_RATE = 16000; const NUM_CHANNELS = 1; +const EOS_FALLBACK_TIMEOUT = 1000; // --------------------------------------------------------------------------- // Model-specific option types @@ -409,6 +410,8 @@ interface SarvamWSTranscriptData { transcript?: string; language_code?: string | null; language_probability?: number | null; + speech_start?: number | null; + speech_end?: number | null; timestamps?: Record | null; diarized_transcript?: Record | null; metrics?: { @@ -419,6 +422,7 @@ interface SarvamWSTranscriptData { /** type: "events" */ interface SarvamWSEventData { + request_id?: string; event_type?: string; timestamp?: string; signal_type?: 'START_SPEECH' | 'END_SPEECH'; @@ -554,6 +558,11 @@ export class SpeechStream extends stt.SpeechStream { #speaking = false; #resetWS = new Future(); #requestId = ''; + #pendingFinalData: SarvamWSTranscriptData | undefined; + #pendingEos = false; + #eosFallbackTimer: ReturnType | undefined; + #finalReceivedForUtterance = false; + #eosEmittedForUtterance = false; label = 'sarvam.SpeechStream'; constructor(sttInstance: STT, opts: ResolvedSTTOptions, connOptions?: APIConnectOptions) { @@ -583,6 +592,83 @@ export class SpeechStream extends stt.SpeechStream { this.#resetWS.resolve(); } + #positiveTime(value: unknown): number | undefined { + if (typeof value !== 'number' || !Number.isFinite(value) || value <= 0) { + return undefined; + } + return value + this.startTimeOffset; + } + + #resetUtteranceState() { + this.#cancelEosFallback(); + this.#pendingFinalData = undefined; + this.#pendingEos = false; + this.#finalReceivedForUtterance = false; + this.#eosEmittedForUtterance = false; + } + + #cancelEosFallback() { + if (this.#eosFallbackTimer != null) { + clearTimeout(this.#eosFallbackTimer); + this.#eosFallbackTimer = undefined; + } + } + + #emitEndOfSpeech(putMessage: (event: stt.SpeechEvent) => void) { + if (this.#eosEmittedForUtterance) { + return; + } + + this.#cancelEosFallback(); + putMessage({ type: stt.SpeechEventType.END_OF_SPEECH, requestId: this.#requestId }); + this.#eosEmittedForUtterance = true; + this.#pendingEos = false; + } + + #sendFinalTranscript( + transcriptData: SarvamWSTranscriptData, + putMessage: (event: stt.SpeechEvent) => void, + ): boolean { + const transcript = transcriptData.transcript ?? ''; + if (!transcript) { + return false; + } + + const language = normalizeLanguage( + transcriptData.language_code ?? this.#opts.languageCode ?? 'unknown', + ); + const requestId = transcriptData.request_id ?? this.#requestId; + const confidence = extractConfidence(transcriptData, this.#logger); + this.#requestId = requestId; + + putMessage({ + type: stt.SpeechEventType.FINAL_TRANSCRIPT, + requestId, + alternatives: [ + { + text: transcript, + language, + startTime: this.#positiveTime(transcriptData.speech_start) ?? 0, + endTime: this.#positiveTime(transcriptData.speech_end) ?? 0, + confidence, + }, + ], + }); + return true; + } + + #tryCommitUtterance(putMessage: (event: stt.SpeechEvent) => void) { + if (this.#pendingFinalData == null || this.#eosEmittedForUtterance) { + return; + } + + const committedData = this.#pendingFinalData; + if (this.#sendFinalTranscript(committedData, putMessage)) { + this.#emitEndOfSpeech(putMessage); + this.#pendingFinalData = undefined; + } + } + protected async run() { const maxRetry = 32; let retries = 0; @@ -642,6 +728,7 @@ export class SpeechStream extends stt.SpeechStream { async #runWS(ws: WebSocket) { this.#resetWS = new Future(); this.#speaking = false; + this.#resetUtteranceState(); let closing = false; // Session-scoped controller: aborted in finally to cancel sendTask on WS reset const sessionController = new AbortController(); @@ -765,28 +852,42 @@ export class SpeechStream extends stt.SpeechStream { if (msgType === 'events') { const eventData = (json['data'] as SarvamWSEventData | undefined) ?? {}; + if (eventData.request_id) { + this.#requestId = eventData.request_id; + } const signalType = eventData.signal_type; if (signalType === 'START_SPEECH') { if (!this.#speaking) { + this.#resetUtteranceState(); this.#speaking = true; putMessage({ type: stt.SpeechEventType.START_OF_SPEECH }); } } else if (signalType === 'END_SPEECH') { if (this.#speaking) { this.#speaking = false; - putMessage({ type: stt.SpeechEventType.END_OF_SPEECH }); + this.#pendingEos = true; + this.#tryCommitUtterance(putMessage); + if (this.#pendingEos && this.#pendingFinalData == null) { + if (this.#finalReceivedForUtterance) { + this.#emitEndOfSpeech(putMessage); + } else if (this.#eosFallbackTimer == null) { + this.#eosFallbackTimer = setTimeout(() => { + if (this.#pendingEos && !this.#eosEmittedForUtterance) { + this.#emitEndOfSpeech(putMessage); + } + }, EOS_FALLBACK_TIMEOUT); + } + } } } } else if (msgType === 'data') { const td = (json['data'] as SarvamWSTranscriptData | undefined) ?? {}; const transcript = td.transcript ?? ''; - const language = normalizeLanguage( - td.language_code ?? this.#opts.languageCode ?? 'unknown', - ); const requestId = td.request_id ?? ''; - const confidence = extractConfidence(td, this.#logger); - this.#requestId = requestId; + if (requestId) { + this.#requestId = requestId; + } // Log metrics when available if (td.metrics) { @@ -796,24 +897,18 @@ export class SpeechStream extends stt.SpeechStream { } if (transcript) { - if (!this.#speaking) { + if (!this.#speaking && !this.#pendingEos && !this.#eosEmittedForUtterance) { this.#speaking = true; putMessage({ type: stt.SpeechEventType.START_OF_SPEECH }); } - putMessage({ - type: stt.SpeechEventType.FINAL_TRANSCRIPT, - requestId, - alternatives: [ - { - text: transcript, - language, - startTime: 0, - endTime: td.metrics?.audio_duration ?? 0, - confidence, - }, - ], - }); + if (this.#pendingEos) { + this.#pendingFinalData = td; + this.#finalReceivedForUtterance = true; + this.#tryCommitUtterance(putMessage); + } else if (this.#sendFinalTranscript(td, putMessage)) { + this.#finalReceivedForUtterance = true; + } } } else if (msgType === 'error') { // Server format: { type: "error", data: { message: "...", code: "..." } } @@ -852,6 +947,7 @@ export class SpeechStream extends stt.SpeechStream { } finally { closing = true; sessionController.abort(); + this.#cancelEosFallback(); // Do NOT call listenTask.cancel() — it would abort this.abortController // (passed to Task.from) and permanently break the stream. Instead, ws.close() // triggers the ws.once('close') handler inside listenMessage, letting listenTask