Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion agents/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export const TOPIC_CHAT = 'lk.chat';
export const ATTRIBUTE_AGENT_STATE = 'lk.agent.state';
export const ATTRIBUTE_AGENT_NAME = 'lk.agent.name';

// TODO(eval): export const ATTRIBUTE_SIMULATOR = 'lk.simulator';
export const ATTRIBUTE_SIMULATOR = 'lk.simulator';
export const ATTRIBUTE_SIMULATOR_DISPATCH = 'lk.simulator.dispatch';

export const TOPIC_CLIENT_EVENTS = 'lk.agent.events';
export const RPC_GET_SESSION_STATE = 'lk.agent.get_session_state';
Expand Down
88 changes: 88 additions & 0 deletions agents/src/job.ts

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 waitForParticipant does not filter simulator participants

The existing waitForParticipant method at agents/src/job.ts:285-319 filters out ParticipantKind.AGENT but does not filter out simulator participants (those with the lk.simulator attribute). With this PR introducing simulation support, a simulator participant could potentially be returned by waitForParticipant. This is likely fine if simulator participants use ParticipantKind.AGENT (which would already be filtered), but if they use a different kind, they could be incorrectly returned as the "user" participant. Not flagged as a bug since we can't verify the participant kind used by simulators without access to the server code, and this is pre-existing behavior.

(Refers to lines 285-319)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { AsyncLocalStorage } from 'node:async_hooks';
import * as os from 'node:os';
import * as path from 'node:path';
import type { Logger } from 'pino';
import { ATTRIBUTE_SIMULATOR, ATTRIBUTE_SIMULATOR_DISPATCH } from './constants.js';
import type { InferenceExecutor } from './ipc/inference_executor.js';
import { log } from './log.js';
import { flushOtelLogs, setupCloudTracer, uploadSessionReport } from './telemetry/index.js';
Expand Down Expand Up @@ -90,6 +91,49 @@ export type RunningJobInfo = {
apiSecret?: string;
};

export enum SimulationMode {
SIMULATION_MODE_UNSPECIFIED = 0,
SIMULATION_MODE_TEXT = 1,
SIMULATION_MODE_AUDIO = 2,
}

export type SimulationDispatch = {
simulationRunId?: string;
simulation_run_id?: string;
mode?: string | number;
scenario?: unknown;
};

export class SimulationContext<ProcessUserData = Record<string, unknown>> {
#dispatch: SimulationDispatch;
#jobContext: JobContext<ProcessUserData>;

constructor(dispatch: SimulationDispatch, jobContext: JobContext<ProcessUserData>) {
this.#dispatch = dispatch;
this.#jobContext = jobContext;
}

get scenario(): unknown {
return this.#dispatch.scenario;
}

get simulationMode(): SimulationMode {
const mode = this.#dispatch.mode;
if (mode === SimulationMode.SIMULATION_MODE_AUDIO || mode === 'SIMULATION_MODE_AUDIO') {
return SimulationMode.SIMULATION_MODE_AUDIO;
}
if (mode === SimulationMode.SIMULATION_MODE_TEXT || mode === 'SIMULATION_MODE_TEXT') {
return SimulationMode.SIMULATION_MODE_TEXT;
}
// Simulations predating the mode field were text-only.
return SimulationMode.SIMULATION_MODE_TEXT;
}

get jobContext(): JobContext<ProcessUserData> {
return this.#jobContext;
}
}

/** Attempted to add a function callback, but the function already exists. */
export class FunctionExistsError extends Error {
constructor(msg?: string) {
Expand Down Expand Up @@ -119,6 +163,8 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
} = {};
#logger: Logger;
#inferenceExecutor: InferenceExecutor;
#simulationResolved = false;
#simulationContext?: SimulationContext<ProcessUserData>;

/** @internal */
_primaryAgentSession?: AgentSession;
Expand Down Expand Up @@ -172,6 +218,48 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
return this.#info;
}

simulationContext(): SimulationContext<ProcessUserData> | undefined {
if (this.#simulationResolved) {
return this.#simulationContext;
}

this.#simulationResolved = true;
Comment on lines +222 to +226

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 simulationContext() permanently caches undefined when called before room is connected or simulator participant has joined

The simulationContext() method at agents/src/job.ts:221-261 sets #simulationResolved = true unconditionally on the first call and caches the result. If the method is called before ctx.connect() (when remoteParticipants is empty) or before the simulator participant has joined the room, the participant attribute lookup at line 229-238 will find nothing. On newer servers where the dispatch is placed in participant attributes (not job metadata), the job-metadata fallback at line 242 will also be empty, causing undefined to be cached permanently. Any subsequent call — even after the room is connected and the simulator participant is present — returns the stale cached undefined, making the simulation context permanently unresolvable.

Prompt for agents
The simulationContext() method in agents/src/job.ts uses a #simulationResolved boolean flag to cache the result after the first call. The problem is it caches even when the result is undefined (no simulation found), which prevents correct resolution if called before the room is connected or before the simulator participant has joined.

Possible approaches:
1. Only set #simulationResolved = true when a SimulationContext is successfully found. When returning undefined, leave #simulationResolved as false so subsequent calls can re-check. This allows the method to be called early (returning undefined) and then called again after connect (finding the actual context).
2. Alternatively, require the room to be connected before caching, e.g. check this.#room.isConnected and only cache when connected.
3. Document that the method must only be called after ctx.connect() and the simulator participant is present, and throw or warn if the room is not connected.

Approach 1 is simplest and most forgiving. The tradeoff is that the method will scan participants on every call until a simulation context is found, but this is a lightweight operation.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


let metadata = '';
for (const participant of this.#room.remoteParticipants.values()) {
if (!Object.hasOwn(participant.attributes, ATTRIBUTE_SIMULATOR)) {
continue;
}

metadata = participant.attributes[ATTRIBUTE_SIMULATOR_DISPATCH] || '';
if (metadata) {
break;
}
}

if (!metadata) {
// Older servers and fake job contexts placed the dispatch in job metadata.
metadata = (this.#info.job as proto.Job & { metadata?: string }).metadata || '';

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Job metadata fallback uses type assertion for metadata field

At agents/src/job.ts:242, the code casts this.#info.job as proto.Job & { metadata?: string } to access a metadata field. This suggests uncertainty about whether proto.Job from @livekit/protocol@1.46.4 includes a metadata property. If the protocol type already has metadata as a different type (e.g., Uint8Array for protobuf bytes), the || '' fallback with string comparison could behave unexpectedly. If it doesn't have metadata at all, the assertion is necessary but fragile — a future protocol update adding metadata with an incompatible type would silently break. Could not verify the actual proto.Job type definition as node_modules aren't installed.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

}
if (!metadata) {
return undefined;
}

let dispatch: SimulationDispatch;
try {
dispatch = JSON.parse(metadata) as SimulationDispatch;
} catch {
return undefined;
}

if (!(dispatch.simulationRunId || dispatch.simulation_run_id)) {
return undefined;
}

this.#simulationContext = new SimulationContext(dispatch, this);
return this.#simulationContext;
}

/** @returns The agent's participant if connected to the room, otherwise `undefined` */
get agent(): LocalParticipant | undefined {
return this.#room.localParticipant;
Expand Down