From df6592f797c50556aef63228e525272f865939de Mon Sep 17 00:00:00 2001 From: tsushanth <78000697+tsushanth@users.noreply.github.com> Date: Thu, 11 Jun 2026 10:34:18 -0700 Subject: [PATCH] fix(ipc): make SupervisedProc.initialize() always settle (#1748 wedge) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit initialize() only completed via `await once(proc, 'message')`, so a warming child that died or hung before sending its first IPC message left initialize() pending forever. The init timeout rejected the side `init` future but did not unblock initialize() itself, so ProcPool.procWatchTask was parked at `await proc.initialize()` holding both initMutex and its procMutex slot — the worker kept reporting available and accepting jobs that could never launch. In production this black-holed every job routed to the worker until the process was externally restarted. initialize() now races three signals — first message, child exit, and the init timeout — and SIGKILLs the child on timeout or on any other failed race outcome (for example an unexpected first message). Late race losers swallow their own rejection so a normal child exit after a successful init never surfaces as an unhandled rejection. procWatchTask already catches initialization failures, so its mutex slots release and the pool replenishes as intended. Update the stale comment in start() and the two existing init-timeout tests to assert initialize() now rejects; add a regression test for the never-sends-a-message wedge. 
Closes #1748 --- .changeset/fix-supervised-proc-init-wedge.md | 22 ++++++ agents/src/ipc/supervised_proc.test.ts | 72 ++++++++++++++++---- agents/src/ipc/supervised_proc.ts | 52 +++++++++++--- 3 files changed, 125 insertions(+), 21 deletions(-) create mode 100644 .changeset/fix-supervised-proc-init-wedge.md diff --git a/.changeset/fix-supervised-proc-init-wedge.md b/.changeset/fix-supervised-proc-init-wedge.md new file mode 100644 index 000000000..224de8767 --- /dev/null +++ b/.changeset/fix-supervised-proc-init-wedge.md @@ -0,0 +1,22 @@ +--- +'@livekit/agents': patch +--- + +Fix `ProcPool` wedging permanently when a warming child process dies or +hangs before its first IPC message. + +`SupervisedProc.initialize()` only completed via +`await once(proc, 'message')`, which never settled when the child exited +or crashed mid-prewarm (kernel OOM, V8 heap abort, import crash). The +init timeout rejected the side `init` future but left `initialize()` +itself pending, so `ProcPool.procWatchTask` was parked at +`await proc.initialize()` holding both `initMutex` and its `procMutex` +slot — the worker kept reporting available and accepting jobs that could +never launch. + +`initialize()` now races three signals — first message, child `exit`, +and the init timeout — and kills the child on timeout. Late race losers +swallow their own rejection so a normal child exit after a successful +init never surfaces as an unhandled rejection. The pool's +`procWatchTask` already catches initialization failures, so its mutex +slots release and the pool replenishes as intended. 
diff --git a/agents/src/ipc/supervised_proc.test.ts b/agents/src/ipc/supervised_proc.test.ts index da2713cca..ee855e9d2 100644 --- a/agents/src/ipc/supervised_proc.test.ts +++ b/agents/src/ipc/supervised_proc.test.ts @@ -126,17 +126,19 @@ describe('IPC send on dead process', () => { }); describe('init timeout rejection handling', () => { - it('does not produce unhandled rejection when init times out', async () => { - // Regression test: before the fix, run() was called without await in start(). - // When init timed out, the rejection in run()'s `await this.init.await` escaped - // as an unhandled rejection — crashing the Node.js process. + it('rejects initialize() and produces no unhandled rejections when init times out', async () => { + // Regression test: before the fix, initialize() would either silently + // resolve (after a late first-message arrived) or — for a child that + // never sent a message — hang forever, wedging the pool. It must now + // reject so the caller can release its mutex slots. The Future's + // self-attached `.catch` and the race losers' attached `.catch` ensure + // no rejection escapes as unhandled. const unhandled: unknown[] = []; const handler = (reason: unknown) => unhandled.push(reason); process.on('unhandledRejection', handler); - // Child that responds AFTER the timeout — simulates slow init under CPU pressure. - // Timeout fires at 50ms (init.reject), child responds at 200ms (once() resolves). - // Before the fix, init.reject caused an unhandled rejection in run(). + // Child that responds AFTER the timeout — simulates slow init under + // CPU pressure. Timeout fires at 50 ms, child would respond at 200 ms. 
const slowScript = join(tmpdir(), 'test_slow_init_child.mjs'); writeFileSync( slowScript, @@ -168,9 +170,7 @@ describe('init timeout rejection handling', () => { ); await proc.start(); - // initialize() returns normally: child responds at 200ms, once() resolves, - // but init was already rejected at 50ms — run() gets the rejection. - await proc.initialize(); + await expect(proc.initialize()).rejects.toThrow(/timed out|exited before/); // Give the event loop a tick for any unhandled rejection to surface await new Promise((r) => setTimeout(r, 100)); @@ -185,8 +185,9 @@ describe('init timeout rejection handling', () => { }); it('join() resolves after init timeout instead of hanging forever', async () => { - // When run() fails early (before registering proc event handlers), - // #join must still resolve so that join() and close() don't hang. + // When init fails (rejected), run() throws at `await this.init.await`, + // start()'s catch resolves #join, and join() must return promptly so + // that the pool can release its mutex slots and replenish. const slowScript = join(tmpdir(), 'test_slow_init_child_join.mjs'); writeFileSync( slowScript, @@ -210,7 +211,7 @@ describe('init timeout rejection handling', () => { const proc = new TestProc(50, 1000, 0, 0, 5000, 60000, 2500); await proc.start(); - await proc.initialize(); + await expect(proc.initialize()).rejects.toThrow(); // join() must resolve within a reasonable time, not hang forever const result = await Promise.race([ @@ -225,6 +226,51 @@ describe('init timeout rejection handling', () => { expect(result).toBe('resolved'); }); + + it('rejects initialize() when child hangs without ever sending a message (#1748 wedge)', async () => { + // Reproduces the production wedge in #1748: a child that never sends + // its first IPC message (e.g. 
crashed mid-prewarm) used to leave + // `initialize()` pending forever via `await once(proc, 'message')`, + // holding initMutex and the procMutex slot and black-holing every + // subsequent job. With the fix initialize() races the message against + // exit + timeout, so it always settles. + const hangScript = join(tmpdir(), 'test_hang_init_child.mjs'); + writeFileSync( + hangScript, + // never registers a message handler, never sends anything + `setInterval(() => {}, 1000);`, + ); + + const { SupervisedProc } = await import('./supervised_proc.js'); + class TestProc extends SupervisedProc { + protected get processKind() { + return 'job'; + } + createProcess() { + return fork(hangScript, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] }); + } + async mainTask() {} + } + + const proc = new TestProc(150, 1000, 0, 0, 5000, 60000, 2500); + + await proc.start(); + const outcome = await Promise.race([ + proc.initialize().then( + () => 'resolved' as const, + (err: Error) => `rejected: ${err.message}` as const, + ), + new Promise<'pending'>((r) => setTimeout(() => r('pending'), 2000)), + ]); + + proc.proc?.kill(); + try { + unlinkSync(hangScript); + } catch {} + + expect(outcome).toMatch(/^rejected:/); + expect(outcome).not.toBe('pending'); + }); }); describe('timer cleanup', () => { diff --git a/agents/src/ipc/supervised_proc.ts b/agents/src/ipc/supervised_proc.ts index ec2e0ff76..3d05c85cb 100644 --- a/agents/src/ipc/supervised_proc.ts +++ b/agents/src/ipc/supervised_proc.ts @@ -96,10 +96,9 @@ export abstract class SupervisedProc { this.#startedAt = performance.now(); this.run().catch((err) => { this.#logger.child({ err }).warn('supervised process run failed'); - // Note: we intentionally do NOT kill the child process here. Killing it - // would race with initialize()'s `once(proc, 'message')`, causing - // initialize() to hang forever and deadlocking the caller (proc_pool). - // The child process is cleaned up when the pool shuts down. 
+ // initialize() owns killing the child on its own failure paths, so we + // don't need to kill it again here. Resolve #join so any pool caller + // parked on join() unblocks promptly. this.#join.resolve(); }); } @@ -198,12 +197,21 @@ export abstract class SupervisedProc { } async initialize() { + let timedOut = false; const timer = setTimeout(() => { + timedOut = true; this.init.reject(new Error('runner initialization timed out')); + try { + this.proc?.kill('SIGKILL'); + } catch { + // proc may have already exited; the exit-race below will still settle. + } }, this.#opts.initializeTimeout); if (!this.proc?.connected) { - this.init.reject(new Error('process not connected')); - return; + clearTimeout(timer); + const err = new Error('process not connected'); + this.init.reject(err); + throw err; } this.proc.send({ case: 'initializeRequest', @@ -214,12 +222,40 @@ export abstract class SupervisedProc { highPingThreshold: this.#opts.highPingThreshold, }, }); - await once(this.proc!, 'message').then(([msg]: IPCMessage[]) => { - clearTimeout(timer); + + // Race three signals so initialize() always settles even when the warming + // child dies/hangs without ever sending an IPC message: + // 1. firstMessage — happy path + // 2. exited — child crashed/exited before initializeResponse + // 3. this.init — the timeout above (or any other init rejection) + // The losers of the race must pre-attach a `.catch` so the late + // post-success `exit` does not surface as an unhandledRejection. 
+ const firstMessage = once(this.proc, 'message').then(([msg]: IPCMessage[]) => { if (msg!.case !== 'initializeResponse') { throw new Error('first message must be InitializeResponse'); } }); + const exited = once(this.proc, 'exit').then(() => { + throw new Error('process exited before initialization completed'); + }); + firstMessage.catch(() => {}); + exited.catch(() => {}); + + try { + await Promise.race([firstMessage, exited, this.init.await]); + } catch (err) { + if (!timedOut) { + try { + this.proc?.kill('SIGKILL'); + } catch { + // already dead + } + } + this.init.reject(err as Error); + throw err; + } finally { + clearTimeout(timer); + } this.init.resolve(); }