diff --git a/.changeset/fix-supervised-proc-init-wedge.md b/.changeset/fix-supervised-proc-init-wedge.md new file mode 100644 index 000000000..224de8767 --- /dev/null +++ b/.changeset/fix-supervised-proc-init-wedge.md @@ -0,0 +1,22 @@ +--- +'@livekit/agents': patch +--- + +Fix `ProcPool` wedging permanently when a warming child process dies or +hangs before its first IPC message. + +`SupervisedProc.initialize()` only completed via +`await once(proc, 'message')`, which never settled when the child exited +or crashed mid-prewarm (kernel OOM, V8 heap abort, import crash). The +init timeout rejected the side `init` future but left `initialize()` +itself pending, so `ProcPool.procWatchTask` was parked at +`await proc.initialize()` holding both `initMutex` and its `procMutex` +slot — the worker kept reporting available and accepting jobs that could +never launch. + +`initialize()` now races three signals — first message, child `exit`, +and the init timeout — and kills the child on timeout. Late race losers +swallow their own rejection so a normal child exit after a successful +init never surfaces as an unhandled rejection. The pool's +`procWatchTask` already catches initialization failures, so its mutex +slots release and the pool replenishes as intended. diff --git a/agents/src/ipc/supervised_proc.test.ts b/agents/src/ipc/supervised_proc.test.ts index da2713cca..ee855e9d2 100644 --- a/agents/src/ipc/supervised_proc.test.ts +++ b/agents/src/ipc/supervised_proc.test.ts @@ -126,17 +126,19 @@ describe('IPC send on dead process', () => { }); describe('init timeout rejection handling', () => { - it('does not produce unhandled rejection when init times out', async () => { - // Regression test: before the fix, run() was called without await in start(). - // When init timed out, the rejection in run()'s `await this.init.await` escaped - // as an unhandled rejection — crashing the Node.js process. + it('rejects initialize() and produces no unhandled rejections when init times out', async () => { + // Regression test: before the fix, initialize() would either silently + // resolve (after a late first-message arrived) or — for a child that + // never sent a message — hang forever, wedging the pool. It must now + // reject so the caller can release its mutex slots. The Future's + // self-attached `.catch` and the race losers' attached `.catch` ensure + // no rejection escapes as unhandled. const unhandled: unknown[] = []; const handler = (reason: unknown) => unhandled.push(reason); process.on('unhandledRejection', handler); - // Child that responds AFTER the timeout — simulates slow init under CPU pressure. - // Timeout fires at 50ms (init.reject), child responds at 200ms (once() resolves). - // Before the fix, init.reject caused an unhandled rejection in run(). + // Child that responds AFTER the timeout — simulates slow init under + // CPU pressure. Timeout fires at 50 ms, child would respond at 200 ms. const slowScript = join(tmpdir(), 'test_slow_init_child.mjs'); writeFileSync( slowScript, @@ -168,9 +170,7 @@ describe('init timeout rejection handling', () => { ); await proc.start(); - // initialize() returns normally: child responds at 200ms, once() resolves, - // but init was already rejected at 50ms — run() gets the rejection. - await proc.initialize(); + await expect(proc.initialize()).rejects.toThrow(/timed out|exited before/); // Give the event loop a tick for any unhandled rejection to surface await new Promise((r) => setTimeout(r, 100)); @@ -185,8 +185,9 @@ describe('init timeout rejection handling', () => { }); it('join() resolves after init timeout instead of hanging forever', async () => { - // When run() fails early (before registering proc event handlers), - // #join must still resolve so that join() and close() don't hang. + // When init fails (rejected), run() throws at `await this.init.await`, + // start()'s catch resolves #join, and join() must return promptly so + // that the pool can release its mutex slots and replenish. const slowScript = join(tmpdir(), 'test_slow_init_child_join.mjs'); writeFileSync( slowScript, @@ -210,7 +211,7 @@ describe('init timeout rejection handling', () => { const proc = new TestProc(50, 1000, 0, 0, 5000, 60000, 2500); await proc.start(); - await proc.initialize(); + await expect(proc.initialize()).rejects.toThrow(); // join() must resolve within a reasonable time, not hang forever const result = await Promise.race([ @@ -225,6 +226,51 @@ describe('init timeout rejection handling', () => { expect(result).toBe('resolved'); }); + + it('rejects initialize() when child hangs without ever sending a message (#1748 wedge)', async () => { + // Reproduces the production wedge in #1748: a child that never sends + // its first IPC message (e.g. crashed mid-prewarm) used to leave + // `initialize()` pending forever via `await once(proc, 'message')`, + // holding initMutex and the procMutex slot and black-holing every + // subsequent job. With the fix initialize() races the message against + // exit + timeout, so it always settles. + const hangScript = join(tmpdir(), 'test_hang_init_child.mjs'); + writeFileSync( + hangScript, + // never registers a message handler, never sends anything + `setInterval(() => {}, 1000);`, + ); + + const { SupervisedProc } = await import('./supervised_proc.js'); + class TestProc extends SupervisedProc { + protected get processKind() { + return 'job'; + } + createProcess() { + return fork(hangScript, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] }); + } + async mainTask() {} + } + + const proc = new TestProc(150, 1000, 0, 0, 5000, 60000, 2500); + + await proc.start(); + const outcome = await Promise.race([ + proc.initialize().then( + () => 'resolved' as const, + (err: Error) => `rejected: ${err.message}` as const, + ), + new Promise<'pending'>((r) => setTimeout(() => r('pending'), 2000)), + ]); + + proc.proc?.kill(); + try { + unlinkSync(hangScript); + } catch {} + + expect(outcome).toMatch(/^rejected:/); + expect(outcome).not.toBe('pending'); + }); }); describe('timer cleanup', () => { diff --git a/agents/src/ipc/supervised_proc.ts b/agents/src/ipc/supervised_proc.ts index ec2e0ff76..3d05c85cb 100644 --- a/agents/src/ipc/supervised_proc.ts +++ b/agents/src/ipc/supervised_proc.ts @@ -96,10 +96,9 @@ export abstract class SupervisedProc { this.#startedAt = performance.now(); this.run().catch((err) => { this.#logger.child({ err }).warn('supervised process run failed'); - // Note: we intentionally do NOT kill the child process here. Killing it - // would race with initialize()'s `once(proc, 'message')`, causing - // initialize() to hang forever and deadlocking the caller (proc_pool). - // The child process is cleaned up when the pool shuts down. + // initialize() owns killing the child on its own failure paths, so we + // don't need to kill it again here. Resolve #join so any pool caller + // parked on join() unblocks promptly. this.#join.resolve(); }); } @@ -198,12 +197,21 @@ export abstract class SupervisedProc { } async initialize() { + let timedOut = false; const timer = setTimeout(() => { + timedOut = true; this.init.reject(new Error('runner initialization timed out')); + try { + this.proc?.kill('SIGKILL'); + } catch { + // proc may have already exited; the exit-race below will still settle. + } }, this.#opts.initializeTimeout); if (!this.proc?.connected) { - this.init.reject(new Error('process not connected')); - return; + clearTimeout(timer); + const err = new Error('process not connected'); + this.init.reject(err); + throw err; } this.proc.send({ case: 'initializeRequest', @@ -214,12 +222,40 @@ export abstract class SupervisedProc { highPingThreshold: this.#opts.highPingThreshold, }, }); - await once(this.proc!, 'message').then(([msg]: IPCMessage[]) => { - clearTimeout(timer); + + // Race three signals so initialize() always settles even when the warming + // child dies/hangs without ever sending an IPC message: + // 1. firstMessage — happy path + // 2. exited — child crashed/exited before initializeResponse + // 3. this.init — the timeout above (or any other init rejection) + // The losers of the race must pre-attach a `.catch` so the late + // post-success `exit` does not surface as an unhandledRejection. + const firstMessage = once(this.proc, 'message').then(([msg]: IPCMessage[]) => { if (msg!.case !== 'initializeResponse') { throw new Error('first message must be InitializeResponse'); } }); + const exited = once(this.proc, 'exit').then(() => { + throw new Error('process exited before initialization completed'); + }); + firstMessage.catch(() => {}); + exited.catch(() => {}); + + try { + await Promise.race([firstMessage, exited, this.init.await]); + } catch (err) { + if (!timedOut) { + try { + this.proc?.kill('SIGKILL'); + } catch { + // already dead + } + } + this.init.reject(err as Error); + throw err; + } finally { + clearTimeout(timer); + } this.init.resolve(); }