Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion agents/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export const TOPIC_CHAT = 'lk.chat';
export const ATTRIBUTE_AGENT_STATE = 'lk.agent.state';
export const ATTRIBUTE_AGENT_NAME = 'lk.agent.name';

// TODO(eval): export const ATTRIBUTE_SIMULATOR = 'lk.simulator';
export const ATTRIBUTE_SIMULATOR = 'lk.simulator';
export const ATTRIBUTE_SIMULATOR_DISPATCH = 'lk.simulator.dispatch';

export const TOPIC_CLIENT_EVENTS = 'lk.agent.events';
export const RPC_GET_SESSION_STATE = 'lk.agent.get_session_state';
Expand Down
100 changes: 100 additions & 0 deletions agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { AsyncLocalStorage } from 'node:async_hooks';
import * as os from 'node:os';
import * as path from 'node:path';
import type { Logger } from 'pino';
import { ATTRIBUTE_SIMULATOR, ATTRIBUTE_SIMULATOR_DISPATCH } from './constants.js';
import type { InferenceExecutor } from './ipc/inference_executor.js';
import { log } from './log.js';
import { flushOtelLogs, setupCloudTracer, uploadSessionReport } from './telemetry/index.js';
Expand Down Expand Up @@ -90,6 +91,49 @@ export type RunningJobInfo = {
apiSecret?: string;
};

export enum SimulationMode {
SIMULATION_MODE_UNSPECIFIED = 0,
SIMULATION_MODE_TEXT = 1,
SIMULATION_MODE_AUDIO = 2,
}

export type SimulationDispatch = {
simulationRunId?: string;
simulation_run_id?: string;
mode?: string | number;
scenario?: unknown;
};

export class SimulationContext<ProcessUserData = Record<string, unknown>> {
#dispatch: SimulationDispatch;
#jobContext: JobContext<ProcessUserData>;

constructor(dispatch: SimulationDispatch, jobContext: JobContext<ProcessUserData>) {
this.#dispatch = dispatch;
this.#jobContext = jobContext;
}

get scenario(): unknown {
return this.#dispatch.scenario;
}

get simulationMode(): SimulationMode {
const mode = this.#dispatch.mode;
if (mode === SimulationMode.SIMULATION_MODE_AUDIO || mode === 'SIMULATION_MODE_AUDIO') {
return SimulationMode.SIMULATION_MODE_AUDIO;
}
if (mode === SimulationMode.SIMULATION_MODE_TEXT || mode === 'SIMULATION_MODE_TEXT') {
return SimulationMode.SIMULATION_MODE_TEXT;
}
// Simulations predating the mode field were text-only.
return SimulationMode.SIMULATION_MODE_TEXT;
}
Comment on lines +120 to +130

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 SimulationMode defaults to TEXT for unspecified modes

At job.ts:128-129, when the mode field doesn't match SIMULATION_MODE_AUDIO or SIMULATION_MODE_TEXT (including when mode is undefined, 0/SIMULATION_MODE_UNSPECIFIED, or any unrecognized value), the getter defaults to SIMULATION_MODE_TEXT. This means SIMULATION_MODE_UNSPECIFIED (value 0) maps to TEXT mode rather than being treated as truly unspecified. The comment says "Simulations predating the mode field were text-only" which explains backward compatibility, but callers should be aware that SIMULATION_MODE_UNSPECIFIED triggers text-only behavior (audio I/O disabled) rather than being a no-op.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


get jobContext(): JobContext<ProcessUserData> {
return this.#jobContext;
}
}

/** Attempted to add a function callback, but the function already exists. */
export class FunctionExistsError extends Error {
constructor(msg?: string) {
Expand Down Expand Up @@ -119,6 +163,8 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
} = {};
#logger: Logger;
#inferenceExecutor: InferenceExecutor;
#simulationResolved = false;
#simulationContext?: SimulationContext<ProcessUserData>;

/** @internal */
_primaryAgentSession?: AgentSession;
Expand Down Expand Up @@ -172,6 +218,47 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
return this.#info;
}

simulationContext(): SimulationContext<ProcessUserData> | undefined {
if (this.#simulationResolved) {
return this.#simulationContext;
}

let metadata = '';
for (const participant of this.#room.remoteParticipants.values()) {
if (!Object.hasOwn(participant.attributes, ATTRIBUTE_SIMULATOR)) {
continue;
}

metadata = participant.attributes[ATTRIBUTE_SIMULATOR_DISPATCH] || '';
if (metadata) {
break;
}
}

if (!metadata) {
// The simulator joins before the agent, so a miss is only final once the room is
// connected and a remote participant is visible.
this.#simulationResolved = this.#room.isConnected && this.#room.remoteParticipants.size > 0;
return undefined;
}

this.#simulationResolved = true;

let dispatch: SimulationDispatch;
try {
dispatch = JSON.parse(metadata) as SimulationDispatch;
} catch {
return undefined;
}

if (!(dispatch.simulationRunId || dispatch.simulation_run_id)) {
return undefined;
}

this.#simulationContext = new SimulationContext(dispatch, this);
return this.#simulationContext;
}

/** @returns The agent's participant if connected to the room, otherwise `undefined` */
get agent(): LocalParticipant | undefined {
return this.#room.localParticipant;
Expand Down Expand Up @@ -259,6 +346,10 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
await this.#room.connect(this.#info.url, this.#info.token, opts);
this.#onConnect();

// Always registered: the callback ignores participants without the simulator attribute, and
// gating on simulationContext() here would race the participant-list sync.
this.#room.on(RoomEvent.ParticipantDisconnected, this.onSimulatorDisconnected);

this.#room.remoteParticipants.forEach(this.onParticipantConnected);

if ([AutoSubscribe.AUDIO_ONLY, AutoSubscribe.VIDEO_ONLY].includes(autoSubscribe)) {
Expand Down Expand Up @@ -411,6 +502,15 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
}
}

private onSimulatorDisconnected = (p: RemoteParticipant) => {
if (!Object.hasOwn(p.attributes, ATTRIBUTE_SIMULATOR)) {
return;
}

this.#logger.debug('simulator disconnected, shutting down the job');
this.shutdown('simulation completed');
};

/**
* Adds a promise to be awaited whenever a new participant joins the room.
*
Expand Down
17 changes: 16 additions & 1 deletion agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
type TTSModelString,
} from '../inference/index.js';
import type { OverlappingSpeechEvent } from '../inference/interruption/types.js';
import { getJobContext } from '../job.js';
import { SimulationMode, getJobContext } from '../job.js';
import type { FunctionCall, FunctionCallOutput } from '../llm/chat_context.js';
import {
AgentHandoffItem,
Expand Down Expand Up @@ -533,6 +533,12 @@ export class AgentSession<
return this.sessionOptions.useTtsAlignedTranscript;
}

/** @internal */
get _textOnly(): boolean {
const ctx = getJobContext(false);
return ctx?.simulationContext()?.simulationMode === SimulationMode.SIMULATION_MODE_TEXT;
}

set userData(value: UserData) {
this._userData = value;
}
Expand All @@ -557,6 +563,12 @@ export class AgentSession<

const tasks: Promise<void>[] = [];

if (this._textOnly) {
this.logger.info('text simulation: disabling STT/TTS/VAD and audio I/O');
inputOptions = { ...inputOptions, audioEnabled: false };
outputOptions = { ...outputOptions, audioEnabled: false };
}
Comment on lines +566 to +570

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 _textOnly simulation check always returns false in the auto-connect flow because room isn't connected yet

The _textOnly getter (agent_session.ts:537-540) calls ctx.simulationContext(), which scans room.remoteParticipants to find a simulator participant. Both call sites — start() at line 699 and _startImpl() at line 566 — execute before the room is connected. In the common auto-connect pattern (used in most examples, e.g. session.start({ agent, room: ctx.room }) then ctx.connect()), the room connects later via tasks.push(ctx.connect()) at agent_session.ts:616, but by that point the _textOnly checks have already passed with false. This means text simulation mode never activates: audio I/O is not disabled (lines 568–569) and audio recording is not disabled (line 700).

Trace of the auto-connect flow
  1. start() calls this._textOnly at line 699 → room not connected → simulationContext() finds no participants → returns undefined_textOnly = false
  2. start() calls _startImpl() which checks this._textOnly at line 566 → same result
  3. _startImpl() creates RoomIO with unmodified inputOptions/outputOptions at line 592–597
  4. Auto-connect pushes ctx.connect() at line 616, but the audio IO options were already applied
Prompt for agents
The _textOnly getter relies on simulationContext() which reads room.remoteParticipants, but the two call sites in start() (line 699) and _startImpl() (line 566) both execute before the room is connected in the common auto-connect flow. The auto-connect happens via tasks.push(ctx.connect()) at _startImpl:616, which runs after the _textOnly checks and RoomIO setup.

Possible approaches:
1. Move the _textOnly check in _startImpl to after `await ThrowsPromise.allSettled(tasks)` (line 636), then retroactively reconfigure RoomIO's audio settings. This is complex since RoomIO is already constructed.
2. In start(), if the room is not connected, defer the _textOnly/recording check until after _startImpl completes (which connects the room). However, this requires restructuring the start() flow.
3. Have simulationContext() accept a Room parameter or pre-resolve the simulation context during ctx.connect() so it's available immediately. The connect() method at job.ts:351 already registers the onSimulatorDisconnected handler, so it could also resolve the simulation context there and cache it.
4. The simplest fix: in _startImpl, move the _textOnly check to after the auto-connect task completes. Split the tasks into a connect phase and a post-connect phase, checking _textOnly between them.

Files: agents/src/voice/agent_session.ts (start method around line 699, _startImpl around line 566), agents/src/job.ts (simulationContext around line 221, connect around line 330).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


if (room && !this._roomIO) {
// Check for existing input/output configuration and warn if needed
if (this.input.audio && inputOptions?.audioEnabled !== false) {
Expand Down Expand Up @@ -684,6 +696,9 @@ export class AgentSession<
}

this._recordingOptions = resolveRecordingOptions(record);
if (this._textOnly) {
this._recordingOptions.audio = false;
}

// Only one AgentSession per job can be the primary (and therefore record).
// Designate the primary before initRecording so a demoted secondary session
Expand Down