Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agents/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ export const runApp = (opts: ServerOptions) => {
.command('start')
.description('Start the worker in production mode')
.addOption(logLevelOption('info'))
.addOption(
new Option(
'--simulation',
'Run under an agent simulation with worker load limit disabled',
).hideHelp(),
)
.action((...[, command]) => {
const globalOptions = program.optsWithGlobals();
const commandOptions = command.opts();
Expand All @@ -143,6 +149,7 @@ export const runApp = (opts: ServerOptions) => {
opts.apiSecret = globalOptions.apiSecret || opts.apiSecret;
opts.logLevel = commandOptions.logLevel;
opts.workerToken = globalOptions.workerToken || opts.workerToken;
opts.simulation = commandOptions.simulation;
runServer({
opts,
production: true,
Expand Down
72 changes: 72 additions & 0 deletions agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,49 @@ export type RunningJobInfo = {
apiSecret?: string;
};

export enum SimulationMode {
SIMULATION_MODE_UNSPECIFIED = 0,
SIMULATION_MODE_TEXT = 1,
SIMULATION_MODE_AUDIO = 2,
}

export type SimulationDispatch = {
simulationRunId?: string;
simulation_run_id?: string;
mode?: string | number;
scenario?: unknown;
};

export class SimulationContext<ProcessUserData = Record<string, unknown>> {
#dispatch: SimulationDispatch;
#jobContext: JobContext<ProcessUserData>;

constructor(dispatch: SimulationDispatch, jobContext: JobContext<ProcessUserData>) {
this.#dispatch = dispatch;
this.#jobContext = jobContext;
}

get scenario(): unknown {
return this.#dispatch.scenario;
}

get simulationMode(): SimulationMode {
const mode = this.#dispatch.mode;
if (mode === SimulationMode.SIMULATION_MODE_AUDIO || mode === 'SIMULATION_MODE_AUDIO') {
return SimulationMode.SIMULATION_MODE_AUDIO;
}
if (mode === SimulationMode.SIMULATION_MODE_TEXT || mode === 'SIMULATION_MODE_TEXT') {
return SimulationMode.SIMULATION_MODE_TEXT;
}
// Simulations predating the mode field were text-only.
return SimulationMode.SIMULATION_MODE_TEXT;
}

get jobContext(): JobContext<ProcessUserData> {
return this.#jobContext;
}
}

/** Attempted to add a function callback, but the function already exists. */
export class FunctionExistsError extends Error {
constructor(msg?: string) {
Expand Down Expand Up @@ -119,6 +162,8 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
} = {};
#logger: Logger;
#inferenceExecutor: InferenceExecutor;
#simulationResolved = false;
#simulationContext?: SimulationContext<ProcessUserData>;

/** @internal */
_primaryAgentSession?: AgentSession;
Expand Down Expand Up @@ -172,6 +217,33 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
return this.#info;
}

simulationContext(): SimulationContext<ProcessUserData> | undefined {
if (this.#simulationResolved) {
return this.#simulationContext;
}

this.#simulationResolved = true;
const roomMetadata = (this.#room as { metadata?: string }).metadata;
const metadata = this.#info.job.metadata || roomMetadata;
if (!metadata) {
return undefined;
}

let dispatch: SimulationDispatch;
try {
dispatch = JSON.parse(metadata) as SimulationDispatch;
} catch {
return undefined;
}

if (!dispatch.simulationRunId && !dispatch.simulation_run_id) {
return undefined;
}

this.#simulationContext = new SimulationContext(dispatch, this);
return this.#simulationContext;
}

/** @returns The agent's participant if connected to the room, otherwise `undefined` */
get agent(): LocalParticipant | undefined {
return this.#room.localParticipant;
Expand Down
9 changes: 9 additions & 0 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -656,10 +656,16 @@ export class AgentActivity implements RecognitionHooks {
}

get vad(): VAD | undefined {
if (this.agentSession._textOnly) {
return undefined;
}
return this.agent.vad || this.agentSession.vad;
}

get stt(): STT | undefined {
if (this.agentSession._textOnly) {
return undefined;
}
return this.agent.stt || this.agentSession.stt;
}

Expand All @@ -679,6 +685,9 @@ export class AgentActivity implements RecognitionHooks {
}

get tts(): TTS | undefined {
if (this.agentSession._textOnly) {
return undefined;
}
return this.agent.tts || this.agentSession.tts;
}

Expand Down
14 changes: 13 additions & 1 deletion agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
type TTSModelString,
} from '../inference/index.js';
import type { OverlappingSpeechEvent } from '../inference/interruption/types.js';
import { getJobContext } from '../job.js';
import { SimulationMode, getJobContext } from '../job.js';
import type { FunctionCall, FunctionCallOutput } from '../llm/chat_context.js';
import {
AgentHandoffItem,
Expand Down Expand Up @@ -533,6 +533,12 @@ export class AgentSession<
return this.sessionOptions.useTtsAlignedTranscript;
}

/** @internal */
get _textOnly(): boolean {
const ctx = getJobContext(false);
return ctx?.simulationContext()?.simulationMode === SimulationMode.SIMULATION_MODE_TEXT;
}

set userData(value: UserData) {
this._userData = value;
}
Expand All @@ -557,6 +563,12 @@ export class AgentSession<

const tasks: Promise<void>[] = [];

if (this._textOnly) {
this.logger.info('text simulation: disabling STT/TTS/VAD and audio I/O');
inputOptions = { ...inputOptions, audioEnabled: false };
outputOptions = { ...outputOptions, audioEnabled: false };
}

if (room && !this._roomIO) {
// Check for existing input/output configuration and warn if needed
if (this.input.audio && inputOptions?.audioEnabled !== false) {
Expand Down
11 changes: 10 additions & 1 deletion agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ export class ServerOptions {
production: boolean;
jobMemoryWarnMB: number;
jobMemoryLimitMB: number;
/** @internal */
simulation: boolean;

/** @param options - Worker options */
constructor({
Expand All @@ -171,6 +173,7 @@ export class ServerOptions {
production = false,
jobMemoryWarnMB = 500,
jobMemoryLimitMB = 0,
simulation = false,
}: {
/**
* Path to a file that has {@link Agent} as a default export, dynamically imported later for
Expand Down Expand Up @@ -212,6 +215,8 @@ export class ServerOptions {
production?: boolean;
jobMemoryWarnMB?: number;
jobMemoryLimitMB?: number;
/** @internal */
simulation?: boolean;
}) {
this.agent = agent;
if (!this.agent) {
Expand Down Expand Up @@ -248,6 +253,7 @@ export class ServerOptions {
this.production = production;
this.jobMemoryWarnMB = jobMemoryWarnMB;
this.jobMemoryLimitMB = jobMemoryLimitMB;
this.simulation = simulation;
}
}

Expand Down Expand Up @@ -390,6 +396,9 @@ export class AgentServer {
}

this.#logger.info('starting worker');
if (this.#opts.simulation) {
this.#logger.info('simulation mode enabled: worker load limit disabled');
}
this.#closed = false;
this.#procPool.start();

Expand Down Expand Up @@ -705,7 +714,7 @@ export class AgentServer {
this.#opts
.loadFunc(this)
.then((currentLoad: number) => {
const isFull = currentLoad >= this.#opts.loadThreshold;
const isFull = !this.#opts.simulation && currentLoad >= this.#opts.loadThreshold;
const currentlyAvailable = !isFull;
currentStatus = currentlyAvailable ? WorkerStatus.WS_AVAILABLE : WorkerStatus.WS_FULL;

Expand Down