Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion apps/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ async function main() {
cleanRepeatJobs("reconcile-resync"),
]);

// Start BullMQ workers (each re-registers its repeat job)
// Register all repeatable jobs from the single source of truth
const { ensureRepeatJobs } = await import("./services/repeatable-jobs.js");
await ensureRepeatJobs();

// Start BullMQ workers
const worker = startTaskWorker();
logger.info("Task worker started");

Expand Down Expand Up @@ -223,6 +227,12 @@ async function main() {
const reconcileResyncWorker = startReconcileResyncWorker();
logger.info("Reconcile workers started");

// Re-register repeatable jobs after a Redis reconnect (e.g. Redis pod restart)
const { startRepeatJobMonitor, stopRepeatJobMonitor } =
await import("./services/repeat-job-monitor.js");
startRepeatJobMonitor();
logger.info("Repeat-job monitor started");

// Check if metrics-server is available
checkMetricsServer().catch(() => {});

Expand All @@ -235,6 +245,7 @@ async function main() {
// Graceful shutdown
const shutdown = async () => {
logger.info("Shutting down...");
await stopRepeatJobMonitor();
await worker.close();
await ticketSyncWorker.close();
await repoCleanupWorker.close();
Expand Down
79 changes: 79 additions & 0 deletions apps/api/src/services/repeat-job-monitor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";

// Callbacks the code under test registers through `on`, keyed by event name,
// so each test can fire an event (e.g. "ready") by hand.
const handlers: Record<string, (...args: unknown[]) => void> = {};
// Stand-in for the connection's `quit`; resolves with "OK".
const quitMock = vi.fn().mockResolvedValue("OK");

// Minimal fake of the Redis connection: only `on` and `quit` are provided.
const mockRedis = {
// Stores the callback under its event name (a later registration for the same
// event replaces the earlier one) and returns the fake itself.
on: vi.fn((event: string, cb: (...args: unknown[]) => void) => {
handlers[event] = cb;
return mockRedis;
}),
quit: quitMock,
};

// Replace the ioredis `Redis` export with a mock whose implementation returns
// the shared fake, so every `new Redis(...)` yields `mockRedis`.
vi.mock("ioredis", () => ({
Redis: vi.fn().mockImplementation(() => mockRedis),
}));

// Fixed connection settings; TLS options are left undefined.
vi.mock("./redis-config.js", () => ({
redisConnectionUrl: "redis://localhost:6379",
redisTlsOptions: undefined,
}));

// Spy standing in for the real job registration; resolves with undefined.
const ensureRepeatJobsMock = vi.fn().mockResolvedValue(undefined);
// The wrapper arrow looks the spy up at call time rather than when the factory
// runs — NOTE(review): presumably needed because vi.mock calls are hoisted
// above this declaration; confirm before simplifying.
vi.mock("./repeatable-jobs.js", () => ({
ensureRepeatJobs: (...args: unknown[]) => ensureRepeatJobsMock(...args),
}));

// Silence logging: every logger method used is a no-op spy.
vi.mock("../logger.js", () => ({
logger: { info: vi.fn(), debug: vi.fn(), warn: vi.fn(), error: vi.fn() },
}));

import { startRepeatJobMonitor, stopRepeatJobMonitor } from "./repeat-job-monitor.js";

describe("repeat-job-monitor", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    ensureRepeatJobsMock.mockClear();
    // Clear recorded quit calls too: afterEach always stops the monitor, so
    // without this the "stop quits the connection" assertion would already be
    // satisfied by an earlier test's teardown. mockClear keeps the resolved
    // value configured on the mock.
    quitMock.mockClear();
    for (const key of Object.keys(handlers)) delete handlers[key];
  });

  afterEach(async () => {
    // Stopping resets the monitor's module-level state between tests.
    await stopRepeatJobMonitor();
    vi.useRealTimers();
  });

  it("ignores the first ready (initial connect)", () => {
    startRepeatJobMonitor();
    handlers.ready();
    vi.advanceTimersByTime(5000);
    expect(ensureRepeatJobsMock).not.toHaveBeenCalled();
  });

  it("re-registers jobs on a reconnect (later ready), after debounce", () => {
    startRepeatJobMonitor();
    handlers.ready(); // initial connect
    handlers.ready(); // reconnect

    expect(ensureRepeatJobsMock).not.toHaveBeenCalled(); // debounce not elapsed
    vi.advanceTimersByTime(2000);
    expect(ensureRepeatJobsMock).toHaveBeenCalledTimes(1);
  });

  it("debounces a burst of reconnects into a single re-registration", () => {
    startRepeatJobMonitor();
    handlers.ready(); // initial connect
    handlers.ready();
    handlers.ready();
    handlers.ready();

    vi.advanceTimersByTime(2000);
    expect(ensureRepeatJobsMock).toHaveBeenCalledTimes(1);
  });

  it("stop quits the connection", async () => {
    startRepeatJobMonitor();
    await stopRepeatJobMonitor();
    // Exactly one quit, attributable to the stop call in this test.
    expect(quitMock).toHaveBeenCalledTimes(1);
  });
});
58 changes: 58 additions & 0 deletions apps/api/src/services/repeat-job-monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Redis } from "ioredis";
import { logger } from "../logger.js";
import { redisConnectionUrl, redisTlsOptions } from "./redis-config.js";
import { ensureRepeatJobs } from "./repeatable-jobs.js";

// Delay passed to setTimeout between the most recent reconnect "ready" event
// and the re-registration it triggers.
const DEBOUNCE_MS = 2000;

// The monitor's dedicated Redis connection; null while the monitor is stopped.
let connection: Redis | null = null;
// Handle of the pending debounced re-registration, or null when none is scheduled.
let debounceTimer: ReturnType<typeof setTimeout> | null = null;
// True once the first "ready" event (treated as the initial connect) has been seen.
let seenReady = false;

/**
 * Opens a dedicated Redis connection and uses its "ready" events to detect
 * reconnects, re-registering the repeatable jobs after each one.
 *
 * - Does nothing if a monitor connection already exists.
 * - The first "ready" is treated as the initial connect and skipped, on the
 *   expectation that boot has already registered the jobs.
 * - Every later "ready" (re)starts a DEBOUNCE_MS timer, so a burst of
 *   reconnects results in a single ensureRepeatJobs() call.
 * - A failed re-registration is logged as a warning and otherwise ignored.
 */
export function startRepeatJobMonitor(): void {
  if (connection !== null) {
    return;
  }

  const tlsSettings = redisTlsOptions ? { tls: redisTlsOptions } : {};
  const client = new Redis(redisConnectionUrl, {
    ...tlsSettings,
    maxRetriesPerRequest: null,
  });
  connection = client;

  // Runs when the debounce window closes without another reconnect.
  const reRegister = (): void => {
    debounceTimer = null;
    ensureRepeatJobs().catch((err) => {
      logger.warn({ err }, "Failed to re-register repeatable jobs after reconnect");
    });
  };

  const onReady = (): void => {
    if (!seenReady) {
      // Initial connect: nothing to repair yet.
      seenReady = true;
      return;
    }
    logger.info("Redis reconnected — re-registering repeatable jobs");
    if (debounceTimer) clearTimeout(debounceTimer);
    debounceTimer = setTimeout(reRegister, DEBOUNCE_MS);
  };

  // Connection errors are only logged at debug level with their message.
  client.on("error", (err) => {
    logger.debug({ err: err.message }, "Repeat-job monitor connection error");
  });
  client.on("ready", onReady);
}

/**
 * Shuts the monitor down: cancels any pending debounced re-registration, quits
 * the monitor connection (a rejected quit is ignored), and resets the
 * "first ready seen" flag so a later start treats its first "ready" as the
 * initial connect again. Safe to call when the monitor is not running.
 */
export async function stopRepeatJobMonitor(): Promise<void> {
  const pendingTimer = debounceTimer;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
    debounceTimer = null;
  }

  const active = connection;
  if (active) {
    // Best-effort close; the slot is only cleared once quit has settled.
    await active.quit().catch(() => {});
    connection = null;
  }

  seenReady = false;
}
60 changes: 60 additions & 0 deletions apps/api/src/services/repeatable-jobs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { describe, it, expect, vi, beforeEach } from "vitest";

// Shared spies behind the `add` and `close` of every mocked Queue instance.
const addMock = vi.fn();
const closeMock = vi.fn();

// Replace BullMQ's Queue with a fake exposing only `name`, `add` and `close`.
// `add` forwards to addMock with the queue name prepended, so assertions can
// tell which queue each job was added to.
vi.mock("bullmq", () => ({
Queue: vi.fn().mockImplementation((name: string) => ({
name,
add: (...args: unknown[]) => addMock(name, ...args),
close: closeMock,
})),
}));

// Connection options are irrelevant to the fake Queue; return an empty object.
vi.mock("./redis-config.js", () => ({
getBullMQConnectionOptions: vi.fn().mockReturnValue({}),
}));

// Silence logging: every logger method used is a no-op spy.
vi.mock("../logger.js", () => ({
logger: { info: vi.fn(), debug: vi.fn(), warn: vi.fn(), error: vi.fn() },
}));

import { REPEATABLE_JOBS, ensureRepeatJobs } from "./repeatable-jobs.js";

describe("ensureRepeatJobs", () => {
  // Start every test with fresh spies that resolve with undefined.
  beforeEach(() => {
    for (const mock of [addMock, closeMock]) {
      mock.mockReset().mockResolvedValue(undefined);
    }
  });

  it("registers exactly the REPEATABLE_JOBS entries with correct repeat options", async () => {
    await ensureRepeatJobs();

    const expectedAdds = REPEATABLE_JOBS.length;
    expect(addMock).toHaveBeenCalledTimes(expectedAdds);

    // addMock receives the queue name first, then the arguments given to queue.add.
    REPEATABLE_JOBS.forEach(({ queueName, jobName, every }) => {
      expect(addMock).toHaveBeenCalledWith(queueName, jobName, {}, { repeat: { every } });
    });
  });

  it("closes a queue for every entry", async () => {
    await ensureRepeatJobs();

    const expectedCloses = REPEATABLE_JOBS.length;
    expect(closeMock).toHaveBeenCalledTimes(expectedCloses);
  });

  it("shares a single in-flight run across concurrent calls", async () => {
    const [first, second] = [ensureRepeatJobs(), ensureRepeatJobs()];

    // Same promise object, not merely equivalent results.
    expect(first).toBe(second);

    await Promise.all([first, second]);

    // One registration pass ran, not two — adds called once per entry.
    expect(addMock).toHaveBeenCalledTimes(REPEATABLE_JOBS.length);
  });
});
85 changes: 85 additions & 0 deletions apps/api/src/services/repeatable-jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { Queue } from "bullmq";
import { parseIntEnv } from "@optio/shared";
import { logger } from "../logger.js";
import { getBullMQConnectionOptions } from "./redis-config.js";

/** One repeatable job: the queue it is added to, its job name, and its repeat interval. */
export interface RepeatableJob {
/** Name of the BullMQ queue the job is added to. */
queueName: string;
/** Name the job is added under. */
jobName: string;
/** Value passed as the `repeat.every` option — NOTE(review): presumably milliseconds; confirm against BullMQ. */
every: number;
}

/**
 * Builds one table entry. The interval comes from parseIntEnv, called with the
 * environment variable name and the fallback value.
 */
function repeatableJob(
  queueName: string,
  jobName: string,
  envVar: string,
  fallback: number,
): RepeatableJob {
  return { queueName, jobName, every: parseIntEnv(envVar, fallback) };
}

/**
 * The one list of repeatable BullMQ jobs. Registration iterates this table, so
 * the boot path and the re-registration path after a Redis reconnect share the
 * same definitions. Intervals are resolved once, when this module is loaded.
 */
export const REPEATABLE_JOBS: RepeatableJob[] = [
  repeatableJob("pr-watcher", "check-prs", "OPTIO_PR_WATCH_INTERVAL", 30000),
  repeatableJob("repo-cleanup", "health-check", "OPTIO_HEALTH_CHECK_INTERVAL", 60000),
  repeatableJob("repo-cleanup", "stall-check", "OPTIO_STALL_CHECK_INTERVAL", 30000),
  repeatableJob("ticket-sync", "sync", "OPTIO_TICKET_SYNC_INTERVAL", 60000),
  repeatableJob(
    "workflow-trigger-checker",
    "check-workflow-triggers",
    "OPTIO_WORKFLOW_TRIGGER_INTERVAL",
    60000,
  ),
  repeatableJob("token-validation", "validate-token", "OPTIO_TOKEN_VALIDATION_INTERVAL", 300000),
  repeatableJob("reconcile-resync", "resync", "OPTIO_RECONCILE_RESYNC_INTERVAL", 300000),
];

// The registration pass currently running, or null when none is in progress.
let inFlight: Promise<void> | null = null;

/**
 * Registers every entry of REPEATABLE_JOBS.
 *
 * While a pass is running, further calls return that same promise instead of
 * starting another; once it settles, the next call starts a fresh pass.
 * Failures to add an individual job are logged inside registerAll rather than
 * rejecting. Calling this repeatedly relies on BullMQ treating a re-added
 * repeat job with the same options as an update, not a duplicate.
 */
export function ensureRepeatJobs(): Promise<void> {
  if (inFlight !== null) {
    return inFlight;
  }

  const run = registerAll().finally(() => {
    // Release the slot so a later call performs a new pass.
    inFlight = null;
  });
  inFlight = run;
  return run;
}

/**
 * Adds each REPEATABLE_JOBS entry to its queue as a repeat job, using a
 * short-lived Queue per entry that is closed again afterwards.
 *
 * Every entry is handled independently: a failure to construct the queue, add
 * the job, or close the queue is logged and does not stop the remaining
 * entries or reject the returned promise. The final log line reports how many
 * entries were attempted and how many failed to register.
 */
async function registerAll(): Promise<void> {
  const connection = getBullMQConnectionOptions();
  let failed = 0;

  for (const job of REPEATABLE_JOBS) {
    let queue: Queue | undefined;
    try {
      // Constructed inside the try so a constructor error is treated like any
      // other registration failure for this entry.
      queue = new Queue(job.queueName, { connection });
      await queue.add(job.jobName, {}, { repeat: { every: job.every } });
    } catch (err) {
      failed += 1;
      logger.warn(
        { err, queue: job.queueName, job: job.jobName },
        "Failed to register repeatable job",
      );
    } finally {
      if (queue) {
        try {
          await queue.close();
        } catch (err) {
          // A close failure must not abort the loop or escape to the caller.
          logger.warn(
            { err, queue: job.queueName, job: job.jobName },
            "Failed to close repeatable-job queue",
          );
        }
      }
    }
  }

  logger.info({ count: REPEATABLE_JOBS.length, failed }, "Ensured repeatable jobs");
}
12 changes: 1 addition & 11 deletions apps/api/src/workers/pr-watcher-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { eq, sql } from "drizzle-orm";
import { db } from "../db/client.js";
import { tasks, sessionPrs, interactiveSessions, reviewDrafts } from "../db/schema.js";
import type { GitPlatform, RepoIdentifier } from "@optio/shared";
import { parsePrUrl, parseIntEnv } from "@optio/shared";
import { parsePrUrl } from "@optio/shared";
import { getGitPlatformForRepo } from "../services/git-token-service.js";
import type { GitTokenContext } from "../services/git-token-service.js";
import { updateSessionPr } from "../services/interactive-session-service.js";
Expand Down Expand Up @@ -51,16 +51,6 @@ export function determineReviewStatus(reviews: { state: string; body?: string }[
export const prWatcherQueue = new Queue("pr-watcher", { connection: connectionOpts });

export function startPrWatcherWorker() {
prWatcherQueue.add(
"check-prs",
{},
{
repeat: {
every: parseIntEnv("OPTIO_PR_WATCH_INTERVAL", 30000),
},
},
);

const worker = new Worker(
"pr-watcher",
instrumentWorkerProcessor("pr-watcher", async () => {
Expand Down
10 changes: 0 additions & 10 deletions apps/api/src/workers/reconcile-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,6 @@ export const resyncQueue = new Queue("reconcile-resync", { connection: connectio
* and enqueue a reconcile key for each. Catches drift from lost events.
*/
export function startReconcileResyncWorker() {
const intervalMs = parseIntEnv("OPTIO_RECONCILE_RESYNC_INTERVAL", 5 * 60 * 1000);

resyncQueue.add(
"resync",
{},
{
repeat: { every: intervalMs },
},
);

const worker = new Worker(
"reconcile-resync",
instrumentWorkerProcessor("reconcile-resync", async () => {
Expand Down
21 changes: 0 additions & 21 deletions apps/api/src/workers/repo-cleanup-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,6 @@ async function recordHealthEvent(
}

export function startRepoCleanupWorker() {
repoCleanupQueue.add(
"health-check",
{},
{
repeat: {
every: parseIntEnv("OPTIO_HEALTH_CHECK_INTERVAL", 60000),
},
},
);

// Dedicated stall-check cadence (30s) — more responsive than the 60s health-check
repoCleanupQueue.add(
"stall-check",
{},
{
repeat: {
every: parseIntEnv("OPTIO_STALL_CHECK_INTERVAL", 30000),
},
},
);

const worker = new Worker(
"repo-cleanup",
instrumentWorkerProcessor("repo-cleanup", async () => {
Expand Down
Loading
Loading