Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions hub/src/db/scheduled-tasks-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,18 +303,26 @@ export async function insertRunV2(input: {
session_id?: string | null
triggered_by_run_id?: string | null
error?: string | null
output_snippet?: string | null
started_at?: Date | null
finished_at?: Date | null
}): Promise<ScheduledTaskRun> {
const startedAt =
input.started_at !== undefined
? input.started_at
: input.status === 'pending' ? null : new Date()
const finishedAt = input.finished_at ?? null
const rows = await sql<ScheduledTaskRun[]>`
INSERT INTO scheduled_task_runs (
task_id, user_id, session_id, status, error,
scheduled_for, target_kind, target_id, triggered_by_run_id,
started_at
started_at, finished_at, output_snippet
) VALUES (
${input.task_id}, ${input.user_id}, ${input.session_id ?? null},
${input.status}, ${input.error ?? null},
${input.scheduled_for}, ${input.target_kind}, ${input.target_id ?? null},
${input.triggered_by_run_id ?? null},
${input.status === 'pending' ? null : sql`now()`}
${startedAt}, ${finishedAt}, ${input.output_snippet ?? null}
)
RETURNING *
`
Expand Down
79 changes: 53 additions & 26 deletions hub/src/scheduler/catchup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
* Catch-up on hub boot (W2/T11).
*
* For each enabled task, walk fires that should have happened between
* `last_fire_at` (or `created_at`) and now. Cap at 100 to avoid blowing
* up if the hub was off for weeks.
* `last_fire_at` (or `created_at`) and now. Cap at MAX_MISSED.
*
* - `catchup_policy = 'skip'` → batch-insert all missed as skipped(catchup)
* - `catchup_policy = 'run_once'` → dispatch only the most recent missed fire
* - `catchup_policy = 'skip'` → insert ONE consolidated skipped row
* summarising all missed fires.
* - `catchup_policy = 'run_once'` → insert ONE consolidated skipped row
* covering the older missed fires (if any),
* then dispatch the most recent slot live.
*
* Consolidation rationale: a hub that was offline for hours of a 4h-cadence
* task should produce ONE "Skipped N missed fires" history row, not N
* identical rows. See `consolidateMissed` for the row shape.
*/
import { Cron } from 'croner'
import {
Expand All @@ -15,7 +21,7 @@ import {
type ScheduledTask,
} from '../db/scheduled-tasks-dal.ts'

const MAX_MISSED = 100
const MAX_MISSED = 1000

function computeMissed(task: ScheduledTask): Date[] {
const expr = task.cron_expr || task.cron_expression
Expand Down Expand Up @@ -43,6 +49,37 @@ function computeMissed(task: ScheduledTask): Date[] {
}
}

/**
 * Record a batch of missed fires as ONE `skipped` run instead of one row
 * per fire.
 *
 * Does nothing when `missed` is empty. Otherwise issues a single
 * `insertRunV2` call where:
 *   - `scheduled_for` and `started_at` are the first entry of `missed`,
 *   - `finished_at` is the moment of the call,
 *   - `error` is `hub_restart:<count>_missed`,
 *   - `output_snippet` is a sentence with the count and the first/last
 *     ISO timestamps (a single timestamp when the count is 1).
 *
 * The first and last array entries are used as the window bounds; the
 * array is not sorted here, so callers presumably pass it in ascending
 * order (TODO confirm against `computeMissed`).
 *
 * Exported so tests can drive it directly.
 */
export async function consolidateMissed(
  task: ScheduledTask,
  missed: Date[],
): Promise<void> {
  const count = missed.length
  if (count === 0) return

  const earliest = missed[0]
  const latest = missed[count - 1]

  // Singular wording names one timestamp; plural wording names the range.
  let summary: string
  if (count === 1) {
    summary = `Skipped 1 missed fire at ${earliest.toISOString()} during hub downtime`
  } else {
    summary = `Skipped ${count} missed fires from ${earliest.toISOString()} to ${latest.toISOString()} during hub downtime`
  }

  await insertRunV2({
    task_id: task.id,
    user_id: task.user_id,
    status: 'skipped',
    scheduled_for: earliest,
    target_kind: task.target_kind,
    target_id: task.target_id,
    error: `hub_restart:${count}_missed`,
    output_snippet: summary,
    started_at: earliest,
    finished_at: new Date(),
  })
}

export async function runOnce(): Promise<{
tasks: number; missed: number; dispatched: number; skipped: number
}> {
Expand All @@ -57,34 +94,24 @@ export async function runOnce(): Promise<{

if (t.catchup_policy === 'run_once') {
try {
const d = await import('./dispatcher.ts')
// Record older missed slots as skipped so history is honest;
// only re-fire the latest missed slot.
for (let i = 0; i < missed.length - 1; i++) {
await insertRunV2({
task_id: t.id, user_id: t.user_id, status: 'skipped',
scheduled_for: missed[i], target_kind: t.target_kind,
target_id: t.target_id, error: 'catchup',
})
skipped++
// Older missed slots → ONE consolidated row (if any).
const older = missed.slice(0, -1)
if (older.length > 0) {
await consolidateMissed(t, older)
skipped += older.length
}
const d = await import('./dispatcher.ts')
await d.runNow(t.id, t.user_id, {})
dispatched++
} catch (err: any) {
console.error(`[scheduler.catchup] dispatch failed task=${t.id}: ${err?.message}`)
}
} else {
for (const at of missed) {
try {
await insertRunV2({
task_id: t.id, user_id: t.user_id, status: 'skipped',
scheduled_for: at, target_kind: t.target_kind,
target_id: t.target_id, error: 'catchup',
})
skipped++
} catch (err: any) {
console.error(`[scheduler.catchup] insert failed task=${t.id}: ${err?.message}`)
}
try {
await consolidateMissed(t, missed)
skipped += missed.length
} catch (err: any) {
console.error(`[scheduler.catchup] consolidated insert failed task=${t.id}: ${err?.message}`)
}
}
}
Expand Down
95 changes: 95 additions & 0 deletions hub/test/catchup-consolidate.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Catchup consolidation (bug fix 2026-05-25).
*
* Before the fix: hub restart with N missed fires for a single task produced
* N identical `skipped/catchup` rows in `scheduled_task_runs`. A real-world
* 4h-cadence task that sat through a long offline window flooded the UI with
* 20+ duplicate rows.
*
* After the fix: ONE row, `error='hub_restart:N_missed'`, with a human-readable
* `output_snippet` summarising the window.
*
* Strategy: mock the DAL module so we never touch postgres, then drive
* `consolidateMissed` directly + assert the single captured insert.
*/
import { describe, test, expect, beforeEach, mock } from 'bun:test'

// Capture buffer: every argument object handed to the stubbed `insertRunV2`
// is appended here so tests can inspect what would have been inserted.
const calls: any[] = []

// Replace the DAL module with in-memory stubs so no database is touched.
// - `listEnabledTasks` always resolves to an empty list.
// - `insertRunV2` records its input and echoes it back with a synthetic
//   `run_<n>` id, where n is the number of calls captured so far.
mock.module('../src/db/scheduled-tasks-dal.ts', () => ({
listEnabledTasks: async () => [],
insertRunV2: async (input: any) => {
calls.push(input)
return { id: `run_${calls.length}`, ...input }
},
}))

// Import AFTER the mock so the module binds to the stub.
// (A dynamic import is used so it runs after `mock.module` above.)
const { consolidateMissed } = await import('../src/scheduler/catchup.ts')

// Task fixture shared by all tests. Cast to `any`, so it is not checked
// against the real task type and only carries the fields listed here.
const baseTask = {
id: 'task_1',
user_id: 'user_1',
target_kind: 'session' as const,
target_id: 'sess_1',
catchup_policy: 'skip' as const,
cron_expr: '0 */4 * * *',
timezone: 'UTC',
created_at: new Date('2026-05-24T00:00:00Z'),
last_fire_at: null,
} as any

describe('catchup/consolidateMissed', () => {
  // Start every test with an empty capture buffer.
  beforeEach(() => {
    calls.length = 0
  })

  // Many missed fires collapse into one row whose fields describe the window.
  test('20 missed slots → exactly 1 inserted row', async () => {
    const fourHoursMs = 4 * 60 * 60 * 1000
    const origin = Date.parse('2026-05-23T00:00:00Z')
    const slots: Date[] = Array.from(
      { length: 20 },
      (_, k) => new Date(origin + k * fourHoursMs),
    )

    await consolidateMissed(baseTask, slots)

    expect(calls.length).toBe(1)
    const [inserted] = calls
    expect(inserted.status).toBe('skipped')
    expect(inserted.error).toBe('hub_restart:20_missed')
    expect(inserted.output_snippet).toContain('Skipped 20 missed fires')
    expect(inserted.output_snippet).toContain('2026-05-23T00:00:00.000Z')
    expect(inserted.output_snippet).toContain(
      new Date(origin + 19 * fourHoursMs).toISOString(),
    )
    expect(inserted.scheduled_for.getTime()).toBe(slots[0].getTime())
    expect(inserted.started_at.getTime()).toBe(slots[0].getTime())
    expect(inserted.finished_at).toBeInstanceOf(Date)
    expect(inserted.task_id).toBe('task_1')
    expect(inserted.target_kind).toBe('session')
  })

  // A single missed fire uses the singular wording with one timestamp.
  test('1 missed slot → 1 row with singular snippet', async () => {
    const onlySlot = new Date('2026-05-25T08:00:00Z')

    await consolidateMissed(baseTask, [onlySlot])

    expect(calls.length).toBe(1)
    const [inserted] = calls
    expect(inserted.error).toBe('hub_restart:1_missed')
    expect(inserted.output_snippet).toBe(
      'Skipped 1 missed fire at 2026-05-25T08:00:00.000Z during hub downtime',
    )
  })

  // The insert count stays at one no matter how many fires were missed.
  test('1000 missed slots → still 1 row (no per-fire fan-out)', async () => {
    const origin = Date.parse('2026-01-01T00:00:00Z')
    const slots: Date[] = Array.from(
      { length: 1000 },
      (_, k) => new Date(origin + k * 60_000),
    )

    await consolidateMissed(baseTask, slots)

    expect(calls.length).toBe(1)
    expect(calls[0].error).toBe('hub_restart:1000_missed')
  })

  // Nothing missed means nothing written.
  test('empty missed list → no insert', async () => {
    await consolidateMissed(baseTask, [])
    expect(calls.length).toBe(0)
  })
})
Loading