Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-sarvam-eos-timing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents-plugin-sarvam': patch
---

Fix Sarvam streaming STT end-of-speech timing.
136 changes: 116 additions & 20 deletions plugins/sarvam/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const SARVAM_STT_TRANSLATE_WS_URL = 'wss://api.sarvam.ai/speech-to-text-translat

const SAMPLE_RATE = 16000;
const NUM_CHANNELS = 1;
const EOS_FALLBACK_TIMEOUT = 1000;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 EOS_FALLBACK_TIMEOUT of 1000ms may need tuning

The EOS_FALLBACK_TIMEOUT constant is set to 1000ms at line 38. This is the maximum time the system will wait for a transcript after receiving END_SPEECH before emitting END_OF_SPEECH without one. If Sarvam's server processing latency is sometimes >1000ms (e.g., for longer utterances or under load), the fallback could fire prematurely, causing the transcript to arrive after END_OF_SPEECH (which is the scenario in BUG-0001). The Sarvam STT metrics logging at line 894-896 captures processing_latency — monitoring this in production would help determine if the 1000ms timeout is appropriate.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


// ---------------------------------------------------------------------------
// Model-specific option types
Expand Down Expand Up @@ -409,6 +410,8 @@ interface SarvamWSTranscriptData {
transcript?: string;
language_code?: string | null;
language_probability?: number | null;
speech_start?: number | null;
speech_end?: number | null;
timestamps?: Record<string, unknown> | null;
diarized_transcript?: Record<string, unknown> | null;
metrics?: {
Expand All @@ -419,6 +422,7 @@ interface SarvamWSTranscriptData {

/** type: "events" */
interface SarvamWSEventData {
request_id?: string;
event_type?: string;
timestamp?: string;
signal_type?: 'START_SPEECH' | 'END_SPEECH';
Expand Down Expand Up @@ -554,6 +558,11 @@ export class SpeechStream extends stt.SpeechStream {
#speaking = false;
#resetWS = new Future();
#requestId = '';
#pendingFinalData: SarvamWSTranscriptData | undefined;
#pendingEos = false;
#eosFallbackTimer: ReturnType<typeof setTimeout> | undefined;
#finalReceivedForUtterance = false;
#eosEmittedForUtterance = false;
label = 'sarvam.SpeechStream';

constructor(sttInstance: STT, opts: ResolvedSTTOptions, connOptions?: APIConnectOptions) {
Expand Down Expand Up @@ -583,6 +592,83 @@ export class SpeechStream extends stt.SpeechStream {
this.#resetWS.resolve();
}

#positiveTime(value: unknown): number | undefined {
if (typeof value !== 'number' || !Number.isFinite(value) || value <= 0) {
return undefined;
}
return value + this.startTimeOffset;
}

#resetUtteranceState() {
this.#cancelEosFallback();
this.#pendingFinalData = undefined;
this.#pendingEos = false;
this.#finalReceivedForUtterance = false;
this.#eosEmittedForUtterance = false;
}

#cancelEosFallback() {
if (this.#eosFallbackTimer != null) {
clearTimeout(this.#eosFallbackTimer);
this.#eosFallbackTimer = undefined;
}
}

#emitEndOfSpeech(putMessage: (event: stt.SpeechEvent) => void) {
if (this.#eosEmittedForUtterance) {
return;
}

this.#cancelEosFallback();
putMessage({ type: stt.SpeechEventType.END_OF_SPEECH, requestId: this.#requestId });
this.#eosEmittedForUtterance = true;
this.#pendingEos = false;
}

#sendFinalTranscript(
transcriptData: SarvamWSTranscriptData,
putMessage: (event: stt.SpeechEvent) => void,
): boolean {
const transcript = transcriptData.transcript ?? '';
if (!transcript) {
return false;
}

const language = normalizeLanguage(
transcriptData.language_code ?? this.#opts.languageCode ?? 'unknown',
);
const requestId = transcriptData.request_id ?? this.#requestId;
const confidence = extractConfidence(transcriptData, this.#logger);
this.#requestId = requestId;

putMessage({
type: stt.SpeechEventType.FINAL_TRANSCRIPT,
requestId,
alternatives: [
{
text: transcript,
language,
startTime: this.#positiveTime(transcriptData.speech_start) ?? 0,
endTime: this.#positiveTime(transcriptData.speech_end) ?? 0,
confidence,
},
],
});
return true;
}

#tryCommitUtterance(putMessage: (event: stt.SpeechEvent) => void) {
if (this.#pendingFinalData == null || this.#eosEmittedForUtterance) {
return;
}

const committedData = this.#pendingFinalData;
if (this.#sendFinalTranscript(committedData, putMessage)) {
this.#emitEndOfSpeech(putMessage);
this.#pendingFinalData = undefined;
}
}

protected async run() {
const maxRetry = 32;
let retries = 0;
Expand Down Expand Up @@ -642,6 +728,7 @@ export class SpeechStream extends stt.SpeechStream {
async #runWS(ws: WebSocket) {
this.#resetWS = new Future();
this.#speaking = false;
this.#resetUtteranceState();
let closing = false;
// Session-scoped controller: aborted in finally to cancel sendTask on WS reset
const sessionController = new AbortController();
Expand Down Expand Up @@ -765,28 +852,42 @@ export class SpeechStream extends stt.SpeechStream {

if (msgType === 'events') {
const eventData = (json['data'] as SarvamWSEventData | undefined) ?? {};
if (eventData.request_id) {
this.#requestId = eventData.request_id;
}
const signalType = eventData.signal_type;

if (signalType === 'START_SPEECH') {
if (!this.#speaking) {
this.#resetUtteranceState();
this.#speaking = true;
putMessage({ type: stt.SpeechEventType.START_OF_SPEECH });
}
} else if (signalType === 'END_SPEECH') {
if (this.#speaking) {
this.#speaking = false;
putMessage({ type: stt.SpeechEventType.END_OF_SPEECH });
this.#pendingEos = true;
this.#tryCommitUtterance(putMessage);
if (this.#pendingEos && this.#pendingFinalData == null) {
if (this.#finalReceivedForUtterance) {
this.#emitEndOfSpeech(putMessage);
} else if (this.#eosFallbackTimer == null) {
this.#eosFallbackTimer = setTimeout(() => {
if (this.#pendingEos && !this.#eosEmittedForUtterance) {
this.#emitEndOfSpeech(putMessage);
}
}, EOS_FALLBACK_TIMEOUT);
}
}
}
}
} else if (msgType === 'data') {
const td = (json['data'] as SarvamWSTranscriptData | undefined) ?? {};
const transcript = td.transcript ?? '';
const language = normalizeLanguage(
td.language_code ?? this.#opts.languageCode ?? 'unknown',
);
const requestId = td.request_id ?? '';
const confidence = extractConfidence(td, this.#logger);
this.#requestId = requestId;
if (requestId) {
this.#requestId = requestId;
}

// Log metrics when available
if (td.metrics) {
Expand All @@ -796,24 +897,18 @@ export class SpeechStream extends stt.SpeechStream {
}

if (transcript) {
if (!this.#speaking) {
if (!this.#speaking && !this.#pendingEos && !this.#eosEmittedForUtterance) {
this.#speaking = true;
putMessage({ type: stt.SpeechEventType.START_OF_SPEECH });
}

putMessage({
type: stt.SpeechEventType.FINAL_TRANSCRIPT,
requestId,
alternatives: [
{
text: transcript,
language,
startTime: 0,
endTime: td.metrics?.audio_duration ?? 0,
confidence,
},
],
});
if (this.#pendingEos) {
this.#pendingFinalData = td;
this.#finalReceivedForUtterance = true;
this.#tryCommitUtterance(putMessage);
} else if (this.#sendFinalTranscript(td, putMessage)) {
this.#finalReceivedForUtterance = true;
}
Comment on lines +909 to +911

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Missing #eosEmittedForUtterance guard allows FINAL_TRANSCRIPT after END_OF_SPEECH

When the EOS fallback timer fires (because the server sent END_SPEECH but no transcript arrived within 1000ms), #pendingEos is set to false and #eosEmittedForUtterance is set to true. If a late transcript data message subsequently arrives, the code at line 909 takes the else if branch (since #pendingEos is false) and calls #sendFinalTranscript without checking #eosEmittedForUtterance. This emits a FINAL_TRANSCRIPT event after END_OF_SPEECH was already emitted, violating the expected event ordering (START_OF_SPEECH → FINAL_TRANSCRIPT → END_OF_SPEECH). Downstream in audio_recognition.ts:837-897, this late FINAL_TRANSCRIPT updates audioTranscript, triggers preemptive generation, and runs EOU detection again — all after the user turn was already committed at audio_recognition.ts:1047.

Note that #tryCommitUtterance at plugins/sarvam/src/stt.ts:661 correctly guards against this with this.#eosEmittedForUtterance, but the direct #sendFinalTranscript call path at line 909 does not.

Suggested change
} else if (this.#sendFinalTranscript(td, putMessage)) {
this.#finalReceivedForUtterance = true;
}
} else if (!this.#eosEmittedForUtterance && this.#sendFinalTranscript(td, putMessage)) {
this.#finalReceivedForUtterance = true;
}
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

}
} else if (msgType === 'error') {
// Server format: { type: "error", data: { message: "...", code: "..." } }
Expand Down Expand Up @@ -852,6 +947,7 @@ export class SpeechStream extends stt.SpeechStream {
} finally {
closing = true;
sessionController.abort();
this.#cancelEosFallback();
// Do NOT call listenTask.cancel() — it would abort this.abortController
// (passed to Task.from) and permanently break the stream. Instead, ws.close()
// triggers the ws.once('close') handler inside listenMessage, letting listenTask
Expand Down