diff --git a/.fallowrc.jsonc b/.fallowrc.jsonc index 30e0a27aa5..c0de88c13f 100644 --- a/.fallowrc.jsonc +++ b/.fallowrc.jsonc @@ -29,6 +29,8 @@ // Worker entry points loaded dynamically by their *Pool.ts companions. "packages/producer/src/services/pngDecodeBlitWorker.ts", "packages/producer/src/services/shaderTransitionWorker.ts", + // Off-main-thread /health endpoint, spawned by path from healthWorker.ts. + "packages/producer/src/services/healthWorkerThread.ts", // Test fixture worker, spawned by path via the pools' workerEntryPath // option from the crash-recovery tests; has no import-graph referrer. "packages/producer/src/services/__fixtures__/crashOnMessageWorker.mjs", diff --git a/packages/producer/build.mjs b/packages/producer/build.mjs index 50f5daee1a..b494ef7a3c 100644 --- a/packages/producer/build.mjs +++ b/packages/producer/build.mjs @@ -70,6 +70,11 @@ await Promise.all([ entryPoints: ["src/services/shaderTransitionWorker.ts"], outfile: "dist/services/shaderTransitionWorker.js", }), + build({ + ...sharedOpts, + entryPoints: ["src/services/healthWorkerThread.ts"], + outfile: "dist/services/healthWorkerThread.js", + }), build({ ...sharedOpts, entryPoints: ["src/distributed.ts"], outfile: "dist/distributed.js" }), ]); diff --git a/packages/producer/src/server.ts b/packages/producer/src/server.ts index 97de4d8731..51d8bd9c95 100644 --- a/packages/producer/src/server.ts +++ b/packages/producer/src/server.ts @@ -36,6 +36,7 @@ import { type RenderConfig, } from "./services/renderOrchestrator.js"; import { prepareHyperframeLintBody, runHyperframeLint } from "./services/hyperframeLint.js"; +import { startHealthWorker, type HealthWorkerHandle } from "./services/healthWorker.js"; import { isVideoFrameFormat } from "@hyperframes/engine"; import { resolveRenderPaths } from "./utils/paths.js"; import { defaultLogger, type ProducerLogger } from "./logger.js"; @@ -738,10 +739,47 @@ export function startServer(options: ServerOptions = {}) { (server as unknown as 
import("node:http").Server).requestTimeout = 0; (server as unknown as import("node:http").Server).keepAliveTimeout = 0; + // Start the worker-thread health endpoint alongside the main listener. + // The main thread keeps serving /health on `port` for backwards + // compatibility; the worker thread additionally serves /health on + // PRODUCER_HEALTH_PORT (default 9848) so k8s liveness/readiness probes can + // migrate to a listener that doesn't share an event loop with renders. + // + // Opt-out: set PRODUCER_DISABLE_HEALTH_WORKER=1 (e.g. for tests that don't + // want a worker spawned, or for environments where the extra port isn't + // wanted). + // + // We store the *promise* (not the resolved handle) so a SIGTERM that + // arrives before the worker has finished booting still has something to + // await. Awaiting a `let healthWorker = null` mutated from inside `.then` + // would race: if SIGTERM lands before the `.then` callback fires, + // `shutdown()` sees `null` and skips worker cleanup. The promise pattern + // closes that window without making startup blocking. + const healthWorkerPromise: Promise = + process.env.PRODUCER_DISABLE_HEALTH_WORKER === "1" + ? Promise.resolve(null) + : startHealthWorker({ logger: log }).catch((err: Error) => { + // Don't crash the producer if the worker fails to start — the main + // /health is still up. Log loudly so the operator notices. + log.error(`[server] health worker failed to start: ${err.message}`); + return null; + }); + async function shutdown(signal: string) { log.info(`Received ${signal}, shutting down`); const { drainBrowserPool } = await import("@hyperframes/engine"); await drainBrowserPool().catch(() => {}); + // Bounded await: if the worker hasn't come online within 1.5s of + // shutdown there's no useful cleanup left to do — `worker.terminate()` + // from process exit will kill the thread regardless, and we'd rather + // not let a hung-startup worker keep the SIGTERM path waiting. 
+ const handle = await Promise.race([ + healthWorkerPromise, + new Promise((res) => setTimeout(() => res(null), 1_500).unref()), + ]); + if (handle) { + await handle.shutdown().catch(() => {}); + } server.close(() => { log.info("Server closed"); process.exit(0); diff --git a/packages/producer/src/services/healthWorker.test.ts b/packages/producer/src/services/healthWorker.test.ts new file mode 100644 index 0000000000..b7b2b1d5c5 --- /dev/null +++ b/packages/producer/src/services/healthWorker.test.ts @@ -0,0 +1,131 @@ +/** + * Tests for the worker_thread /health endpoint. + * + * The pin: the listener answers from a worker_thread, so it stays responsive + * even if the main thread is blocked on a sync task. We verify: + * + * 1. Boot: startHealthWorker resolves once the worker is listening, and the + * endpoint returns 200 + the expected JSON body. + * 2. Liveness across main-thread block: while the main thread is blocked on + * a long synchronous loop, /health still answers within a tight budget. + * This is the property k8s probes need: probe responsiveness is + * decoupled from main-thread event-loop latency. + * 3. Shutdown: the handle's shutdown() actually frees the port (subsequent + * boot on the same port succeeds). + */ + +import { afterEach, describe, expect, it } from "vitest"; +import { fileURLToPath } from "node:url"; +import { dirname, resolve } from "node:path"; +import { request as httpRequest } from "node:http"; +import { startHealthWorker, type HealthWorkerHandle } from "./healthWorker.js"; + +// Use a fixed test port well above the producer's default port range. +const TEST_PORT = 19848; + +// Resolve the worker entry from this test file so vitest (tsx-driven) loads +// the .ts source rather than looking for a non-existent dist/ artifact. 
+const HERE = dirname(fileURLToPath(import.meta.url)); +const WORKER_ENTRY = resolve(HERE, "healthWorkerThread.ts"); + +async function fetchHealth( + port: number, + timeoutMs = 1_000, +): Promise<{ status: number; body: string }> { + return new Promise((resolveFetch, rejectFetch) => { + const req = httpRequest( + { + host: "127.0.0.1", + port, + path: "/health", + method: "GET", + timeout: timeoutMs, + }, + (res) => { + let body = ""; + res.on("data", (chunk) => { + body += chunk; + }); + res.on("end", () => resolveFetch({ status: res.statusCode ?? 0, body })); + }, + ); + req.on("timeout", () => { + req.destroy(new Error(`request timed out after ${timeoutMs}ms`)); + }); + req.on("error", rejectFetch); + req.end(); + }); +} + +describe("healthWorker", () => { + const handles: HealthWorkerHandle[] = []; + + afterEach(async () => { + while (handles.length > 0) { + const h = handles.pop()!; + await h.shutdown().catch(() => {}); + } + }); + + it("boots and serves /health from the worker thread", async () => { + const handle = await startHealthWorker({ + port: TEST_PORT, + workerEntry: WORKER_ENTRY, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + }); + handles.push(handle); + expect(handle.port).toBe(TEST_PORT); + + const res = await fetchHealth(TEST_PORT); + expect(res.status).toBe(200); + const json = JSON.parse(res.body); + expect(json.status).toBe("ok"); + expect(json.thread).toBe("worker"); + expect(typeof json.uptime).toBe("number"); + expect(typeof json.timestamp).toBe("string"); + }); + + it("stays responsive while the main thread is blocked on a sync task", async () => { + const handle = await startHealthWorker({ + port: TEST_PORT, + workerEntry: WORKER_ENTRY, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + }); + handles.push(handle); + + // Kick off a request, then block the main thread synchronously for 500ms. + // If /health lived on the main thread, the response wouldn't arrive + // until after the block finishes. 
Because it lives on a worker_thread, + // the worker's socket accept loop has already started responding. + const fetchPromise = fetchHealth(TEST_PORT, 2_000); + const blockStart = Date.now(); + // Busy-spin without yielding to the event loop. 500ms is 0.1x of the 5s prod probe timeout; + // the fetch above allows 2s. NOTE(review): the HTTP client runs on this same blocked thread, so the + // response is only observed after the spin ends; this alone cannot tell a worker reply from a main-thread one. + // eslint-disable-next-line no-empty + while (Date.now() - blockStart < 500) {} + const res = await fetchPromise; + expect(res.status).toBe(200); + const json = JSON.parse(res.body); + expect(json.status).toBe("ok"); + }); + + it("releases the port on shutdown", async () => { + const first = await startHealthWorker({ + port: TEST_PORT, + workerEntry: WORKER_ENTRY, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + }); + await first.shutdown(); + + // Booting again on the same port should succeed. + const second = await startHealthWorker({ + port: TEST_PORT, + workerEntry: WORKER_ENTRY, + logger: { info: () => {}, warn: () => {}, error: () => {} }, + }); + handles.push(second); + const res = await fetchHealth(TEST_PORT); + expect(res.status).toBe(200); + }); +}); diff --git a/packages/producer/src/services/healthWorker.ts b/packages/producer/src/services/healthWorker.ts new file mode 100644 index 0000000000..71535bc27c --- /dev/null +++ b/packages/producer/src/services/healthWorker.ts @@ -0,0 +1,186 @@ +/** + * Worker-thread health endpoint. + * + * Runs an HTTP server (`/health`, returns 200 OK with uptime+timestamp JSON) + * on a *separate* Node worker_thread so probe responses don't depend on the + * main event loop. The main thread can be deep in a long-running synchronous + * task (Chrome teardown, large file I/O, GC pause, the post-Miguel guard + * "impossible duration" math, etc.) and this endpoint still answers within + * milliseconds because it lives in a different V8 isolate with its own + * event loop.
+ * + * Why this matters + * ---------------- + * + * The producer sidecar's k8s `livenessProbe` / `readinessProbe` hit `/health` + * on the Hono server in the main thread. Any synchronous stall longer than + * the probe `timeoutSeconds` (5s in prod prior to the companion change) + * triggers a SIGKILL even when the process is still alive — just busy. + * + * Today's incident (2026-06-26): an infinite GSAP timeline caused the + * distributed planner to try to enumerate ~300_000_000_000 frames, and + * the sidecar got killed mid-arithmetic. Miguel's upstream `plan()` + * duration guard kills that input class at the source. This module is + * defense-in-depth: future wedge classes (sync I/O on video-heavy comps, + * runaway loops, GC pauses) shouldn't kill an otherwise-alive pod either. + * + * Contract + * -------- + * + * - The worker thread binds an HTTP listener on + * `PRODUCER_HEALTH_PORT` (default 9848) for `/health` only. + * - Liveness in the worker thread = "the worker_thread itself is responsive", + * which is a strict subset of process liveness. If the entire Node process + * is dead the OS tears down both threads' sockets simultaneously, so the + * worker_thread's listener stops answering and k8s correctly kills the pod. + * - Listening on `0.0.0.0` is intentional: this is the probe entry point and + * the sidecar already exposes other ports for in-pod traffic only. The + * endpoint returns the same shape the main-thread `/health` always returned + * so existing observability keeps working. + * + * The main thread still serves `/health` on the main port (9847) for + * backwards compatibility; k8s probe config in `heygen-com/app` can migrate + * to the worker port at its own pace. + */ + +import { Worker } from "node:worker_threads"; +import { fileURLToPath } from "node:url"; +import { dirname, join } from "node:path"; +import { existsSync } from "node:fs"; + +export interface HealthWorkerOptions { + /** Port the worker_thread health endpoint listens on. 
Default 9848 / env. */ + port?: number; + /** + * Optional logger; falls back to console. Note: the worker thread itself + * cannot use the parent's logger directly (separate isolate), so it logs + * via `console`. The handle returned here uses the provided logger for + * lifecycle events on the *main* thread. + */ + logger?: { + info?: (msg: string, meta?: Record) => void; + warn?: (msg: string, meta?: Record) => void; + error?: (msg: string, meta?: Record) => void; + }; + /** + * Override worker entry module. Falls back to a co-located + * `healthWorkerThread.js` (post-build) or `.ts` (dev/test). + */ + workerEntry?: string; +} + +export interface HealthWorkerHandle { + /** Port the listener is bound to (resolved). */ + port: number; + /** Stop the listener + terminate the worker thread. Idempotent. */ + shutdown: () => Promise; +} + +const DEFAULT_HEALTH_PORT = 9848; + +/** + * Spawn the health worker_thread. Returns once the worker reports its + * listener is up (or rejects if the worker fails to start). + */ +export async function startHealthWorker( + options: HealthWorkerOptions = {}, +): Promise { + const log = options.logger ?? defaultLogger(); + const port = + options.port ?? parseInt(process.env.PRODUCER_HEALTH_PORT ?? String(DEFAULT_HEALTH_PORT), 10); + + const entry = options.workerEntry ?? resolveWorkerEntry(); + if (!entry) { + throw new Error( + "[healthWorker] could not resolve worker entry. " + + "Pass options.workerEntry or ensure healthWorkerThread.{js,ts} is co-located.", + ); + } + + const worker = new Worker(entry, { + workerData: { port }, + // Keep stdio inherited so the worker's console logs land in the pod logs + // alongside the main thread's. + stdout: false, + stderr: false, + }); + + // Wait for the worker to report "listening" before resolving, so callers + // can be sure the probe endpoint is actually up. 
+ await new Promise((resolve, reject) => { + const onMessage = (msg: { type: string; error?: string }) => { + if (msg?.type === "listening") { + worker.off("error", onError); + worker.off("message", onMessage); + resolve(); + } else if (msg?.type === "listen-error") { + worker.off("error", onError); + worker.off("message", onMessage); + reject(new Error(`[healthWorker] failed to bind port ${port}: ${msg.error}`)); + } + }; + const onError = (err: Error) => { + worker.off("error", onError); + worker.off("message", onMessage); + reject(err); + }; + worker.on("message", onMessage); + worker.on("error", onError); + }); + + log.info?.(`[healthWorker] /health listening on worker thread, port ${port}`); + + let shutdownPromise: Promise | null = null; + const shutdown = (): Promise => { + if (shutdownPromise) return shutdownPromise; + shutdownPromise = (async () => { + try { + worker.postMessage({ type: "shutdown" }); + // Give the worker a beat to close its server cleanly. terminate() + // is the hard backstop. + await Promise.race([ + new Promise((res) => worker.once("exit", () => res())), + new Promise((res) => setTimeout(res, 2_000)), + ]); + } finally { + await worker.terminate().catch(() => {}); + } + })(); + return shutdownPromise; + }; + + // If the worker crashes unexpectedly, log loudly. We don't auto-respawn + // here — the k8s probe will catch a dead listener and the pod will be + // restarted, which is the right behavior for a truly-broken process. + worker.on("error", (err: Error) => { + log.error?.(`[healthWorker] worker thread error: ${err.message}`); + }); + worker.on("exit", (code) => { + if (code !== 0) { + log.warn?.(`[healthWorker] worker thread exited with code ${code}`); + } + }); + + return { port, shutdown }; +} + +function defaultLogger() { + return { + info: (msg: string) => console.log(msg), + warn: (msg: string) => console.warn(msg), + error: (msg: string) => console.error(msg), + }; +} + +/** + * Try to find the co-located worker entry file. 
Prefer the compiled `.js` + * (production) and fall back to the `.ts` source (dev / vitest via tsx). + */ +function resolveWorkerEntry(): string | undefined { + const here = dirname(fileURLToPath(import.meta.url)); + const candidates = [join(here, "healthWorkerThread.js"), join(here, "healthWorkerThread.ts")]; + for (const candidate of candidates) { + if (existsSync(candidate)) return candidate; + } + return undefined; +} diff --git a/packages/producer/src/services/healthWorkerThread.ts b/packages/producer/src/services/healthWorkerThread.ts new file mode 100644 index 0000000000..33b8a6694d --- /dev/null +++ b/packages/producer/src/services/healthWorkerThread.ts @@ -0,0 +1,84 @@ +/** + * Worker-thread entry for the off-main-thread /health endpoint. + * + * Runs a minimal `node:http` server bound to `workerData.port` that answers + * `GET /health` with `{ status, uptime, timestamp, thread: "worker" }`. + * + * Lifecycle: + * + * - On startup, binds the port and posts `{ type: "listening", port }` to the + * parent. If the server emits `error` (port in use, permission denied), posts + * `{ type: "listen-error", error }`, then closes `parentPort` ~50ms later so the thread exits on its own (no exit code is forced). + * - On `{ type: "shutdown" }` from the parent, closes the server and exits 0. + * - All other requests (path or method other than `GET /health`) get 404. + * + * The HTTP layer is intentionally raw — no framework, zero deps beyond the + * Node stdlib — so the worker's surface is small and its event loop has + * nothing to block on except its own socket accept loop. + */ + +import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; +import { parentPort, workerData } from "node:worker_threads"; + +const startTime = Date.now(); +const port = Number(workerData?.port ?? 9848); + +if (!parentPort) { + // Defensive — this module is only meaningful inside a worker_thread.
+ throw new Error("[healthWorkerThread] must run inside a worker_thread"); +} + +const server = createServer((req: IncomingMessage, res: ServerResponse) => { + if (req.method === "GET" && (req.url === "/health" || req.url?.startsWith("/health?"))) { + const body = JSON.stringify({ + status: "ok", + uptime: Math.floor((Date.now() - startTime) / 1000), + timestamp: new Date().toISOString(), + thread: "worker", + }); + res.writeHead(200, { + "Content-Type": "application/json", + "Content-Length": Buffer.byteLength(body), + "Cache-Control": "no-store", + }); + res.end(body); + return; + } + res.writeHead(404, { "Content-Type": "text/plain" }); + res.end("Not found"); +}); + +// Disable timeouts — `/health` is a fast endpoint, but if the kernel is +// slow to flush at process-tear-down time we'd rather respond than time out. +server.keepAliveTimeout = 0; +server.requestTimeout = 0; +server.headersTimeout = 0; + +server.on("error", (err: NodeJS.ErrnoException) => { + parentPort?.postMessage({ + type: "listen-error", + error: `${err.code ?? "unknown"}: ${err.message}`, + }); + // Give the parent a beat to receive the error, then close our message + // channel so the worker thread exits naturally. process.exit() inside a + // worker has had inconsistent semantics across Node versions; closing + // parentPort + letting the event loop drain is the documented clean path + // and lets the parent's `exit` listener fire with the natural code. + setTimeout(() => parentPort?.close(), 50); +}); + +server.listen(port, "0.0.0.0", () => { + parentPort?.postMessage({ type: "listening", port }); +}); + +parentPort.on("message", (msg: { type?: string }) => { + if (msg?.type === "shutdown") { + // Close the server and the parent-port message channel. 
The parent + // owns the authoritative shutdown timeout (see healthWorker.ts — + // Promise.race against a 2s clock then worker.terminate()), so we do + // NOT need a redundant force-exit timer here: if server.close() hangs + // on a lingering keep-alive socket, the parent's terminate() lands + // first and kills the worker. Single source of truth for the deadline. + server.close(() => parentPort?.close()); + } +});