From 0fbbbafc61a8602a8c8fa8da501ddff0cfd6f53e Mon Sep 17 00:00:00 2001 From: "rosetta-livekit-bot[bot]" <282703043+rosetta-livekit-bot[bot]@users.noreply.github.com> Date: Wed, 10 Jun 2026 23:54:04 +0000 Subject: [PATCH] Port simulation text mode handling --- agents/src/cli.ts | 7 +++ agents/src/job.ts | 72 ++++++++++++++++++++++++++++++ agents/src/voice/agent_activity.ts | 9 ++++ agents/src/voice/agent_session.ts | 14 +++++- agents/src/worker.ts | 11 ++++- 5 files changed, 111 insertions(+), 2 deletions(-) diff --git a/agents/src/cli.ts b/agents/src/cli.ts index c5f941dbd..509325a3a 100644 --- a/agents/src/cli.ts +++ b/agents/src/cli.ts @@ -135,6 +135,12 @@ export const runApp = (opts: ServerOptions) => { .command('start') .description('Start the worker in production mode') .addOption(logLevelOption('info')) + .addOption( + new Option( + '--simulation', + 'Run under an agent simulation with worker load limit disabled', + ).hideHelp(), + ) .action((...[, command]) => { const globalOptions = program.optsWithGlobals(); const commandOptions = command.opts(); @@ -143,6 +149,7 @@ export const runApp = (opts: ServerOptions) => { opts.apiSecret = globalOptions.apiSecret || opts.apiSecret; opts.logLevel = commandOptions.logLevel; opts.workerToken = globalOptions.workerToken || opts.workerToken; + opts.simulation = commandOptions.simulation; runServer({ opts, production: true, diff --git a/agents/src/job.ts b/agents/src/job.ts index efbd02fb6..15654b36a 100644 --- a/agents/src/job.ts +++ b/agents/src/job.ts @@ -90,6 +90,49 @@ export type RunningJobInfo = { apiSecret?: string; }; +export enum SimulationMode { + SIMULATION_MODE_UNSPECIFIED = 0, + SIMULATION_MODE_TEXT = 1, + SIMULATION_MODE_AUDIO = 2, +} + +export type SimulationDispatch = { + simulationRunId?: string; + simulation_run_id?: string; + mode?: string | number; + scenario?: unknown; +}; + +export class SimulationContext> { + #dispatch: SimulationDispatch; + #jobContext: JobContext; + + constructor(dispatch: SimulationDispatch, jobContext: JobContext) { + this.#dispatch = dispatch; + this.#jobContext = jobContext; + } + + get scenario(): unknown { + return this.#dispatch.scenario; + } + + get simulationMode(): SimulationMode { + const mode = this.#dispatch.mode; + if (mode === SimulationMode.SIMULATION_MODE_AUDIO || mode === 'SIMULATION_MODE_AUDIO') { + return SimulationMode.SIMULATION_MODE_AUDIO; + } + if (mode === SimulationMode.SIMULATION_MODE_TEXT || mode === 'SIMULATION_MODE_TEXT') { + return SimulationMode.SIMULATION_MODE_TEXT; + } + // Simulations predating the mode field were text-only. + return SimulationMode.SIMULATION_MODE_TEXT; + } + + get jobContext(): JobContext { + return this.#jobContext; + } +} + /** Attempted to add a function callback, but the function already exists. */ export class FunctionExistsError extends Error { constructor(msg?: string) { @@ -119,6 +162,8 @@ export class JobContext> { } = {}; #logger: Logger; #inferenceExecutor: InferenceExecutor; + #simulationResolved = false; + #simulationContext?: SimulationContext; /** @internal */ _primaryAgentSession?: AgentSession; @@ -172,6 +217,33 @@ export class JobContext> { return this.#info; } + simulationContext(): SimulationContext | undefined { + if (this.#simulationResolved) { + return this.#simulationContext; + } + + this.#simulationResolved = true; + const roomMetadata = (this.#room as { metadata?: string }).metadata; + const metadata = this.#info.job.metadata || roomMetadata; + if (!metadata) { + return undefined; + } + + let dispatch: SimulationDispatch; + try { + dispatch = JSON.parse(metadata) as SimulationDispatch; + } catch { + return undefined; + } + + if (!dispatch.simulationRunId && !dispatch.simulation_run_id) { + return undefined; + } + + this.#simulationContext = new SimulationContext(dispatch, this); + return this.#simulationContext; + } + /** @returns The agent's participant if connected to the room, otherwise `undefined` */ get agent(): LocalParticipant | undefined { return this.#room.localParticipant; diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 66c55a747..470515cb3 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -656,10 +656,16 @@ export class AgentActivity implements RecognitionHooks { } get vad(): VAD | undefined { + if (this.agentSession._textOnly) { + return undefined; + } return this.agent.vad || this.agentSession.vad; } get stt(): STT | undefined { + if (this.agentSession._textOnly) { + return undefined; + } return this.agent.stt || this.agentSession.stt; } @@ -679,6 +685,9 @@ export class AgentActivity implements RecognitionHooks { } get tts(): TTS | undefined { + if (this.agentSession._textOnly) { + return undefined; + } return this.agent.tts || this.agentSession.tts; } diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index d308df854..51155b77d 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -21,7 +21,7 @@ import { type TTSModelString, } from '../inference/index.js'; import type { OverlappingSpeechEvent } from '../inference/interruption/types.js'; -import { getJobContext } from '../job.js'; +import { SimulationMode, getJobContext } from '../job.js'; import type { FunctionCall, FunctionCallOutput } from '../llm/chat_context.js'; import { AgentHandoffItem, @@ -533,6 +533,12 @@ export class AgentSession< return this.sessionOptions.useTtsAlignedTranscript; } + /** @internal */ + get _textOnly(): boolean { + const ctx = getJobContext(false); + return ctx?.simulationContext()?.simulationMode === SimulationMode.SIMULATION_MODE_TEXT; + } + set userData(value: UserData) { this._userData = value; } @@ -557,6 +563,12 @@ export class AgentSession< const tasks: Promise[] = []; + if (this._textOnly) { + this.logger.info('text simulation: disabling STT/TTS/VAD and audio I/O'); + inputOptions = { ...inputOptions, audioEnabled: false }; + outputOptions = { ...outputOptions, audioEnabled: false }; + } + if (room && !this._roomIO) { // Check for existing input/output configuration and warn if needed if (this.input.audio && inputOptions?.audioEnabled !== false) { diff --git a/agents/src/worker.ts b/agents/src/worker.ts index 3ecc45dc3..f3d34f727 100644 --- a/agents/src/worker.ts +++ b/agents/src/worker.ts @@ -146,6 +146,8 @@ export class ServerOptions { production: boolean; jobMemoryWarnMB: number; jobMemoryLimitMB: number; + /** @internal */ + simulation: boolean; /** @param options - Worker options */ constructor({ @@ -171,6 +173,7 @@ export class ServerOptions { production = false, jobMemoryWarnMB = 500, jobMemoryLimitMB = 0, + simulation = false, }: { /** * Path to a file that has {@link Agent} as a default export, dynamically imported later for @@ -212,6 +215,8 @@ export class ServerOptions { production?: boolean; jobMemoryWarnMB?: number; jobMemoryLimitMB?: number; + /** @internal */ + simulation?: boolean; }) { this.agent = agent; if (!this.agent) { @@ -248,6 +253,7 @@ export class ServerOptions { this.production = production; this.jobMemoryWarnMB = jobMemoryWarnMB; this.jobMemoryLimitMB = jobMemoryLimitMB; + this.simulation = simulation; } } @@ -390,6 +396,9 @@ export class AgentServer { } this.#logger.info('starting worker'); + if (this.#opts.simulation) { + this.#logger.info('simulation mode enabled: worker load limit disabled'); + } this.#closed = false; this.#procPool.start(); @@ -705,7 +714,7 @@ export class AgentServer { this.#opts .loadFunc(this) .then((currentLoad: number) => { - const isFull = currentLoad >= this.#opts.loadThreshold; + const isFull = !this.#opts.simulation && currentLoad >= this.#opts.loadThreshold; const currentlyAvailable = !isFull; currentStatus = currentlyAvailable ? WorkerStatus.WS_AVAILABLE : WorkerStatus.WS_FULL;