Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/simulation-http-server.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Do not start the worker HTTP server while running simulations.
4 changes: 3 additions & 1 deletion agents/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ const runServer = async (args: CliArgs) => {

// though `production` is defined in ServerOptions, it will always be overridden by CLI.
const { production: _, ...opts } = args.opts; // eslint-disable-line @typescript-eslint/no-unused-vars
const server = new AgentServer(new ServerOptions({ production: args.production, ...opts }));
const server = new AgentServer(
new ServerOptions({ ...opts, production: args.production, simulation: !!args.room }),
);

if (args.room) {
server.event.once('worker_registered', () => {
Expand Down
68 changes: 40 additions & 28 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ export class ServerOptions {
port: number;
logLevel: string;
production: boolean;
simulation: boolean;
jobMemoryWarnMB: number;
jobMemoryLimitMB: number;

Expand All @@ -169,6 +170,7 @@ export class ServerOptions {
port = undefined,
logLevel = 'info',
production = false,
simulation = false,
jobMemoryWarnMB = 500,
jobMemoryLimitMB = 0,
}: {
Expand Down Expand Up @@ -210,6 +212,7 @@ export class ServerOptions {
port?: number;
logLevel?: string;
production?: boolean;
simulation?: boolean;
jobMemoryWarnMB?: number;
jobMemoryLimitMB?: number;
}) {
Expand Down Expand Up @@ -246,6 +249,7 @@ export class ServerOptions {
this.port = port || Default.port(production);
this.logLevel = logLevel;
this.production = production;
this.simulation = simulation;
this.jobMemoryWarnMB = jobMemoryWarnMB;
this.jobMemoryLimitMB = jobMemoryLimitMB;
}
Expand Down Expand Up @@ -283,7 +287,7 @@ export class AgentServer {

event = new EventEmitter();
#session: WebSocket | undefined = undefined;
#httpServer: HTTPServer;
#httpServer?: HTTPServer;
#logger = log().child({ version });
#inferenceExecutor?: InferenceProcExecutor;

Expand Down Expand Up @@ -353,35 +357,39 @@ export class AgentServer {

this.#opts = opts;

const healthCheck = () => {
// Check if inference executor exists and is not alive
if (this.#inferenceExecutor && !this.#inferenceExecutor.isAlive) {
return { healthy: false, message: 'inference process not running' };
}
// Simulations run ephemeral workers side by side; a health endpoint on a fixed port would make
// concurrent runs collide.
if (!opts.simulation) {
const healthCheck = () => {
// Check if inference executor exists and is not alive
if (this.#inferenceExecutor && !this.#inferenceExecutor.isAlive) {
return { healthy: false, message: 'inference process not running' };
}

// Only healthy when fully connected with an active WebSocket
if (
this.#closed ||
this.#connecting ||
!this.#session ||
this.#session.readyState !== WebSocket.OPEN
) {
return { healthy: false, message: 'not connected to livekit' };
}
// Only healthy when fully connected with an active WebSocket
if (
this.#closed ||
this.#connecting ||
!this.#session ||
this.#session.readyState !== WebSocket.OPEN
) {
return { healthy: false, message: 'not connected to livekit' };
}

return { healthy: true, message: 'OK' };
};
return { healthy: true, message: 'OK' };
};

const getWorkerInfo = () => ({
agent_name: opts.agentName,
agent_name_is_env: opts.agentNameIsEnv,
worker_type: JobType[opts.serverType],
active_jobs: this.activeJobs.length,
sdk_version: version,
project_type: PROJECT_TYPE,
});
const getWorkerInfo = () => ({
agent_name: opts.agentName,
agent_name_is_env: opts.agentNameIsEnv,
worker_type: JobType[opts.serverType],
active_jobs: this.activeJobs.length,
sdk_version: version,
project_type: PROJECT_TYPE,
});

this.#httpServer = new HTTPServer(opts.host, opts.port, healthCheck, getWorkerInfo);
this.#httpServer = new HTTPServer(opts.host, opts.port, healthCheck, getWorkerInfo);
}
}

/** @throws {@link WorkerError} if worker failed to connect or already running */
Expand Down Expand Up @@ -448,7 +456,11 @@ export class AgentServer {
}
};

await ThrowsPromise.all([workerWS(), this.#httpServer.run()]);
const tasks = [workerWS()];
if (this.#httpServer) {
tasks.push(this.#httpServer.run());
}
await ThrowsPromise.all(tasks);
this.#close.resolve();
}

Expand Down Expand Up @@ -881,7 +893,7 @@ export class AgentServer {

await this.#inferenceExecutor?.close();
await this.#procPool.close();
await this.#httpServer.close();
await this.#httpServer?.close();
await ThrowsPromise.allSettled(this.#tasks);

this.#session?.close();
Expand Down