Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions hub/src/api/scheduled-tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,15 @@ scheduledTasks.post('/:id/run-now', async (c) => {
const id = c.req.param('id')
const task = await getTask(id, userId)
if (!task) return c.json({ error: 'not_found' }, 404)
// Fire-and-forget. The dispatcher persists run rows and broadcasts WS
// events; client subscribes to those for live progress.
void dispatcher.runNow(id, userId).catch((err) =>
console.error(`[api.scheduled-tasks] run-now failed task=${id}: ${err?.message ?? err}`),
)
return c.json({ ok: true, status: 'dispatched' }, 202)
// Await dispatch so we can return the created run_ids the client uses to
// track progress via WS. Manual runs fail fast on offline targets instead
// of silently grace-queuing (the UI has no way to surface a pending
// grace-queued row otherwise).
try {
const res = await dispatcher.runNow(id, userId, { isManual: true })
return c.json({ ok: true, status: 'dispatched', run_ids: res.runIds }, 202)
} catch (err: any) {
console.error(`[api.scheduled-tasks] run-now failed task=${id}: ${err?.message ?? err}`)
return c.json({ error: 'dispatch_failed', message: err?.message ?? 'unknown' }, 500)
}
})
53 changes: 41 additions & 12 deletions hub/src/scheduler/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,19 @@ export async function runNow(
triggeredByRunId?: string | null
chainDepth?: number
payloadOverride?: Record<string, unknown>
isManual?: boolean
} = {},
): Promise<void> {
): Promise<{ runIds: string[] }> {
const task = await getTaskById(taskId)
if (!task) return
if (task.user_id !== userId) return
if (!task) return { runIds: [] }
if (task.user_id !== userId) return { runIds: [] }
// Phase 06 plan 008 — webhook-triggered triage passes per-event payload.
if (opts.payloadOverride) {
;(task as any).payload = { ...(task.payload ?? {}), ...opts.payloadOverride }
}
const chainDepth = opts.chainDepth ?? 0
if (chainDepth > MAX_CHAIN_DEPTH) {
await insertRunV2({
const r = await insertRunV2({
task_id: task.id,
user_id: userId,
status: 'failed',
Expand All @@ -87,24 +88,28 @@ export async function runNow(
triggered_by_run_id: opts.triggeredByRunId ?? null,
error: 'chain_depth_exceeded',
})
return
return { runIds: [r.id] }
}
await fireTask(task, {
return await fireTask(task, {
chainDepth,
triggeredByRunId: opts.triggeredByRunId ?? null,
skipCronUpdate: true,
isManual: opts.isManual === true,
})
}

interface FireOpts {
chainDepth: number
triggeredByRunId?: string | null
skipCronUpdate?: boolean
isManual?: boolean
}

async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<{ runIds: string[] }> {
const now = new Date()
const userId = task.user_id
const runIds: string[] = []
const isManual = opts.isManual === true

if (await isOverCostCap(userId, task.timezone)) {
const run = await insertRunV2({
Expand All @@ -123,7 +128,8 @@ async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
})
void onRunFinalized(task, run.id, 'skipped', 'daily_cost_cap')
if (!opts.skipCronUpdate) updateFireTimestamps(task.id, now)
return
runIds.push(run.id)
return { runIds }
}

// Phase 06 plan 008 — triage tasks route through pickSessionTarget instead
Expand Down Expand Up @@ -170,7 +176,8 @@ async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
void finalizeRun(run.id, 'failed', err?.message || 'triage_import_failed')
}
if (!opts.skipCronUpdate) updateFireTimestamps(task.id, now)
return
runIds.push(run.id)
return { runIds }
}

const targets = await resolveTargets(task, userId)
Expand All @@ -183,11 +190,17 @@ async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
target_kind: task.target_kind,
target_id: task.target_id,
triggered_by_run_id: opts.triggeredByRunId ?? null,
error: 'no_targets',
error: isManual ? 'target_offline' : 'no_targets',
})
broadcastScheduledRun(userId, {
type: 'scheduled_run_finished',
run_id: run.id, task_id: task.id, status: 'failed',
error: isManual ? 'target_offline' : 'no_targets',
})
void onRunFinalized(task, run.id, 'failed', 'no_targets')
void onRunFinalized(task, run.id, 'failed', isManual ? 'target_offline' : 'no_targets')
if (!opts.skipCronUpdate) updateFireTimestamps(task.id, now)
return
runIds.push(run.id)
return { runIds }
}

const isFanOut = task.target_kind === 'all_agents' || task.target_kind === 'all_supervisors'
Expand Down Expand Up @@ -225,6 +238,7 @@ async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
triggeredByRunId: opts.triggeredByRunId ?? null,
}
trackRun(ctx)
runIds.push(run.id)

broadcastScheduledRun(userId, {
type: 'scheduled_run_started',
Expand All @@ -236,6 +250,20 @@ async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
})

if (!target.online) {
if (isManual) {
// Manual run: fail fast with target_offline so the UI gets immediate
// feedback instead of a row that lingers pending until grace expires.
await updateRunStatus(run.id, {
status: 'failed', error: 'target_offline', finished_at: new Date(),
})
broadcastScheduledRun(userId, {
type: 'scheduled_run_finished',
run_id: run.id, task_id: task.id, status: 'failed', error: 'target_offline',
})
inFlightByRun.delete(run.id)
void onRunFinalized(task, run.id, 'failed', 'target_offline')
continue
}
const key = target.sessionId ?? target.supervisorId
if (key) {
try {
Expand Down Expand Up @@ -292,6 +320,7 @@ async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
}

if (!opts.skipCronUpdate) updateFireTimestamps(task.id, now)
return { runIds }
}

function updateFireTimestamps(taskId: string, fired: Date): void {
Expand Down
12 changes: 10 additions & 2 deletions web/src/components/SchedulesPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@ export function SchedulesPage({ token, onBack, subscribe }: Props) {

const handleRunNow = async (id: string) => {
setBusy(b => ({ ...b, [id]: true }))
try { await runNow(id) } catch {}
finally { setBusy(b => ({ ...b, [id]: false })) }
try {
const res = await runNow(id)
if (!res || !res.run_ids || res.run_ids.length === 0) {
console.warn('[run-now] dispatched but no run rows created (task may have no target)')
}
} catch (err: any) {
console.error('[run-now] failed:', err?.message ?? err)
} finally {
setBusy(b => ({ ...b, [id]: false }))
}
}

const handleDelete = async (id: string) => {
Expand Down
Loading