diff --git a/.changeset/simulation-http-server.md b/.changeset/simulation-http-server.md new file mode 100644 index 000000000..7e29b9b71 --- /dev/null +++ b/.changeset/simulation-http-server.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +Do not start the worker HTTP server while running simulations. diff --git a/agents/src/cli.ts b/agents/src/cli.ts index c5f941dbd..f194b2c1d 100644 --- a/agents/src/cli.ts +++ b/agents/src/cli.ts @@ -29,7 +29,9 @@ const runServer = async (args: CliArgs) => { // though `production` is defined in ServerOptions, it will always be overridden by CLI. const { production: _, ...opts } = args.opts; // eslint-disable-line @typescript-eslint/no-unused-vars - const server = new AgentServer(new ServerOptions({ production: args.production, ...opts })); + const server = new AgentServer( + new ServerOptions({ ...opts, production: args.production, simulation: !!args.room }), + ); if (args.room) { server.event.once('worker_registered', () => { diff --git a/agents/src/worker.ts b/agents/src/worker.ts index 860b57005..b661af4b5 100644 --- a/agents/src/worker.ts +++ b/agents/src/worker.ts @@ -144,6 +144,7 @@ export class ServerOptions { port: number; logLevel: string; production: boolean; + simulation: boolean; jobMemoryWarnMB: number; jobMemoryLimitMB: number; @@ -169,6 +170,7 @@ export class ServerOptions { port = undefined, logLevel = 'info', production = false, + simulation = false, jobMemoryWarnMB = 500, jobMemoryLimitMB = 0, }: { @@ -210,6 +212,7 @@ export class ServerOptions { port?: number; logLevel?: string; production?: boolean; + simulation?: boolean; jobMemoryWarnMB?: number; jobMemoryLimitMB?: number; }) { @@ -246,6 +249,7 @@ export class ServerOptions { this.port = port || Default.port(production); this.logLevel = logLevel; this.production = production; + this.simulation = simulation; this.jobMemoryWarnMB = jobMemoryWarnMB; this.jobMemoryLimitMB = jobMemoryLimitMB; } @@ -283,7 +287,7 @@ export class AgentServer { event = new EventEmitter(); #session: WebSocket | undefined = undefined; - #httpServer: HTTPServer; + #httpServer?: HTTPServer; #logger = log().child({ version }); #inferenceExecutor?: InferenceProcExecutor; @@ -353,35 +357,39 @@ export class AgentServer { this.#opts = opts; - const healthCheck = () => { - // Check if inference executor exists and is not alive - if (this.#inferenceExecutor && !this.#inferenceExecutor.isAlive) { - return { healthy: false, message: 'inference process not running' }; - } + // Simulations run ephemeral workers side by side; a health endpoint on a fixed port would make + // concurrent runs collide. + if (!opts.simulation) { + const healthCheck = () => { + // Check if inference executor exists and is not alive + if (this.#inferenceExecutor && !this.#inferenceExecutor.isAlive) { + return { healthy: false, message: 'inference process not running' }; + } - // Only healthy when fully connected with an active WebSocket - if ( - this.#closed || - this.#connecting || - !this.#session || - this.#session.readyState !== WebSocket.OPEN - ) { - return { healthy: false, message: 'not connected to livekit' }; - } + // Only healthy when fully connected with an active WebSocket + if ( + this.#closed || + this.#connecting || + !this.#session || + this.#session.readyState !== WebSocket.OPEN + ) { + return { healthy: false, message: 'not connected to livekit' }; + } - return { healthy: true, message: 'OK' }; - }; + return { healthy: true, message: 'OK' }; + }; - const getWorkerInfo = () => ({ - agent_name: opts.agentName, - agent_name_is_env: opts.agentNameIsEnv, - worker_type: JobType[opts.serverType], - active_jobs: this.activeJobs.length, - sdk_version: version, - project_type: PROJECT_TYPE, - }); + const getWorkerInfo = () => ({ + agent_name: opts.agentName, + agent_name_is_env: opts.agentNameIsEnv, + worker_type: JobType[opts.serverType], + active_jobs: this.activeJobs.length, + sdk_version: version, + project_type: PROJECT_TYPE, + }); - this.#httpServer = new HTTPServer(opts.host, opts.port, healthCheck, getWorkerInfo); + this.#httpServer = new HTTPServer(opts.host, opts.port, healthCheck, getWorkerInfo); + } } /** @throws {@link WorkerError} if worker failed to connect or already running */ @@ -448,7 +456,11 @@ export class AgentServer { } }; - await ThrowsPromise.all([workerWS(), this.#httpServer.run()]); + const tasks = [workerWS()]; + if (this.#httpServer) { + tasks.push(this.#httpServer.run()); + } + await ThrowsPromise.all(tasks); this.#close.resolve(); } @@ -881,7 +893,7 @@ export class AgentServer { await this.#inferenceExecutor?.close(); await this.#procPool.close(); - await this.#httpServer.close(); + await this.#httpServer?.close(); await ThrowsPromise.allSettled(this.#tasks); this.#session?.close();