-
Notifications
You must be signed in to change notification settings - Fork 309
fix(sarvam): use provider speech timing for eos #1763
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 1.5.0
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| '@livekit/agents-plugin-sarvam': patch | ||
| --- | ||
|
|
||
| Fix Sarvam streaming STT end-of-speech timing. |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -35,6 +35,7 @@ const SARVAM_STT_TRANSLATE_WS_URL = 'wss://api.sarvam.ai/speech-to-text-translat | |||||||||||||
|
|
||||||||||||||
| const SAMPLE_RATE = 16000; | ||||||||||||||
| const NUM_CHANNELS = 1; | ||||||||||||||
| const EOS_FALLBACK_TIMEOUT = 1000; | ||||||||||||||
|
|
||||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||||
| // Model-specific option types | ||||||||||||||
|
|
@@ -409,6 +410,8 @@ interface SarvamWSTranscriptData { | |||||||||||||
| transcript?: string; | ||||||||||||||
| language_code?: string | null; | ||||||||||||||
| language_probability?: number | null; | ||||||||||||||
| speech_start?: number | null; | ||||||||||||||
| speech_end?: number | null; | ||||||||||||||
| timestamps?: Record<string, unknown> | null; | ||||||||||||||
| diarized_transcript?: Record<string, unknown> | null; | ||||||||||||||
| metrics?: { | ||||||||||||||
|
|
@@ -419,6 +422,7 @@ interface SarvamWSTranscriptData { | |||||||||||||
|
|
||||||||||||||
| /** type: "events" */ | ||||||||||||||
| interface SarvamWSEventData { | ||||||||||||||
| request_id?: string; | ||||||||||||||
| event_type?: string; | ||||||||||||||
| timestamp?: string; | ||||||||||||||
| signal_type?: 'START_SPEECH' | 'END_SPEECH'; | ||||||||||||||
|
|
@@ -554,6 +558,11 @@ export class SpeechStream extends stt.SpeechStream { | |||||||||||||
| #speaking = false; | ||||||||||||||
| #resetWS = new Future(); | ||||||||||||||
| #requestId = ''; | ||||||||||||||
| #pendingFinalData: SarvamWSTranscriptData | undefined; | ||||||||||||||
| #pendingEos = false; | ||||||||||||||
| #eosFallbackTimer: ReturnType<typeof setTimeout> | undefined; | ||||||||||||||
| #finalReceivedForUtterance = false; | ||||||||||||||
| #eosEmittedForUtterance = false; | ||||||||||||||
| label = 'sarvam.SpeechStream'; | ||||||||||||||
|
|
||||||||||||||
| constructor(sttInstance: STT, opts: ResolvedSTTOptions, connOptions?: APIConnectOptions) { | ||||||||||||||
|
|
@@ -583,6 +592,83 @@ export class SpeechStream extends stt.SpeechStream { | |||||||||||||
| this.#resetWS.resolve(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #positiveTime(value: unknown): number | undefined { | ||||||||||||||
| if (typeof value !== 'number' || !Number.isFinite(value) || value <= 0) { | ||||||||||||||
| return undefined; | ||||||||||||||
| } | ||||||||||||||
| return value + this.startTimeOffset; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #resetUtteranceState() { | ||||||||||||||
| this.#cancelEosFallback(); | ||||||||||||||
| this.#pendingFinalData = undefined; | ||||||||||||||
| this.#pendingEos = false; | ||||||||||||||
| this.#finalReceivedForUtterance = false; | ||||||||||||||
| this.#eosEmittedForUtterance = false; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #cancelEosFallback() { | ||||||||||||||
| if (this.#eosFallbackTimer != null) { | ||||||||||||||
| clearTimeout(this.#eosFallbackTimer); | ||||||||||||||
| this.#eosFallbackTimer = undefined; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #emitEndOfSpeech(putMessage: (event: stt.SpeechEvent) => void) { | ||||||||||||||
| if (this.#eosEmittedForUtterance) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| this.#cancelEosFallback(); | ||||||||||||||
| putMessage({ type: stt.SpeechEventType.END_OF_SPEECH, requestId: this.#requestId }); | ||||||||||||||
| this.#eosEmittedForUtterance = true; | ||||||||||||||
| this.#pendingEos = false; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #sendFinalTranscript( | ||||||||||||||
| transcriptData: SarvamWSTranscriptData, | ||||||||||||||
| putMessage: (event: stt.SpeechEvent) => void, | ||||||||||||||
| ): boolean { | ||||||||||||||
| const transcript = transcriptData.transcript ?? ''; | ||||||||||||||
| if (!transcript) { | ||||||||||||||
| return false; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| const language = normalizeLanguage( | ||||||||||||||
| transcriptData.language_code ?? this.#opts.languageCode ?? 'unknown', | ||||||||||||||
| ); | ||||||||||||||
| const requestId = transcriptData.request_id ?? this.#requestId; | ||||||||||||||
| const confidence = extractConfidence(transcriptData, this.#logger); | ||||||||||||||
| this.#requestId = requestId; | ||||||||||||||
|
|
||||||||||||||
| putMessage({ | ||||||||||||||
| type: stt.SpeechEventType.FINAL_TRANSCRIPT, | ||||||||||||||
| requestId, | ||||||||||||||
| alternatives: [ | ||||||||||||||
| { | ||||||||||||||
| text: transcript, | ||||||||||||||
| language, | ||||||||||||||
| startTime: this.#positiveTime(transcriptData.speech_start) ?? 0, | ||||||||||||||
| endTime: this.#positiveTime(transcriptData.speech_end) ?? 0, | ||||||||||||||
| confidence, | ||||||||||||||
| }, | ||||||||||||||
| ], | ||||||||||||||
| }); | ||||||||||||||
| return true; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| #tryCommitUtterance(putMessage: (event: stt.SpeechEvent) => void) { | ||||||||||||||
| if (this.#pendingFinalData == null || this.#eosEmittedForUtterance) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| const committedData = this.#pendingFinalData; | ||||||||||||||
| if (this.#sendFinalTranscript(committedData, putMessage)) { | ||||||||||||||
| this.#emitEndOfSpeech(putMessage); | ||||||||||||||
| this.#pendingFinalData = undefined; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| protected async run() { | ||||||||||||||
| const maxRetry = 32; | ||||||||||||||
| let retries = 0; | ||||||||||||||
|
|
@@ -642,6 +728,7 @@ export class SpeechStream extends stt.SpeechStream { | |||||||||||||
| async #runWS(ws: WebSocket) { | ||||||||||||||
| this.#resetWS = new Future(); | ||||||||||||||
| this.#speaking = false; | ||||||||||||||
| this.#resetUtteranceState(); | ||||||||||||||
| let closing = false; | ||||||||||||||
| // Session-scoped controller: aborted in finally to cancel sendTask on WS reset | ||||||||||||||
| const sessionController = new AbortController(); | ||||||||||||||
|
|
@@ -765,28 +852,42 @@ export class SpeechStream extends stt.SpeechStream { | |||||||||||||
|
|
||||||||||||||
| if (msgType === 'events') { | ||||||||||||||
| const eventData = (json['data'] as SarvamWSEventData | undefined) ?? {}; | ||||||||||||||
| if (eventData.request_id) { | ||||||||||||||
| this.#requestId = eventData.request_id; | ||||||||||||||
| } | ||||||||||||||
| const signalType = eventData.signal_type; | ||||||||||||||
|
|
||||||||||||||
| if (signalType === 'START_SPEECH') { | ||||||||||||||
| if (!this.#speaking) { | ||||||||||||||
| this.#resetUtteranceState(); | ||||||||||||||
| this.#speaking = true; | ||||||||||||||
| putMessage({ type: stt.SpeechEventType.START_OF_SPEECH }); | ||||||||||||||
| } | ||||||||||||||
| } else if (signalType === 'END_SPEECH') { | ||||||||||||||
| if (this.#speaking) { | ||||||||||||||
| this.#speaking = false; | ||||||||||||||
| putMessage({ type: stt.SpeechEventType.END_OF_SPEECH }); | ||||||||||||||
| this.#pendingEos = true; | ||||||||||||||
| this.#tryCommitUtterance(putMessage); | ||||||||||||||
| if (this.#pendingEos && this.#pendingFinalData == null) { | ||||||||||||||
| if (this.#finalReceivedForUtterance) { | ||||||||||||||
| this.#emitEndOfSpeech(putMessage); | ||||||||||||||
| } else if (this.#eosFallbackTimer == null) { | ||||||||||||||
| this.#eosFallbackTimer = setTimeout(() => { | ||||||||||||||
| if (this.#pendingEos && !this.#eosEmittedForUtterance) { | ||||||||||||||
| this.#emitEndOfSpeech(putMessage); | ||||||||||||||
| } | ||||||||||||||
| }, EOS_FALLBACK_TIMEOUT); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } else if (msgType === 'data') { | ||||||||||||||
| const td = (json['data'] as SarvamWSTranscriptData | undefined) ?? {}; | ||||||||||||||
| const transcript = td.transcript ?? ''; | ||||||||||||||
| const language = normalizeLanguage( | ||||||||||||||
| td.language_code ?? this.#opts.languageCode ?? 'unknown', | ||||||||||||||
| ); | ||||||||||||||
| const requestId = td.request_id ?? ''; | ||||||||||||||
| const confidence = extractConfidence(td, this.#logger); | ||||||||||||||
| this.#requestId = requestId; | ||||||||||||||
| if (requestId) { | ||||||||||||||
| this.#requestId = requestId; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Log metrics when available | ||||||||||||||
| if (td.metrics) { | ||||||||||||||
|
|
@@ -796,24 +897,18 @@ export class SpeechStream extends stt.SpeechStream { | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| if (transcript) { | ||||||||||||||
| if (!this.#speaking) { | ||||||||||||||
| if (!this.#speaking && !this.#pendingEos && !this.#eosEmittedForUtterance) { | ||||||||||||||
| this.#speaking = true; | ||||||||||||||
| putMessage({ type: stt.SpeechEventType.START_OF_SPEECH }); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| putMessage({ | ||||||||||||||
| type: stt.SpeechEventType.FINAL_TRANSCRIPT, | ||||||||||||||
| requestId, | ||||||||||||||
| alternatives: [ | ||||||||||||||
| { | ||||||||||||||
| text: transcript, | ||||||||||||||
| language, | ||||||||||||||
| startTime: 0, | ||||||||||||||
| endTime: td.metrics?.audio_duration ?? 0, | ||||||||||||||
| confidence, | ||||||||||||||
| }, | ||||||||||||||
| ], | ||||||||||||||
| }); | ||||||||||||||
| if (this.#pendingEos) { | ||||||||||||||
| this.#pendingFinalData = td; | ||||||||||||||
| this.#finalReceivedForUtterance = true; | ||||||||||||||
| this.#tryCommitUtterance(putMessage); | ||||||||||||||
| } else if (this.#sendFinalTranscript(td, putMessage)) { | ||||||||||||||
| this.#finalReceivedForUtterance = true; | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+909
to
+911
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Missing When the EOS fallback timer fires (because the server sent Note that
Suggested change
Was this helpful? React with 👍 or 👎 to provide feedback. |
||||||||||||||
| } | ||||||||||||||
| } else if (msgType === 'error') { | ||||||||||||||
| // Server format: { type: "error", data: { message: "...", code: "..." } } | ||||||||||||||
|
|
@@ -852,6 +947,7 @@ export class SpeechStream extends stt.SpeechStream { | |||||||||||||
| } finally { | ||||||||||||||
| closing = true; | ||||||||||||||
| sessionController.abort(); | ||||||||||||||
| this.#cancelEosFallback(); | ||||||||||||||
| // Do NOT call listenTask.cancel() — it would abort this.abortController | ||||||||||||||
| // (passed to Task.from) and permanently break the stream. Instead, ws.close() | ||||||||||||||
| // triggers the ws.once('close') handler inside listenMessage, letting listenTask | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚩 EOS_FALLBACK_TIMEOUT of 1000ms may need tuning
The
EOS_FALLBACK_TIMEOUTconstant is set to 1000ms at line 38. This is the maximum time the system will wait for a transcript after receivingEND_SPEECHbefore emittingEND_OF_SPEECHwithout one. If Sarvam's server processing latency is sometimes >1000ms (e.g., for longer utterances or under load), the fallback could fire prematurely, causing the transcript to arrive after END_OF_SPEECH (which is the scenario in BUG-0001). The Sarvam STT metrics logging at line 894-896 capturesprocessing_latency— monitoring this in production would help determine if the 1000ms timeout is appropriate.Was this helpful? React with 👍 or 👎 to provide feedback.