From 34777eae90917d6a5cdd37d74a293740fc642895 Mon Sep 17 00:00:00 2001 From: Daniel Genis Date: Thu, 25 Jun 2026 11:50:19 +0200 Subject: [PATCH] feat: re-register repeatable jobs on Redis reconnect Repeatable BullMQ jobs (poll ticks for pr-watcher, repo-cleanup, ticket-sync, workflow-trigger-checker, token-validation, reconcile-resync) were registered only at API boot. A Redis restart/flush wipes the repeat schedulers, so the pollers silently stop until the API is restarted. Add a single source of truth (REPEATABLE_JOBS) plus idempotent ensureRepeatJobs(), and a dedicated monitor connection that re-registers them on every reconnect (debounced). Workers no longer self-register inline, so boot and reconnect share the same definitions. --- apps/api/src/index.ts | 13 ++- .../src/services/repeat-job-monitor.test.ts | 79 +++++++++++++++++ apps/api/src/services/repeat-job-monitor.ts | 58 +++++++++++++ apps/api/src/services/repeatable-jobs.test.ts | 60 +++++++++++++ apps/api/src/services/repeatable-jobs.ts | 85 +++++++++++++++++++ apps/api/src/workers/pr-watcher-worker.ts | 12 +-- apps/api/src/workers/reconcile-worker.ts | 10 --- apps/api/src/workers/repo-cleanup-worker.ts | 21 ----- apps/api/src/workers/ticket-sync-worker.ts | 12 --- .../src/workers/token-validation-worker.ts | 12 --- .../src/workers/workflow-trigger-worker.ts | 11 --- 11 files changed, 295 insertions(+), 78 deletions(-) create mode 100644 apps/api/src/services/repeat-job-monitor.test.ts create mode 100644 apps/api/src/services/repeat-job-monitor.ts create mode 100644 apps/api/src/services/repeatable-jobs.test.ts create mode 100644 apps/api/src/services/repeatable-jobs.ts diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index e960d0f4..df28fb5f 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -193,7 +193,11 @@ async function main() { cleanRepeatJobs("reconcile-resync"), ]); - // Start BullMQ workers (each re-registers its repeat job) + // Register all repeatable jobs from the 
single source of truth + const { ensureRepeatJobs } = await import("./services/repeatable-jobs.js"); + await ensureRepeatJobs(); + + // Start BullMQ workers const worker = startTaskWorker(); logger.info("Task worker started"); @@ -223,6 +227,12 @@ async function main() { const reconcileResyncWorker = startReconcileResyncWorker(); logger.info("Reconcile workers started"); + // Re-register repeatable jobs after a Redis reconnect (e.g. Redis pod restart) + const { startRepeatJobMonitor, stopRepeatJobMonitor } = + await import("./services/repeat-job-monitor.js"); + startRepeatJobMonitor(); + logger.info("Repeat-job monitor started"); + // Check if metrics-server is available checkMetricsServer().catch(() => {}); @@ -235,6 +245,7 @@ async function main() { // Graceful shutdown const shutdown = async () => { logger.info("Shutting down..."); + await stopRepeatJobMonitor(); await worker.close(); await ticketSyncWorker.close(); await repoCleanupWorker.close(); diff --git a/apps/api/src/services/repeat-job-monitor.test.ts b/apps/api/src/services/repeat-job-monitor.test.ts new file mode 100644 index 00000000..6ce1dee1 --- /dev/null +++ b/apps/api/src/services/repeat-job-monitor.test.ts @@ -0,0 +1,79 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +const handlers: Record<string, (...args: unknown[]) => void> = {}; +const quitMock = vi.fn().mockResolvedValue("OK"); + +const mockRedis = { + on: vi.fn((event: string, cb: (...args: unknown[]) => void) => { + handlers[event] = cb; + return mockRedis; + }), + quit: quitMock, +}; + +vi.mock("ioredis", () => ({ + Redis: vi.fn().mockImplementation(() => mockRedis), +})); + +vi.mock("./redis-config.js", () => ({ + redisConnectionUrl: "redis://localhost:6379", + redisTlsOptions: undefined, +})); + +const ensureRepeatJobsMock = vi.fn().mockResolvedValue(undefined); +vi.mock("./repeatable-jobs.js", () => ({ + ensureRepeatJobs: (...args: unknown[]) => ensureRepeatJobsMock(...args), +})); + +vi.mock("../logger.js", () => ({ + logger: { info:
vi.fn(), debug: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +import { startRepeatJobMonitor, stopRepeatJobMonitor } from "./repeat-job-monitor.js"; + +describe("repeat-job-monitor", () => { + beforeEach(() => { + vi.useFakeTimers(); + ensureRepeatJobsMock.mockClear(); + for (const key of Object.keys(handlers)) delete handlers[key]; + }); + + afterEach(async () => { + await stopRepeatJobMonitor(); + vi.useRealTimers(); + }); + + it("ignores the first ready (initial connect)", () => { + startRepeatJobMonitor(); + handlers.ready(); + vi.advanceTimersByTime(5000); + expect(ensureRepeatJobsMock).not.toHaveBeenCalled(); + }); + + it("re-registers jobs on a reconnect (later ready), after debounce", () => { + startRepeatJobMonitor(); + handlers.ready(); // initial connect + handlers.ready(); // reconnect + + expect(ensureRepeatJobsMock).not.toHaveBeenCalled(); // debounce not elapsed + vi.advanceTimersByTime(2000); + expect(ensureRepeatJobsMock).toHaveBeenCalledTimes(1); + }); + + it("debounces a burst of reconnects into a single re-registration", () => { + startRepeatJobMonitor(); + handlers.ready(); // initial connect + handlers.ready(); + handlers.ready(); + handlers.ready(); + + vi.advanceTimersByTime(2000); + expect(ensureRepeatJobsMock).toHaveBeenCalledTimes(1); + }); + + it("stop quits the connection", async () => { + startRepeatJobMonitor(); + await stopRepeatJobMonitor(); + expect(quitMock).toHaveBeenCalled(); + }); +}); diff --git a/apps/api/src/services/repeat-job-monitor.ts b/apps/api/src/services/repeat-job-monitor.ts new file mode 100644 index 00000000..d14ca64e --- /dev/null +++ b/apps/api/src/services/repeat-job-monitor.ts @@ -0,0 +1,58 @@ +import { Redis } from "ioredis"; +import { logger } from "../logger.js"; +import { redisConnectionUrl, redisTlsOptions } from "./redis-config.js"; +import { ensureRepeatJobs } from "./repeatable-jobs.js"; + +const DEBOUNCE_MS = 2000; + +let connection: Redis | null = null; +let debounceTimer: ReturnType<typeof setTimeout> | null =
null; +let seenReady = false; + +/** + * Watch a dedicated Redis connection and re-register the repeatable jobs after + * a reconnect. A Redis restart wipes the repeat schedulers and drops the TCP + * connection; ioredis fires "ready" again once it reconnects, and we re-add them. + * + * The very first "ready" is the initial connect — boot already registered the + * jobs, so we skip it. Every later "ready" is a reconnect. + */ +export function startRepeatJobMonitor(): void { + if (connection) return; + + connection = new Redis(redisConnectionUrl, { + ...(redisTlsOptions ? { tls: redisTlsOptions } : {}), + maxRetriesPerRequest: null, + }); + + connection.on("error", (err) => { + logger.debug({ err: err.message }, "Repeat-job monitor connection error"); + }); + + connection.on("ready", () => { + if (!seenReady) { + seenReady = true; + return; + } + logger.info("Redis reconnected — re-registering repeatable jobs"); + if (debounceTimer) clearTimeout(debounceTimer); + debounceTimer = setTimeout(() => { + debounceTimer = null; + ensureRepeatJobs().catch((err) => { + logger.warn({ err }, "Failed to re-register repeatable jobs after reconnect"); + }); + }, DEBOUNCE_MS); + }); +} + +export async function stopRepeatJobMonitor(): Promise<void> { + if (debounceTimer) { + clearTimeout(debounceTimer); + debounceTimer = null; + } + if (connection) { + await connection.quit().catch(() => {}); + connection = null; + } + seenReady = false; +} diff --git a/apps/api/src/services/repeatable-jobs.test.ts b/apps/api/src/services/repeatable-jobs.test.ts new file mode 100644 index 00000000..65eda2b8 --- /dev/null +++ b/apps/api/src/services/repeatable-jobs.test.ts @@ -0,0 +1,60 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const addMock = vi.fn(); +const closeMock = vi.fn(); + +vi.mock("bullmq", () => ({ + Queue: vi.fn().mockImplementation((name: string) => ({ + name, + add: (...args: unknown[]) => addMock(name, ...args), + close: closeMock, + })), +})); +
+vi.mock("./redis-config.js", () => ({ + getBullMQConnectionOptions: vi.fn().mockReturnValue({}), +})); + +vi.mock("../logger.js", () => ({ + logger: { info: vi.fn(), debug: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +import { REPEATABLE_JOBS, ensureRepeatJobs } from "./repeatable-jobs.js"; + +describe("ensureRepeatJobs", () => { + beforeEach(() => { + addMock.mockReset().mockResolvedValue(undefined); + closeMock.mockReset().mockResolvedValue(undefined); + }); + + it("registers exactly the REPEATABLE_JOBS entries with correct repeat options", async () => { + await ensureRepeatJobs(); + + expect(addMock).toHaveBeenCalledTimes(REPEATABLE_JOBS.length); + for (const job of REPEATABLE_JOBS) { + expect(addMock).toHaveBeenCalledWith( + job.queueName, + job.jobName, + {}, + { repeat: { every: job.every } }, + ); + } + }); + + it("closes a queue for every entry", async () => { + await ensureRepeatJobs(); + expect(closeMock).toHaveBeenCalledTimes(REPEATABLE_JOBS.length); + }); + + it("shares a single in-flight run across concurrent calls", async () => { + const first = ensureRepeatJobs(); + const second = ensureRepeatJobs(); + + expect(first).toBe(second); + + await Promise.all([first, second]); + + // One registration pass ran, not two — adds called once per entry. + expect(addMock).toHaveBeenCalledTimes(REPEATABLE_JOBS.length); + }); +}); diff --git a/apps/api/src/services/repeatable-jobs.ts b/apps/api/src/services/repeatable-jobs.ts new file mode 100644 index 00000000..a6e6d5be --- /dev/null +++ b/apps/api/src/services/repeatable-jobs.ts @@ -0,0 +1,85 @@ +import { Queue } from "bullmq"; +import { parseIntEnv } from "@optio/shared"; +import { logger } from "../logger.js"; +import { getBullMQConnectionOptions } from "./redis-config.js"; + +export interface RepeatableJob { + queueName: string; + jobName: string; + every: number; +} + +/** + * Single source of truth for every repeatable BullMQ job. 
Used both at boot and + * when re-registering after a Redis reconnect, so the two paths can never drift. + */ +export const REPEATABLE_JOBS: RepeatableJob[] = [ + { + queueName: "pr-watcher", + jobName: "check-prs", + every: parseIntEnv("OPTIO_PR_WATCH_INTERVAL", 30000), + }, + { + queueName: "repo-cleanup", + jobName: "health-check", + every: parseIntEnv("OPTIO_HEALTH_CHECK_INTERVAL", 60000), + }, + { + queueName: "repo-cleanup", + jobName: "stall-check", + every: parseIntEnv("OPTIO_STALL_CHECK_INTERVAL", 30000), + }, + { + queueName: "ticket-sync", + jobName: "sync", + every: parseIntEnv("OPTIO_TICKET_SYNC_INTERVAL", 60000), + }, + { + queueName: "workflow-trigger-checker", + jobName: "check-workflow-triggers", + every: parseIntEnv("OPTIO_WORKFLOW_TRIGGER_INTERVAL", 60000), + }, + { + queueName: "token-validation", + jobName: "validate-token", + every: parseIntEnv("OPTIO_TOKEN_VALIDATION_INTERVAL", 300000), + }, + { + queueName: "reconcile-resync", + jobName: "resync", + every: parseIntEnv("OPTIO_RECONCILE_RESYNC_INTERVAL", 300000), + }, +]; + +let inFlight: Promise<void> | null = null; + +/** + * Register every repeatable job. Adding a repeat job is idempotent — BullMQ + * upserts the scheduler by repeat key — so this is safe to call repeatedly. + * Concurrent calls share a single in-flight run. A failed add is logged, not thrown.
+ */ +export function ensureRepeatJobs(): Promise<void> { + if (inFlight) return inFlight; + inFlight = registerAll().finally(() => { + inFlight = null; + }); + return inFlight; +} + +async function registerAll(): Promise<void> { + const connection = getBullMQConnectionOptions(); + for (const job of REPEATABLE_JOBS) { + const queue = new Queue(job.queueName, { connection }); + try { + await queue.add(job.jobName, {}, { repeat: { every: job.every } }); + } catch (err) { + logger.warn( + { err, queue: job.queueName, job: job.jobName }, + "Failed to register repeatable job", + ); + } finally { + await queue.close(); + } + } + logger.info({ count: REPEATABLE_JOBS.length }, "Ensured repeatable jobs"); +} diff --git a/apps/api/src/workers/pr-watcher-worker.ts b/apps/api/src/workers/pr-watcher-worker.ts index 836fa951..0bc798cd 100644 --- a/apps/api/src/workers/pr-watcher-worker.ts +++ b/apps/api/src/workers/pr-watcher-worker.ts @@ -3,7 +3,7 @@ import { eq, sql } from "drizzle-orm"; import { db } from "../db/client.js"; import { tasks, sessionPrs, interactiveSessions, reviewDrafts } from "../db/schema.js"; import type { GitPlatform, RepoIdentifier } from "@optio/shared"; -import { parsePrUrl, parseIntEnv } from "@optio/shared"; +import { parsePrUrl } from "@optio/shared"; import { getGitPlatformForRepo } from "../services/git-token-service.js"; import type { GitTokenContext } from "../services/git-token-service.js"; import { updateSessionPr } from "../services/interactive-session-service.js"; @@ -51,16 +51,6 @@ export function determineReviewStatus(reviews: { state: string; body?: string }[ export const prWatcherQueue = new Queue("pr-watcher", { connection: connectionOpts }); export function startPrWatcherWorker() { - prWatcherQueue.add( - "check-prs", - {}, - { - repeat: { - every: parseIntEnv("OPTIO_PR_WATCH_INTERVAL", 30000), - }, - }, - ); - const worker = new Worker( "pr-watcher", instrumentWorkerProcessor("pr-watcher", async () => { diff --git
a/apps/api/src/workers/reconcile-worker.ts b/apps/api/src/workers/reconcile-worker.ts index 4d79a8f3..9f0b34a8 100644 --- a/apps/api/src/workers/reconcile-worker.ts +++ b/apps/api/src/workers/reconcile-worker.ts @@ -97,16 +97,6 @@ export const resyncQueue = new Queue("reconcile-resync", { connection: connectio * and enqueue a reconcile key for each. Catches drift from lost events. */ export function startReconcileResyncWorker() { - const intervalMs = parseIntEnv("OPTIO_RECONCILE_RESYNC_INTERVAL", 5 * 60 * 1000); - - resyncQueue.add( - "resync", - {}, - { - repeat: { every: intervalMs }, - }, - ); - const worker = new Worker( "reconcile-resync", instrumentWorkerProcessor("reconcile-resync", async () => { diff --git a/apps/api/src/workers/repo-cleanup-worker.ts b/apps/api/src/workers/repo-cleanup-worker.ts index 976edf35..9e328976 100644 --- a/apps/api/src/workers/repo-cleanup-worker.ts +++ b/apps/api/src/workers/repo-cleanup-worker.ts @@ -54,27 +54,6 @@ async function recordHealthEvent( } export function startRepoCleanupWorker() { - repoCleanupQueue.add( - "health-check", - {}, - { - repeat: { - every: parseIntEnv("OPTIO_HEALTH_CHECK_INTERVAL", 60000), - }, - }, - ); - - // Dedicated stall-check cadence (30s) — more responsive than the 60s health-check - repoCleanupQueue.add( - "stall-check", - {}, - { - repeat: { - every: parseIntEnv("OPTIO_STALL_CHECK_INTERVAL", 30000), - }, - }, - ); - const worker = new Worker( "repo-cleanup", instrumentWorkerProcessor("repo-cleanup", async () => { diff --git a/apps/api/src/workers/ticket-sync-worker.ts b/apps/api/src/workers/ticket-sync-worker.ts index 5fcb2899..17199208 100644 --- a/apps/api/src/workers/ticket-sync-worker.ts +++ b/apps/api/src/workers/ticket-sync-worker.ts @@ -1,5 +1,4 @@ import { Queue, Worker } from "bullmq"; -import { parseIntEnv } from "@optio/shared"; import { logger } from "../logger.js"; import { getBullMQConnectionOptions } from "../services/redis-config.js"; @@ -9,17 +8,6 @@ const connectionOpts = 
getBullMQConnectionOptions(); export const ticketSyncQueue = new Queue("ticket-sync", { connection: connectionOpts }); export function startTicketSyncWorker(syncFn: () => Promise) { - // Add repeatable job for periodic sync - ticketSyncQueue.add( - "sync", - {}, - { - repeat: { - every: parseIntEnv("OPTIO_TICKET_SYNC_INTERVAL", 60000), // default: 60s - }, - }, - ); - const worker = new Worker( "ticket-sync", async () => { diff --git a/apps/api/src/workers/token-validation-worker.ts b/apps/api/src/workers/token-validation-worker.ts index ade3f12f..1cff0ddf 100644 --- a/apps/api/src/workers/token-validation-worker.ts +++ b/apps/api/src/workers/token-validation-worker.ts @@ -75,18 +75,6 @@ export async function validateClaudeToken( * read them without re-probing the Anthropic API. */ export function startTokenValidationWorker() { - const intervalMs = parseInt(process.env.OPTIO_TOKEN_VALIDATION_INTERVAL ?? "300000", 10); // 5 min - - tokenValidationQueue.add( - "validate-token", - {}, - { - repeat: { - every: intervalMs, - }, - }, - ); - const worker = new Worker( "token-validation", async () => { diff --git a/apps/api/src/workers/workflow-trigger-worker.ts b/apps/api/src/workers/workflow-trigger-worker.ts index 0d803ec1..394e8349 100644 --- a/apps/api/src/workers/workflow-trigger-worker.ts +++ b/apps/api/src/workers/workflow-trigger-worker.ts @@ -1,7 +1,6 @@ import { Queue, Worker } from "bullmq"; import * as workflowService from "../services/workflow-service.js"; import * as taskConfigService from "../services/task-config-service.js"; -import { parseIntEnv } from "@optio/shared"; import { logger } from "../logger.js"; import { getBullMQConnectionOptions } from "../services/redis-config.js"; @@ -17,16 +16,6 @@ export const workflowTriggerQueue = new Queue("workflow-trigger-checker", { * "task_config" wiring is added in a follow-up once that target exists. 
*/ export function startWorkflowTriggerWorker() { - workflowTriggerQueue.add( - "check-workflow-triggers", - {}, - { - repeat: { - every: parseIntEnv("OPTIO_WORKFLOW_TRIGGER_INTERVAL", 60000), - }, - }, - ); - const worker = new Worker( "workflow-trigger-checker", async () => {