Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .fallowrc.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
// Worker entry points loaded dynamically by their *Pool.ts companions.
"packages/producer/src/services/pngDecodeBlitWorker.ts",
"packages/producer/src/services/shaderTransitionWorker.ts",
// Off-main-thread /health endpoint, spawned by path from healthWorker.ts.
"packages/producer/src/services/healthWorkerThread.ts",
// Test fixture worker, spawned by path via the pools' workerEntryPath
// option from the crash-recovery tests; has no import-graph referrer.
"packages/producer/src/services/__fixtures__/crashOnMessageWorker.mjs",
Expand Down
5 changes: 5 additions & 0 deletions packages/producer/build.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ await Promise.all([
entryPoints: ["src/services/shaderTransitionWorker.ts"],
outfile: "dist/services/shaderTransitionWorker.js",
}),
build({
...sharedOpts,
entryPoints: ["src/services/healthWorkerThread.ts"],
outfile: "dist/services/healthWorkerThread.js",
}),
build({ ...sharedOpts, entryPoints: ["src/distributed.ts"], outfile: "dist/distributed.js" }),
]);

Expand Down
38 changes: 38 additions & 0 deletions packages/producer/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
type RenderConfig,
} from "./services/renderOrchestrator.js";
import { prepareHyperframeLintBody, runHyperframeLint } from "./services/hyperframeLint.js";
import { startHealthWorker, type HealthWorkerHandle } from "./services/healthWorker.js";
import { isVideoFrameFormat } from "@hyperframes/engine";
import { resolveRenderPaths } from "./utils/paths.js";
import { defaultLogger, type ProducerLogger } from "./logger.js";
Expand Down Expand Up @@ -738,10 +739,47 @@ export function startServer(options: ServerOptions = {}) {
(server as unknown as import("node:http").Server).requestTimeout = 0;
(server as unknown as import("node:http").Server).keepAliveTimeout = 0;

// Start the worker-thread health endpoint alongside the main listener.
// The main thread keeps serving /health on `port` for backwards
// compatibility; the worker thread additionally serves /health on
// PRODUCER_HEALTH_PORT (default 9848) so k8s liveness/readiness probes can
// migrate to a listener that doesn't share an event loop with renders.
//
// Opt-out: set PRODUCER_DISABLE_HEALTH_WORKER=1 (e.g. for tests that don't
// want a worker spawned, or for environments where the extra port isn't
// wanted).
//
// We store the *promise* (not the resolved handle) so a SIGTERM that
// arrives before the worker has finished booting still has something to
// await. Awaiting a `let healthWorker = null` mutated from inside `.then`
// would race: if SIGTERM lands before the `.then` callback fires,
// `shutdown()` sees `null` and skips worker cleanup. The promise pattern
// closes that window without making startup blocking.
const healthWorkerPromise: Promise<HealthWorkerHandle | null> =
process.env.PRODUCER_DISABLE_HEALTH_WORKER === "1"
? Promise.resolve(null)
: startHealthWorker({ logger: log }).catch((err: Error) => {
// Don't crash the producer if the worker fails to start — the main
// /health is still up. Log loudly so the operator notices.
log.error(`[server] health worker failed to start: ${err.message}`);
return null;
});

async function shutdown(signal: string) {
log.info(`Received ${signal}, shutting down`);
const { drainBrowserPool } = await import("@hyperframes/engine");
await drainBrowserPool().catch(() => {});
// Bounded await: if the worker hasn't come online within 1.5s of
// shutdown there's no useful cleanup left to do — `worker.terminate()`
// from process exit will kill the thread regardless, and we'd rather
// not let a hung-startup worker keep the SIGTERM path waiting.
const handle = await Promise.race<HealthWorkerHandle | null>([
healthWorkerPromise,
new Promise<null>((res) => setTimeout(() => res(null), 1_500).unref()),
]);
if (handle) {
await handle.shutdown().catch(() => {});
}
server.close(() => {
log.info("Server closed");
process.exit(0);
Expand Down
131 changes: 131 additions & 0 deletions packages/producer/src/services/healthWorker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Tests for the worker_thread /health endpoint.
*
* The pin: the listener answers from a worker_thread, so it stays responsive
* even if the main thread is blocked on a sync task. We verify:
*
* 1. Boot: startHealthWorker resolves once the worker is listening, and the
* endpoint returns 200 + the expected JSON body.
* 2. Liveness across main-thread block: while the main thread is blocked on
* a long synchronous loop, /health still answers within a tight budget.
* This is the property k8s probes need: probe responsiveness is
* decoupled from main-thread event-loop latency.
* 3. Shutdown: the handle's shutdown() actually frees the port (subsequent
* boot on the same port succeeds).
*/

import { afterEach, describe, expect, it } from "vitest";
import { fileURLToPath } from "node:url";
import { dirname, resolve } from "node:path";
import { request as httpRequest } from "node:http";
import { startHealthWorker, type HealthWorkerHandle } from "./healthWorker.js";

// Fixed port shared by every test in this file. It sits well above the
// producer's default ports (9847 main, 9848 health worker). Because the
// tests reuse it, each one relies on the previous handle's shutdown()
// having released the listener.
const TEST_PORT = 19848;

// Resolve the worker entry relative to this test file and point it at the
// .ts source, so the tests never depend on a built dist/ artifact existing.
const HERE = dirname(fileURLToPath(import.meta.url));
const WORKER_ENTRY = resolve(HERE, "healthWorkerThread.ts");

/**
 * Issue `GET /health` against `127.0.0.1:<port>` and collect the response.
 *
 * @param port - TCP port of the health listener under test.
 * @param timeoutMs - Socket idle timeout; the request is destroyed (and the
 *   promise rejected) when it elapses.
 * @returns The HTTP status code (0 if the response carried none) and the
 *   response body decoded as UTF-8.
 */
function fetchHealth(
  port: number,
  timeoutMs = 1_000,
): Promise<{ status: number; body: string }> {
  return new Promise((resolveFetch, rejectFetch) => {
    const req = httpRequest(
      {
        host: "127.0.0.1",
        port,
        path: "/health",
        method: "GET",
        timeout: timeoutMs,
      },
      (res) => {
        // Decode through the stream's UTF-8 decoder so a multi-byte character
        // split across two chunks is reassembled instead of being turned into
        // replacement characters by per-chunk Buffer-to-string coercion.
        res.setEncoding("utf8");
        let body = "";
        res.on("data", (chunk: string) => {
          body += chunk;
        });
        // A response aborted mid-body never emits "end"; reject instead of
        // leaving the caller waiting forever.
        res.on("error", rejectFetch);
        res.on("end", () => resolveFetch({ status: res.statusCode ?? 0, body }));
      },
    );
    // The "timeout" event only signals idleness; destroying the request is
    // what actually aborts it and surfaces the error via the "error" handler.
    req.on("timeout", () => {
      req.destroy(new Error(`request timed out after ${timeoutMs}ms`));
    });
    req.on("error", rejectFetch);
    req.end();
  });
}

describe("healthWorker", () => {
  // Handles booted by the current test. They are drained in afterEach so a
  // failed assertion never leaves a worker holding TEST_PORT for the next test.
  const handles: HealthWorkerHandle[] = [];
  const silentLogger = { info: () => {}, warn: () => {}, error: () => {} };

  /** Boot a worker on TEST_PORT and register it for automatic cleanup. */
  const boot = async (): Promise<HealthWorkerHandle> => {
    const handle = await startHealthWorker({
      port: TEST_PORT,
      workerEntry: WORKER_ENTRY,
      logger: silentLogger,
    });
    handles.push(handle);
    return handle;
  };

  afterEach(async () => {
    let handle = handles.pop();
    while (handle !== undefined) {
      await handle.shutdown().catch(() => {});
      handle = handles.pop();
    }
  });

  it("boots and serves /health from the worker thread", async () => {
    const handle = await boot();
    expect(handle.port).toBe(TEST_PORT);

    const res = await fetchHealth(TEST_PORT);
    expect(res.status).toBe(200);
    const json = JSON.parse(res.body);
    expect(json.status).toBe("ok");
    expect(json.thread).toBe("worker");
    expect(typeof json.uptime).toBe("number");
    expect(typeof json.timestamp).toBe("string");
  });

  it("stays responsive while the main thread is blocked on a sync task", async () => {
    await boot();
    const { execFileSync } = await import("node:child_process");

    // The probe must run outside this thread. An http.request issued here
    // and followed by a busy-loop proves nothing: the TCP connect is deferred
    // to the next tick, so the request would only be sent after the loop
    // ends, and the test would pass even with /health on the main thread.
    //
    // execFileSync blocks this thread's event loop until the child exits, so
    // the child can only get an answer if /health is served from a thread
    // whose event loop is independent of this one.
    const probeSource = [
      'const http = require("node:http");',
      `const req = http.get({ host: "127.0.0.1", port: ${TEST_PORT}, path: "/health", timeout: 2000 }, (res) => {`,
      '  res.setEncoding("utf8");',
      '  let body = "";',
      '  res.on("data", (chunk) => { body += chunk; });',
      '  res.on("end", () => { process.stdout.write(JSON.stringify({ status: res.statusCode, body })); });',
      "});",
      'req.on("timeout", () => { req.destroy(new Error("probe timed out after 2000ms")); });',
      'req.on("error", (err) => { console.error(err.message); process.exitCode = 1; });',
    ].join("\n");

    // A non-zero exit (probe timeout / connection error) makes execFileSync
    // throw, failing the test. The outer timeout is only a safety net.
    const stdout = execFileSync(process.execPath, ["-e", probeSource], {
      encoding: "utf8",
      timeout: 10_000,
    });

    const probe = JSON.parse(stdout) as { status: number; body: string };
    expect(probe.status).toBe(200);
    const json = JSON.parse(probe.body);
    expect(json.status).toBe("ok");
  });

  it("releases the port on shutdown", async () => {
    const first = await boot();
    await first.shutdown();

    // Booting again on the same port only succeeds if the first listener
    // was actually closed. (`first` stays registered for cleanup; shutdown()
    // is idempotent, so the second call in afterEach is a no-op.)
    await boot();
    const res = await fetchHealth(TEST_PORT);
    expect(res.status).toBe(200);
  });
});
186 changes: 186 additions & 0 deletions packages/producer/src/services/healthWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/**
* Worker-thread health endpoint.
*
* Runs an HTTP server (`/health`, returns 200 OK with uptime+timestamp JSON)
* on a *separate* Node worker_thread so probe responses don't depend on the
* main event loop. The main thread can be deep in a long-running synchronous
* task (Chrome teardown, large file I/O, GC pause, the post-Miguel guard
* "impossible duration" math, etc.) and this endpoint still answers within
* milliseconds because it lives in a different V8 isolate with its own
* event loop.
*
* Why this matters
* ----------------
*
* The producer sidecar's k8s `livenessProbe` / `readinessProbe` hit `/health`
* on the Hono server in the main thread. Any synchronous stall longer than
* the probe `timeoutSeconds` (5s in prod prior to the companion change)
* triggers a SIGKILL even when the process is still alive — just busy.
*
* Today's incident (2026-06-26): an infinite GSAP timeline caused the
* distributed planner to try to enumerate ~300_000_000_000 frames, and
* the sidecar got killed mid-arithmetic. Miguel's upstream `plan()`
* duration guard kills that input class at the source. This module is
* defense-in-depth: future wedge classes (sync I/O on video-heavy comps,
* runaway loops, GC pauses) shouldn't kill an otherwise-alive pod either.
*
* Contract
* --------
*
* - The worker thread binds an HTTP listener on
* `PRODUCER_HEALTH_PORT` (default 9848) for `/health` only.
* - Liveness in the worker thread = "the worker_thread itself is responsive",
* which is a strict subset of process liveness. If the entire Node process
* is dead the OS tears down both threads' sockets simultaneously, so the
* worker_thread's listener stops answering and k8s correctly kills the pod.
* - Listening on `0.0.0.0` is intentional: this is the probe entry point and
* the sidecar already exposes other ports for in-pod traffic only. The
* endpoint returns the same shape the main-thread `/health` always returned
* so existing observability keeps working.
*
* The main thread still serves `/health` on the main port (9847) for
* backwards compatibility; k8s probe config in `heygen-com/app` can migrate
* to the worker port at its own pace.
*/

import { Worker } from "node:worker_threads";
import { fileURLToPath } from "node:url";
import { dirname, join } from "node:path";
import { existsSync } from "node:fs";

/** Shape shared by every logger method: a message plus optional structured metadata. */
type HealthWorkerLogMethod = (msg: string, meta?: Record<string, unknown>) => void;

/** Optional overrides accepted by `startHealthWorker`. */
export interface HealthWorkerOptions {
  /**
   * Port for the worker_thread `/health` listener. When omitted, the
   * `PRODUCER_HEALTH_PORT` environment variable is consulted, then 9848.
   */
  port?: number;
  /**
   * Sink for lifecycle messages that `startHealthWorker` emits on the *main*
   * thread (listener up, worker error, non-zero exit). Every method is
   * optional; when no logger is given at all, `console` is used.
   */
  logger?: {
    info?: HealthWorkerLogMethod;
    warn?: HealthWorkerLogMethod;
    error?: HealthWorkerLogMethod;
  };
  /**
   * Path of the module to run inside the worker. When omitted, a
   * `healthWorkerThread.js` (build output) or `healthWorkerThread.ts`
   * (dev/test) next to this module is used.
   */
  workerEntry?: string;
}

/** Control surface returned by `startHealthWorker` once the worker is listening. */
export interface HealthWorkerHandle {
  /**
   * Ask the worker to close its listener, then terminate the thread.
   * Safe to call more than once: repeated calls share the first call's promise.
   */
  shutdown: () => Promise<void>;
  /** Port number the worker was told to listen on. */
  port: number;
}

/** Port used when neither `options.port` nor `PRODUCER_HEALTH_PORT` is provided. */
const DEFAULT_HEALTH_PORT = 9848;

/**
 * Spawn the health worker_thread and wait for it to report readiness.
 *
 * Resolves once the worker posts `{ type: "listening" }`. Rejects, and
 * terminates the worker so no thread is leaked, when:
 *   - the configured port is not an integer in 0..65535,
 *   - no worker entry module can be resolved,
 *   - the worker posts `{ type: "listen-error" }`,
 *   - the worker emits "error", or
 *   - the worker exits before reporting that it is listening.
 *
 * @param options - Port, logger and worker-entry overrides; all optional.
 * @returns A handle exposing the configured port and an idempotent `shutdown()`.
 * @throws Error for any of the startup failures listed above.
 */
export async function startHealthWorker(
  options: HealthWorkerOptions = {},
): Promise<HealthWorkerHandle> {
  const log = options.logger ?? defaultLogger();

  const rawPort =
    options.port ?? process.env.PRODUCER_HEALTH_PORT ?? String(DEFAULT_HEALTH_PORT);
  const port = typeof rawPort === "number" ? rawPort : Number.parseInt(rawPort, 10);
  // Fail fast with a readable message instead of handing NaN (e.g. from a
  // mistyped PRODUCER_HEALTH_PORT) or an out-of-range value to the worker.
  if (!Number.isInteger(port) || port < 0 || port > 65_535) {
    throw new Error(
      `[healthWorker] invalid health port "${String(rawPort)}": expected an integer in 0..65535`,
    );
  }

  const entry = options.workerEntry ?? resolveWorkerEntry();
  if (!entry) {
    throw new Error(
      "[healthWorker] could not resolve worker entry. " +
        "Pass options.workerEntry or ensure healthWorkerThread.{js,ts} is co-located.",
    );
  }

  const worker = new Worker(entry, {
    workerData: { port },
    // `false` (the default) means the worker's stdout/stderr are piped
    // through to the parent's process.stdout/stderr, so the worker's console
    // output lands in the same logs as the main thread's.
    stdout: false,
    stderr: false,
  });

  // Remember whether the thread has already gone away, so shutdown() does not
  // wait for an "exit" event that has already fired.
  let exited = false;
  worker.once("exit", () => {
    exited = true;
  });

  // Wait for the worker to report "listening" before resolving, so callers
  // can be sure the probe endpoint is actually up.
  try {
    await new Promise<void>((resolve, reject) => {
      const detach = (): void => {
        worker.off("message", onMessage);
        worker.off("error", onError);
        worker.off("exit", onExit);
      };
      const onMessage = (msg: { type?: string; error?: string } | null | undefined): void => {
        if (msg?.type === "listening") {
          detach();
          resolve();
        } else if (msg?.type === "listen-error") {
          detach();
          reject(new Error(`[healthWorker] failed to bind port ${port}: ${msg.error}`));
        }
      };
      const onError = (err: Error): void => {
        detach();
        reject(err);
      };
      // Without this, a worker that exits before posting "listening" (and
      // without emitting "error") would leave this promise pending forever.
      const onExit = (code: number): void => {
        detach();
        reject(new Error(`[healthWorker] worker thread exited with code ${code} before listening`));
      };
      worker.on("message", onMessage);
      worker.on("error", onError);
      worker.on("exit", onExit);
    });
  } catch (err) {
    // Startup failed: make sure the thread does not outlive the rejection.
    // The no-op listener keeps a late "error" from the dying worker from
    // becoming an uncaught exception on the main thread.
    worker.on("error", () => {});
    await worker.terminate().catch(() => {});
    throw err;
  }

  log.info?.(`[healthWorker] /health listening on worker thread, port ${port}`);

  let shutdownPromise: Promise<void> | null = null;
  const shutdown = (): Promise<void> => {
    if (shutdownPromise) return shutdownPromise;
    shutdownPromise = (async () => {
      // Nothing left to stop if the thread is already gone.
      if (exited) return;
      try {
        worker.postMessage({ type: "shutdown" });
        // Give the worker up to 2s to close its server cleanly. terminate()
        // below is the hard backstop.
        await new Promise<void>((res) => {
          const finish = (): void => {
            // Clear the timer so it cannot keep the event loop alive after a
            // clean exit, and drop the listener if the timer won the race.
            clearTimeout(timer);
            worker.off("exit", finish);
            res();
          };
          const timer = setTimeout(finish, 2_000);
          worker.once("exit", finish);
        });
      } finally {
        await worker.terminate().catch(() => {});
      }
    })();
    return shutdownPromise;
  };

  // If the worker crashes unexpectedly, log loudly. We don't auto-respawn
  // here — the k8s probe will catch a dead listener and the pod will be
  // restarted, which is the right behavior for a truly-broken process.
  worker.on("error", (err: Error) => {
    log.error?.(`[healthWorker] worker thread error: ${err.message}`);
  });
  worker.on("exit", (code) => {
    if (code !== 0) {
      log.warn?.(`[healthWorker] worker thread exited with code ${code}`);
    }
  });

  return { port, shutdown };
}

/**
 * Console-backed logger used when the caller supplies none.
 * Each method forwards only the message to the matching `console` method,
 * looked up at call time.
 */
function defaultLogger() {
  const info = (msg: string): void => {
    console.log(msg);
  };
  const warn = (msg: string): void => {
    console.warn(msg);
  };
  const error = (msg: string): void => {
    console.error(msg);
  };
  return { info, warn, error };
}

/**
 * Locate the worker entry module that sits next to this file.
 *
 * Candidates are probed in priority order: the compiled
 * `healthWorkerThread.js` first, then the `healthWorkerThread.ts` source.
 *
 * @returns The path of the first candidate that exists on disk, or
 *   `undefined` when neither does.
 */
function resolveWorkerEntry(): string | undefined {
  const moduleDir = dirname(fileURLToPath(import.meta.url));
  return ["healthWorkerThread.js", "healthWorkerThread.ts"]
    .map((fileName) => join(moduleDir, fileName))
    .find((candidatePath) => existsSync(candidatePath));
}
Loading
Loading