From db2e94c1c1a02fc06d702f9607f88a5798a65c6b Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 25 May 2026 22:12:10 -0700 Subject: [PATCH] fix(scheduler): consolidate missed-fire catchup into one row MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hub restart with N missed cron slots for a task was producing N identical 'skipped/catchup' rows (real-world: 20 dupes for a 4h task across a long offline window). Now produces ONE row with error='hub_restart:N_missed' and a human-readable output_snippet summarising first→last missed timestamps. - catchup.ts: new consolidateMissed() helper; runOnce() emits one consolidated row per task for both 'skip' and 'run_once' policies (run_once still dispatches the latest slot live). - insertRunV2: accept optional output_snippet, started_at, finished_at so the consolidated row carries the original first-missed timestamp. - MAX_MISSED bumped to 1000 — single row regardless of count. - Test: 20 / 1 / 1000 / 0 missed slots → 1 / 1 / 1 / 0 rows. --- hub/src/db/scheduled-tasks-dal.ts | 12 +++- hub/src/scheduler/catchup.ts | 79 +++++++++++++++-------- hub/test/catchup-consolidate.test.ts | 95 ++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 28 deletions(-) create mode 100644 hub/test/catchup-consolidate.test.ts diff --git a/hub/src/db/scheduled-tasks-dal.ts b/hub/src/db/scheduled-tasks-dal.ts index 7e9f997..ca536bc 100644 --- a/hub/src/db/scheduled-tasks-dal.ts +++ b/hub/src/db/scheduled-tasks-dal.ts @@ -303,18 +303,26 @@ export async function insertRunV2(input: { session_id?: string | null triggered_by_run_id?: string | null error?: string | null + output_snippet?: string | null + started_at?: Date | null + finished_at?: Date | null }): Promise { + const startedAt = + input.started_at !== undefined + ? input.started_at + : input.status === 'pending' ? null : new Date() + const finishedAt = input.finished_at ?? null const rows = await sql` INSERT INTO scheduled_task_runs ( task_id, user_id, session_id, status, error, scheduled_for, target_kind, target_id, triggered_by_run_id, - started_at + started_at, finished_at, output_snippet ) VALUES ( ${input.task_id}, ${input.user_id}, ${input.session_id ?? null}, ${input.status}, ${input.error ?? null}, ${input.scheduled_for}, ${input.target_kind}, ${input.target_id ?? null}, ${input.triggered_by_run_id ?? null}, - ${input.status === 'pending' ? null : sql`now()`} + ${startedAt}, ${finishedAt}, ${input.output_snippet ?? null} ) RETURNING * ` diff --git a/hub/src/scheduler/catchup.ts b/hub/src/scheduler/catchup.ts index 79220ac..e5415fd 100644 --- a/hub/src/scheduler/catchup.ts +++ b/hub/src/scheduler/catchup.ts @@ -2,11 +2,17 @@ * Catch-up on hub boot (W2/T11). * * For each enabled task, walk fires that should have happened between - * `last_fire_at` (or `created_at`) and now. Cap at 100 to avoid blowing - * up if the hub was off for weeks. + * `last_fire_at` (or `created_at`) and now. Cap at MAX_MISSED. * - * - `catchup_policy = 'skip'` → batch-insert all missed as skipped(catchup) - * - `catchup_policy = 'run_once'` → dispatch only the most recent missed fire + * - `catchup_policy = 'skip'` → insert ONE consolidated skipped row + * summarising all missed fires. + * - `catchup_policy = 'run_once'` → insert ONE consolidated skipped row + * covering the older missed fires (if any), + * then dispatch the most recent slot live. + * + * Consolidation rationale: a hub that was offline for hours of a 4h-cadence + * task should produce ONE "Skipped N missed fires" history row, not N + * identical rows. See `consolidateMissed` for the row shape. */ import { Cron } from 'croner' import { @@ -15,7 +21,7 @@ import { type ScheduledTask, } from '../db/scheduled-tasks-dal.ts' -const MAX_MISSED = 100 +const MAX_MISSED = 1000 function computeMissed(task: ScheduledTask): Date[] { const expr = task.cron_expr || task.cron_expression @@ -43,6 +49,37 @@ function computeMissed(task: ScheduledTask): Date[] { } } +/** + * Insert a single consolidated skipped row for N missed fires. + * + * Exported for tests — pure DAL adapter, no hub state. + */ +export async function consolidateMissed( + task: ScheduledTask, + missed: Date[], +): Promise { + if (missed.length === 0) return + const first = missed[0] + const last = missed[missed.length - 1] + const n = missed.length + const snippet = + n === 1 + ? `Skipped 1 missed fire at ${first.toISOString()} during hub downtime` + : `Skipped ${n} missed fires from ${first.toISOString()} to ${last.toISOString()} during hub downtime` + await insertRunV2({ + task_id: task.id, + user_id: task.user_id, + status: 'skipped', + scheduled_for: first, + target_kind: task.target_kind, + target_id: task.target_id, + error: `hub_restart:${n}_missed`, + output_snippet: snippet, + started_at: first, + finished_at: new Date(), + }) +} + export async function runOnce(): Promise<{ tasks: number; missed: number; dispatched: number; skipped: number }> { @@ -57,34 +94,24 @@ export async function runOnce(): Promise<{ if (t.catchup_policy === 'run_once') { try { - const d = await import('./dispatcher.ts') - // Record older missed slots as skipped so history is honest; - // only re-fire the latest missed slot. - for (let i = 0; i < missed.length - 1; i++) { - await insertRunV2({ - task_id: t.id, user_id: t.user_id, status: 'skipped', - scheduled_for: missed[i], target_kind: t.target_kind, - target_id: t.target_id, error: 'catchup', - }) - skipped++ + // Older missed slots → ONE consolidated row (if any). + const older = missed.slice(0, -1) + if (older.length > 0) { + await consolidateMissed(t, older) + skipped += older.length } + const d = await import('./dispatcher.ts') await d.runNow(t.id, t.user_id, {}) dispatched++ } catch (err: any) { console.error(`[scheduler.catchup] dispatch failed task=${t.id}: ${err?.message}`) } } else { - for (const at of missed) { - try { - await insertRunV2({ - task_id: t.id, user_id: t.user_id, status: 'skipped', - scheduled_for: at, target_kind: t.target_kind, - target_id: t.target_id, error: 'catchup', - }) - skipped++ - } catch (err: any) { - console.error(`[scheduler.catchup] insert failed task=${t.id}: ${err?.message}`) - } + try { + await consolidateMissed(t, missed) + skipped += missed.length + } catch (err: any) { + console.error(`[scheduler.catchup] consolidated insert failed task=${t.id}: ${err?.message}`) } } } diff --git a/hub/test/catchup-consolidate.test.ts b/hub/test/catchup-consolidate.test.ts new file mode 100644 index 0000000..3038241 --- /dev/null +++ b/hub/test/catchup-consolidate.test.ts @@ -0,0 +1,95 @@ +/** + * Catchup consolidation (bug fix 2026-05-25). + * + * Before the fix: hub restart with N missed fires for a single task produced + * N identical `skipped/catchup` rows in `scheduled_task_runs`. A real-world + * 4h-cadence task that sat through a long offline window flooded the UI with + * 20+ duplicate rows. + * + * After the fix: ONE row, `error='hub_restart:N_missed'`, with a human-readable + * `output_snippet` summarising the window. + * + * Strategy: mock the DAL module so we never touch postgres, then drive + * `consolidateMissed` directly + assert the single captured insert. + */ +import { describe, test, expect, beforeEach, mock } from 'bun:test' + +const calls: any[] = [] + +mock.module('../src/db/scheduled-tasks-dal.ts', () => ({ + listEnabledTasks: async () => [], + insertRunV2: async (input: any) => { + calls.push(input) + return { id: `run_${calls.length}`, ...input } + }, +})) + +// Import AFTER the mock so the module binds to the stub. +const { consolidateMissed } = await import('../src/scheduler/catchup.ts') + +const baseTask = { + id: 'task_1', + user_id: 'user_1', + target_kind: 'session' as const, + target_id: 'sess_1', + catchup_policy: 'skip' as const, + cron_expr: '0 */4 * * *', + timezone: 'UTC', + created_at: new Date('2026-05-24T00:00:00Z'), + last_fire_at: null, +} as any + +describe('catchup/consolidateMissed', () => { + beforeEach(() => { + calls.length = 0 + }) + + test('20 missed slots → exactly 1 inserted row', async () => { + const missed: Date[] = [] + const start = Date.parse('2026-05-23T00:00:00Z') + for (let i = 0; i < 20; i++) { + missed.push(new Date(start + i * 4 * 60 * 60 * 1000)) + } + await consolidateMissed(baseTask, missed) + expect(calls.length).toBe(1) + const row = calls[0] + expect(row.status).toBe('skipped') + expect(row.error).toBe('hub_restart:20_missed') + expect(row.output_snippet).toContain('Skipped 20 missed fires') + expect(row.output_snippet).toContain('2026-05-23T00:00:00.000Z') + expect(row.output_snippet).toContain( + new Date(start + 19 * 4 * 60 * 60 * 1000).toISOString(), + ) + expect(row.scheduled_for.getTime()).toBe(missed[0].getTime()) + expect(row.started_at.getTime()).toBe(missed[0].getTime()) + expect(row.finished_at).toBeInstanceOf(Date) + expect(row.task_id).toBe('task_1') + expect(row.target_kind).toBe('session') + }) + + test('1 missed slot → 1 row with singular snippet', async () => { + const at = new Date('2026-05-25T08:00:00Z') + await consolidateMissed(baseTask, [at]) + expect(calls.length).toBe(1) + expect(calls[0].error).toBe('hub_restart:1_missed') + expect(calls[0].output_snippet).toBe( + 'Skipped 1 missed fire at 2026-05-25T08:00:00.000Z during hub downtime', + ) + }) + + test('1000 missed slots → still 1 row (no per-fire fan-out)', async () => { + const missed: Date[] = [] + const start = Date.parse('2026-01-01T00:00:00Z') + for (let i = 0; i < 1000; i++) { + missed.push(new Date(start + i * 60_000)) + } + await consolidateMissed(baseTask, missed) + expect(calls.length).toBe(1) + expect(calls[0].error).toBe('hub_restart:1000_missed') + }) + + test('empty missed list → no insert', async () => { + await consolidateMissed(baseTask, []) + expect(calls.length).toBe(0) + }) +})