From 525539d6952712183a962f89bee7a6efe776b04e Mon Sep 17 00:00:00 2001 From: "rosetta-livekit-bot[bot]" <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Date: Fri, 12 Jun 2026 03:30:49 +0000 Subject: [PATCH] fix: port simulation participant sync race --- agents/src/constants.ts | 3 +- agents/src/job.ts | 100 ++++++++++++++++++++++++++++++ agents/src/voice/agent_session.ts | 17 ++++- 3 files changed, 118 insertions(+), 2 deletions(-) diff --git a/agents/src/constants.ts b/agents/src/constants.ts index 3da61af46..da8acb173 100644 --- a/agents/src/constants.ts +++ b/agents/src/constants.ts @@ -11,7 +11,8 @@ export const TOPIC_CHAT = 'lk.chat'; export const ATTRIBUTE_AGENT_STATE = 'lk.agent.state'; export const ATTRIBUTE_AGENT_NAME = 'lk.agent.name'; -// TODO(eval): export const ATTRIBUTE_SIMULATOR = 'lk.simulator'; +export const ATTRIBUTE_SIMULATOR = 'lk.simulator'; +export const ATTRIBUTE_SIMULATOR_DISPATCH = 'lk.simulator.dispatch'; export const TOPIC_CLIENT_EVENTS = 'lk.agent.events'; export const RPC_GET_SESSION_STATE = 'lk.agent.get_session_state'; diff --git a/agents/src/job.ts b/agents/src/job.ts index efbd02fb6..d914b4f8c 100644 --- a/agents/src/job.ts +++ b/agents/src/job.ts @@ -17,6 +17,7 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import * as os from 'node:os'; import * as path from 'node:path'; import type { Logger } from 'pino'; +import { ATTRIBUTE_SIMULATOR, ATTRIBUTE_SIMULATOR_DISPATCH } from './constants.js'; import type { InferenceExecutor } from './ipc/inference_executor.js'; import { log } from './log.js'; import { flushOtelLogs, setupCloudTracer, uploadSessionReport } from './telemetry/index.js'; @@ -90,6 +91,49 @@ export type RunningJobInfo = { apiSecret?: string; }; +export enum SimulationMode { + SIMULATION_MODE_UNSPECIFIED = 0, + SIMULATION_MODE_TEXT = 1, + SIMULATION_MODE_AUDIO = 2, +} + +export type SimulationDispatch = { + simulationRunId?: string; + simulation_run_id?: string; + mode?: string | number; + scenario?: unknown; +}; + +export class SimulationContext> { + #dispatch: SimulationDispatch; + #jobContext: JobContext; + + constructor(dispatch: SimulationDispatch, jobContext: JobContext) { + this.#dispatch = dispatch; + this.#jobContext = jobContext; + } + + get scenario(): unknown { + return this.#dispatch.scenario; + } + + get simulationMode(): SimulationMode { + const mode = this.#dispatch.mode; + if (mode === SimulationMode.SIMULATION_MODE_AUDIO || mode === 'SIMULATION_MODE_AUDIO') { + return SimulationMode.SIMULATION_MODE_AUDIO; + } + if (mode === SimulationMode.SIMULATION_MODE_TEXT || mode === 'SIMULATION_MODE_TEXT') { + return SimulationMode.SIMULATION_MODE_TEXT; + } + // Simulations predating the mode field were text-only. + return SimulationMode.SIMULATION_MODE_TEXT; + } + + get jobContext(): JobContext { + return this.#jobContext; + } +} + /** Attempted to add a function callback, but the function already exists. */ export class FunctionExistsError extends Error { constructor(msg?: string) { @@ -119,6 +163,8 @@ export class JobContext> { } = {}; #logger: Logger; #inferenceExecutor: InferenceExecutor; + #simulationResolved = false; + #simulationContext?: SimulationContext; /** @internal */ _primaryAgentSession?: AgentSession; @@ -172,6 +218,47 @@ export class JobContext> { return this.#info; } + simulationContext(): SimulationContext | undefined { + if (this.#simulationResolved) { + return this.#simulationContext; + } + + let metadata = ''; + for (const participant of this.#room.remoteParticipants.values()) { + if (!Object.hasOwn(participant.attributes, ATTRIBUTE_SIMULATOR)) { + continue; + } + + metadata = participant.attributes[ATTRIBUTE_SIMULATOR_DISPATCH] || ''; + if (metadata) { + break; + } + } + + if (!metadata) { + // The simulator joins before the agent, so a miss is only final once the room is + // connected and a remote participant is visible. + this.#simulationResolved = this.#room.isConnected && this.#room.remoteParticipants.size > 0; + return undefined; + } + + this.#simulationResolved = true; + + let dispatch: SimulationDispatch; + try { + dispatch = JSON.parse(metadata) as SimulationDispatch; + } catch { + return undefined; + } + + if (!(dispatch.simulationRunId || dispatch.simulation_run_id)) { + return undefined; + } + + this.#simulationContext = new SimulationContext(dispatch, this); + return this.#simulationContext; + } + /** @returns The agent's participant if connected to the room, otherwise `undefined` */ get agent(): LocalParticipant | undefined { return this.#room.localParticipant; @@ -259,6 +346,10 @@ export class JobContext> { await this.#room.connect(this.#info.url, this.#info.token, opts); this.#onConnect(); + // Always registered: the callback ignores participants without the simulator attribute, and + // gating on simulationContext() here would race the participant-list sync. + this.#room.on(RoomEvent.ParticipantDisconnected, this.onSimulatorDisconnected); + this.#room.remoteParticipants.forEach(this.onParticipantConnected); if ([AutoSubscribe.AUDIO_ONLY, AutoSubscribe.VIDEO_ONLY].includes(autoSubscribe)) { @@ -411,6 +502,15 @@ export class JobContext> { } } + private onSimulatorDisconnected = (p: RemoteParticipant) => { + if (!Object.hasOwn(p.attributes, ATTRIBUTE_SIMULATOR)) { + return; + } + + this.#logger.debug('simulator disconnected, shutting down the job'); + this.shutdown('simulation completed'); + }; + /** * Adds a promise to be awaited whenever a new participant joins the room. * diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index d308df854..8e418a926 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -21,7 +21,7 @@ import { type TTSModelString, } from '../inference/index.js'; import type { OverlappingSpeechEvent } from '../inference/interruption/types.js'; -import { getJobContext } from '../job.js'; +import { SimulationMode, getJobContext } from '../job.js'; import type { FunctionCall, FunctionCallOutput } from '../llm/chat_context.js'; import { AgentHandoffItem, @@ -533,6 +533,12 @@ export class AgentSession< return this.sessionOptions.useTtsAlignedTranscript; } + /** @internal */ + get _textOnly(): boolean { + const ctx = getJobContext(false); + return ctx?.simulationContext()?.simulationMode === SimulationMode.SIMULATION_MODE_TEXT; + } + set userData(value: UserData) { this._userData = value; } @@ -557,6 +563,12 @@ export class AgentSession< const tasks: Promise[] = []; + if (this._textOnly) { + this.logger.info('text simulation: disabling STT/TTS/VAD and audio I/O'); + inputOptions = { ...inputOptions, audioEnabled: false }; + outputOptions = { ...outputOptions, audioEnabled: false }; + } + if (room && !this._roomIO) { // Check for existing input/output configuration and warn if needed if (this.input.audio && inputOptions?.audioEnabled !== false) { @@ -684,6 +696,9 @@ export class AgentSession< } this._recordingOptions = resolveRecordingOptions(record); + if (this._textOnly) { + this._recordingOptions.audio = false; + } // Only one AgentSession per job can be the primary (and therefore record). // Designate the primary before initRecording so a demoted secondary session