Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .changeset/fix-supervised-proc-init-wedge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
'@livekit/agents': patch
---

Fix `ProcPool` wedging permanently when a warming child process dies or
hangs before its first IPC message.

`SupervisedProc.initialize()` only completed via
`await once(proc, 'message')`, which never settled when the child exited
or crashed mid-prewarm (kernel OOM, V8 heap abort, import crash). The
init timeout rejected the side `init` future but left `initialize()`
itself pending, so `ProcPool.procWatchTask` was parked at
`await proc.initialize()` holding both `initMutex` and its `procMutex`
slot — the worker kept reporting available and accepting jobs that could
never launch.

`initialize()` now races three signals — first message, child `exit`,
and the init timeout — and kills the child on timeout. Late race losers
swallow their own rejection so a normal child exit after a successful
init never surfaces as an unhandled rejection. The pool's
`procWatchTask` already catches initialization failures, so its mutex
slots release and the pool replenishes as intended.
72 changes: 59 additions & 13 deletions agents/src/ipc/supervised_proc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,19 @@ describe('IPC send on dead process', () => {
});

describe('init timeout rejection handling', () => {
it('does not produce unhandled rejection when init times out', async () => {
// Regression test: before the fix, run() was called without await in start().
// When init timed out, the rejection in run()'s `await this.init.await` escaped
// as an unhandled rejection — crashing the Node.js process.
it('rejects initialize() and produces no unhandled rejections when init times out', async () => {
// Regression test: before the fix, initialize() would either silently
// resolve (after a late first-message arrived) or — for a child that
// never sent a message — hang forever, wedging the pool. It must now
// reject so the caller can release its mutex slots. The Future's
// self-attached `.catch` and the race losers' attached `.catch` ensure
// no rejection escapes as unhandled.
const unhandled: unknown[] = [];
const handler = (reason: unknown) => unhandled.push(reason);
process.on('unhandledRejection', handler);

// Child that responds AFTER the timeout — simulates slow init under CPU pressure.
// Timeout fires at 50ms (init.reject), child responds at 200ms (once() resolves).
// Before the fix, init.reject caused an unhandled rejection in run().
// Child that responds AFTER the timeout — simulates slow init under
// CPU pressure. Timeout fires at 50 ms, child would respond at 200 ms.
const slowScript = join(tmpdir(), 'test_slow_init_child.mjs');
writeFileSync(
slowScript,
Expand Down Expand Up @@ -168,9 +170,7 @@ describe('init timeout rejection handling', () => {
);

await proc.start();
// initialize() returns normally: child responds at 200ms, once() resolves,
// but init was already rejected at 50ms — run() gets the rejection.
await proc.initialize();
await expect(proc.initialize()).rejects.toThrow(/timed out|exited before/);

// Give the event loop a tick for any unhandled rejection to surface
await new Promise((r) => setTimeout(r, 100));
Expand All @@ -185,8 +185,9 @@ describe('init timeout rejection handling', () => {
});

it('join() resolves after init timeout instead of hanging forever', async () => {
// When run() fails early (before registering proc event handlers),
// #join must still resolve so that join() and close() don't hang.
// When init fails (rejected), run() throws at `await this.init.await`,
// start()'s catch resolves #join, and join() must return promptly so
// that the pool can release its mutex slots and replenish.
const slowScript = join(tmpdir(), 'test_slow_init_child_join.mjs');
writeFileSync(
slowScript,
Expand All @@ -210,7 +211,7 @@ describe('init timeout rejection handling', () => {
const proc = new TestProc(50, 1000, 0, 0, 5000, 60000, 2500);

await proc.start();
await proc.initialize();
await expect(proc.initialize()).rejects.toThrow();

// join() must resolve within a reasonable time, not hang forever
const result = await Promise.race([
Expand All @@ -225,6 +226,51 @@ describe('init timeout rejection handling', () => {

expect(result).toBe('resolved');
});

it('rejects initialize() when child hangs without ever sending a message (#1748 wedge)', async () => {
// Reproduces the production wedge in #1748: a child that never sends
// its first IPC message (e.g. crashed mid-prewarm) used to leave
// `initialize()` pending forever via `await once(proc, 'message')`,
// holding initMutex and the procMutex slot and black-holing every
// subsequent job. With the fix initialize() races the message against
// exit + timeout, so it always settles.
const hangScript = join(tmpdir(), 'test_hang_init_child.mjs');
writeFileSync(
hangScript,
// never registers a message handler, never sends anything
`setInterval(() => {}, 1000);`,
);

const { SupervisedProc } = await import('./supervised_proc.js');
class TestProc extends SupervisedProc {
protected get processKind() {
return 'job';
}
createProcess() {
return fork(hangScript, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] });
}
async mainTask() {}
}

const proc = new TestProc(150, 1000, 0, 0, 5000, 60000, 2500);

await proc.start();
const outcome = await Promise.race([
proc.initialize().then(
() => 'resolved' as const,
(err: Error) => `rejected: ${err.message}` as const,
),
new Promise<'pending'>((r) => setTimeout(() => r('pending'), 2000)),
]);

proc.proc?.kill();
try {
unlinkSync(hangScript);
} catch {}

expect(outcome).toMatch(/^rejected:/);
expect(outcome).not.toBe('pending');
});
});

describe('timer cleanup', () => {
Expand Down
52 changes: 44 additions & 8 deletions agents/src/ipc/supervised_proc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ export abstract class SupervisedProc {
this.#startedAt = performance.now();
this.run().catch((err) => {
this.#logger.child({ err }).warn('supervised process run failed');
// Note: we intentionally do NOT kill the child process here. Killing it
// would race with initialize()'s `once(proc, 'message')`, causing
// initialize() to hang forever and deadlocking the caller (proc_pool).
// The child process is cleaned up when the pool shuts down.
// initialize() owns killing the child on its own failure paths, so we
// don't need to kill it again here. Resolve #join so any pool caller
// parked on join() unblocks promptly.
this.#join.resolve();
});
}
Expand Down Expand Up @@ -198,12 +197,21 @@ export abstract class SupervisedProc {
}

async initialize() {
let timedOut = false;
const timer = setTimeout(() => {
timedOut = true;
this.init.reject(new Error('runner initialization timed out'));
try {
this.proc?.kill('SIGKILL');
} catch {
// proc may have already exited; the exit-race below will still settle.
}
}, this.#opts.initializeTimeout);
if (!this.proc?.connected) {
this.init.reject(new Error('process not connected'));
return;
clearTimeout(timer);
const err = new Error('process not connected');
this.init.reject(err);
throw err;
}
this.proc.send({
case: 'initializeRequest',
Expand All @@ -214,12 +222,40 @@ export abstract class SupervisedProc {
highPingThreshold: this.#opts.highPingThreshold,
},
});
await once(this.proc!, 'message').then(([msg]: IPCMessage[]) => {
clearTimeout(timer);

// Race three signals so initialize() always settles even when the warming
// child dies/hangs without ever sending an IPC message:
// 1. firstMessage — happy path
// 2. exited — child crashed/exited before initializeResponse
// 3. this.init — the timeout above (or any other init rejection)
// The losers of the race must pre-attach a `.catch` so the late
// post-success `exit` does not surface as an unhandledRejection.
const firstMessage = once(this.proc, 'message').then(([msg]: IPCMessage[]) => {
if (msg!.case !== 'initializeResponse') {
throw new Error('first message must be InitializeResponse');
}
});
const exited = once(this.proc, 'exit').then(() => {
throw new Error('process exited before initialization completed');
});
firstMessage.catch(() => {});
exited.catch(() => {});

try {
await Promise.race([firstMessage, exited, this.init.await]);
} catch (err) {
if (!timedOut) {
try {
this.proc?.kill('SIGKILL');
} catch {
// already dead
}
}
this.init.reject(err as Error);
throw err;
} finally {
clearTimeout(timer);
}
this.init.resolve();
}

Expand Down