diff --git a/.changeset/amd-prediction-event.md b/.changeset/amd-prediction-event.md new file mode 100644 index 000000000..0e10692dd --- /dev/null +++ b/.changeset/amd-prediction-event.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +Port AMD result → `AMDPredictionEvent` rename and event emission from Python (livekit/agents#5621). The `AMD` detector now extends an `EventEmitter` and emits `amd_prediction` with an `AMDPredictionEvent` payload (`type: 'amd_prediction'`, plus the existing `category` / `reason` / `transcript` / `rawResponse` / `isMachine` fields and a new optional `speechDurationMs`). `AMDResult` is kept as a deprecated type alias for `AMDPredictionEvent` for backward compatibility. The remote-session wire serialization for AMD predictions is intentionally deferred until `@livekit/protocol` ships the corresponding `AgentSessionEvent.AmdPrediction` / `AmdCategory` message types; a TODO marker has been left in `voice/remote_session.ts` where it will be wired up. diff --git a/agents/src/voice/amd.test.ts b/agents/src/voice/amd.test.ts index 98b767df3..e04ff7cff 100644 --- a/agents/src/voice/amd.test.ts +++ b/agents/src/voice/amd.test.ts @@ -9,7 +9,7 @@ import { LLM, type LLMStream } from '../llm/llm.js'; import type { ToolChoice, ToolContext } from '../llm/tool_context.js'; import type { APIConnectOptions } from '../types.js'; import type { AgentSession } from './agent_session.js'; -import { AMD, AMDCategory } from './amd.js'; +import { AMD, AMDCategory, type AMDPredictionEvent } from './amd.js'; import { AgentSessionEventTypes } from './events.js'; class StaticLLM extends LLM { @@ -70,6 +70,9 @@ describe('AMD', () => { llm.on('error', () => {}); const amd = new AMD(asAgentSession(session), { llm, detectionTimeoutMs: 50 }); + const events: AMDPredictionEvent[] = []; + amd.on('amd_prediction', (ev) => events.push(ev)); + const promise = amd.execute(); session.emit(AgentSessionEventTypes.UserInputTranscribed, { type: 'user_input_transcribed', @@ -81,12 +84,18 @@ describe('AMD', () => { }); await expect(promise).resolves.toMatchObject({ + type: 'amd_prediction', category: AMDCategory.MACHINE_VM, isMachine: true, }); expect(session.pauseReplyAuthorization).toHaveBeenCalledTimes(1); expect(session.resumeReplyAuthorization).toHaveBeenCalled(); expect(session.interrupt).toHaveBeenCalledWith({ force: true }); + expect(events).toHaveLength(1); + expect(events[0]).toMatchObject({ + type: 'amd_prediction', + category: AMDCategory.MACHINE_VM, + }); }); it('should classify unavailable mailbox as machine', async () => { diff --git a/agents/src/voice/amd.ts b/agents/src/voice/amd.ts index 9bab6efe6..46a1faa5d 100644 --- a/agents/src/voice/amd.ts +++ b/agents/src/voice/amd.ts @@ -1,7 +1,9 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import type { TypedEventEmitter } from '@livekit/typed-emitter'; import type { Span } from '@opentelemetry/api'; +import EventEmitter from 'node:events'; import { ChatContext } from '../llm/chat_context.js'; import { LLM } from '../llm/llm.js'; import { traceTypes, tracer } from '../telemetry/index.js'; @@ -21,14 +23,26 @@ export enum AMDCategory { UNCERTAIN = 'uncertain', } -export interface AMDResult { +export interface AMDPredictionEvent { + type: 'amd_prediction'; category: AMDCategory; transcript: string; reason: string; rawResponse: string; isMachine: boolean; + /** Duration of detected user speech in milliseconds, when known. */ + speechDurationMs?: number; } +/** + * @deprecated Use {@link AMDPredictionEvent}. + */ +export type AMDResult = AMDPredictionEvent; + +export type AMDCallbacks = { + amd_prediction: (ev: AMDPredictionEvent) => void; +}; + export interface AMDOptions { llm?: LLM; interruptOnMachine?: boolean; @@ -80,8 +94,11 @@ Do not include markdown fences or extra text.`; * Mirrors Python's `_AMDClassifier` two-gate architecture: * a result is only emitted when both a **verdict** (from LLM or heuristic) and * a **silence gate** (from VAD or timeout) are satisfied. + * + * Emits `amd_prediction` with an {@link AMDPredictionEvent} payload when a + * prediction settles. Callers can subscribe via `amd.on('amd_prediction', ...)`. */ -export class AMD { +export class AMD extends (EventEmitter as new () => TypedEventEmitter) { private readonly llm: LLM; private readonly interruptOnMachine: boolean; private readonly noSpeechTimeoutMs: number; @@ -92,16 +109,17 @@ export class AMD { private active = false; private settled = false; private transcriptParts: string[] = []; - private verdictResult: AMDResult | undefined; + private verdictResult: AMDPredictionEvent | undefined; private machineSilenceReached = false; private speechStartedAt: number | undefined; + private speechEndedAt: number | undefined; private detectGeneration = 0; private noSpeechTimer: ReturnType | undefined; private detectionTimer: ReturnType | undefined; private silenceTimer: ReturnType | undefined; - private resolveRun: ((value: AMDResult) => void) | undefined; + private resolveRun: ((value: AMDPredictionEvent) => void) | undefined; private rejectRun: ((reason?: unknown) => void) | undefined; private span: Span | undefined; @@ -109,6 +127,7 @@ export class AMD { private readonly session: AgentSession, options: AMDOptions = {}, ) { + super(); const llm = options.llm ?? this.resolveSessionLLM(); if (!llm) { throw new Error( @@ -125,7 +144,7 @@ export class AMD { // ─── public API ────────────────────────────────────────────────────────────── - async execute(): Promise { + async execute(): Promise { return tracer.startActiveSpan( async (span) => { if (this.active) { @@ -146,7 +165,7 @@ export class AMD { } try { - const result = await new Promise((resolve, reject) => { + const result = await new Promise((resolve, reject) => { this.resolveRun = resolve; this.rejectRun = reject; this.subscribe(); @@ -181,6 +200,7 @@ export class AMD { this.verdictResult = undefined; this.machineSilenceReached = false; this.speechStartedAt = undefined; + this.speechEndedAt = undefined; this.detectGeneration = 0; this.resolveRun = undefined; this.rejectRun = undefined; @@ -220,7 +240,7 @@ export class AMD { * Ref: python classifier.py `_set_verdict` — stores the LLM/heuristic verdict. * Emission is deferred until the silence gate also opens. */ - private setVerdict(result: AMDResult): void { + private setVerdict(result: AMDPredictionEvent): void { this.verdictResult = result; this.tryEmitResult(); } @@ -237,7 +257,7 @@ export class AMD { this.finish(this.verdictResult); } - private finish(result: AMDResult): void { + private finish(result: AMDPredictionEvent): void { if (this.settled) { return; } @@ -247,6 +267,7 @@ export class AMD { if (result.isMachine && this.interruptOnMachine) { this.session.interrupt({ force: true }).await.catch(() => {}); } + this.emit('amd_prediction', result); this.resolveRun?.(result); } @@ -260,11 +281,13 @@ export class AMD { private onSilenceTimerFired(category?: AMDCategory, reason?: string): void { if (category && reason && !this.verdictResult) { this.setVerdict({ + type: 'amd_prediction', category, reason, transcript: this.joinTranscript(), rawResponse: '', isMachine: isMachineCategory(category), + speechDurationMs: this.computeSpeechDurationMs(), }); } this.machineSilenceReached = true; @@ -304,6 +327,7 @@ export class AMD { return; } + this.speechEndedAt = ev.createdAt; const speechDurationMs = ev.createdAt - (this.speechStartedAt ?? ev.createdAt); this.clearTimer('silence'); @@ -390,14 +414,22 @@ export class AMD { return this.transcriptParts.join('\n'); } - private setSpanAttributes(result: AMDResult): void { + private computeSpeechDurationMs(): number | undefined { + if (this.speechStartedAt === undefined) { + return undefined; + } + const end = this.speechEndedAt ?? Date.now(); + return Math.max(0, end - this.speechStartedAt); + } + + private setSpanAttributes(result: AMDPredictionEvent): void { this.span?.setAttribute(traceTypes.ATTR_AMD_CATEGORY, result.category); this.span?.setAttribute(traceTypes.ATTR_AMD_REASON, result.reason); this.span?.setAttribute(traceTypes.ATTR_AMD_IS_MACHINE, result.isMachine); this.span?.setAttribute(traceTypes.ATTR_USER_TRANSCRIPT, result.transcript); } - private async detect(transcript: string): Promise { + private async detect(transcript: string): Promise { const chatCtx = new ChatContext(); chatCtx.addMessage({ role: 'system', content: AMD_PROMPT }); chatCtx.addMessage({ @@ -417,14 +449,16 @@ export class AMD { const parsed = this.parseDetection(rawResponse); return { + type: 'amd_prediction', ...parsed, transcript, rawResponse, isMachine: isMachineCategory(parsed.category), + speechDurationMs: this.computeSpeechDurationMs(), }; } - private parseDetection(rawResponse: string): Pick { + private parseDetection(rawResponse: string): Pick { const normalized = rawResponse.trim(); const jsonStart = normalized.indexOf('{'); const jsonEnd = normalized.lastIndexOf('}'); diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts index b3a96dc3b..26daaee2f 100644 --- a/agents/src/voice/remote_session.ts +++ b/agents/src/voice/remote_session.ts @@ -655,6 +655,12 @@ export class SessionHost { this.emitEvent({ case: 'overlappingSpeech', value }); }; + // TODO(amd_prediction): mirror Python `_on_amd_prediction` once + // `@livekit/protocol` ships `AgentSessionEvent.AmdPrediction` / `AmdCategory`. + // The AMD detector now emits an `amd_prediction` event (see voice/amd.ts); + // wire this handler up via `amd.on('amd_prediction', ...)` and forward the + // payload through `emitEvent({ case: 'amdPrediction', value: ... })`. + private onMetricsCollected = (event: MetricsCollectedEvent): void => { if (!this.session) return; this.emitEvent(