Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 82 additions & 13 deletions hub/src/api/scheduled-tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
deleteTask,
} from '../db/scheduled-tasks-dal.ts'
import { validate as validateCron, nextRuns, isValidTimezone } from '../scheduler/cron.ts'
import { validateRules, ruleToCron, type ScheduleRule } from '../scheduler/schedule-rules.ts'
import * as registry from '../scheduler/registry.ts'
import * as dispatcher from '../scheduler/dispatcher.ts'
import {
Expand Down Expand Up @@ -53,7 +54,13 @@ const CreateSchema = z.object({
target_kind: TargetKindEnum,
target_id: z.string().min(1).nullable().optional(),
payload: z.record(z.any()).optional(),
cron_expr: z.string().min(1).max(200),
// Either `cron_expr` (legacy) or `schedule_rules` (new). Validated below.
cron_expr: z.string().min(1).max(200).optional(),
schedule_rules: z.array(z.object({
interval: z.number().int().min(1).max(999),
unit: z.enum(['hours', 'days', 'weeks']),
start_at: z.string().min(1),
})).min(1).max(20).optional(),
timezone: z.string().min(1).max(100),
catchup_policy: CatchupPolicyEnum.optional(),
max_concurrent: z.number().int().min(1).max(10).optional(),
Expand All @@ -67,8 +74,23 @@ const PatchSchema = CreateSchema.partial()

function withNext3<T extends ScheduledTask>(task: T) {
if (!task.enabled) return { ...task, next_3_runs: [] as string[] }
const expr = task.cron_expr || task.cron_expression
const tz = task.timezone || 'UTC'
const rules = Array.isArray(task.schedule_rules) ? (task.schedule_rules as ScheduleRule[]) : []
if (rules.length > 0) {
const merged: number[] = []
for (const r of rules) {
try {
const expr = ruleToCron(r, tz)
const startMs = Date.parse(r.start_at)
const from = Number.isFinite(startMs) && startMs > Date.now() ? new Date(startMs) : new Date()
for (const d of nextRuns(expr, tz, 3, from)) merged.push(d.getTime())
} catch {}
}
merged.sort((a, b) => a - b)
const top = Array.from(new Set(merged)).slice(0, 3).map(ms => new Date(ms).toISOString())
return { ...task, next_3_runs: top }
}
const expr = task.cron_expr || task.cron_expression
if (!expr) return { ...task, next_3_runs: [] as string[] }
const runs = nextRuns(expr, tz, 3).map((d) => d.toISOString())
return { ...task, next_3_runs: runs }
Expand All @@ -78,6 +100,25 @@ function targetIdRequired(kind: string): boolean {
return kind === 'session' || kind === 'supervisor'
}

/**
* Resolve the effective cron_expr to persist + register. Priority:
* - explicit `cron_expr` from the body (legacy clients)
* - rule[0] of `schedule_rules` converted to cron
* - existing row's cron_expr (PATCH path)
*
* Returns null if neither is available.
*/
function resolveCronExpr(
body: { cron_expr?: string; schedule_rules?: ScheduleRule[] },
tz: string,
): string | null {
if (body.cron_expr) return body.cron_expr
if (body.schedule_rules && body.schedule_rules.length > 0) {
return ruleToCron(body.schedule_rules[0], tz)
}
return null
}

interface ValidateOptions {
cron_expr?: string
timezone?: string
Expand Down Expand Up @@ -220,8 +261,25 @@ scheduledTasks.post('/', async (c) => {
}
const data = parsed.data

// Require either cron_expr or schedule_rules. Derive cron_expr from rule[0]
// when only schedule_rules was sent. The legacy column stays populated.
if (!data.cron_expr && (!data.schedule_rules || data.schedule_rules.length === 0)) {
return c.json({ error: 'schedule_required', detail: 'cron_expr or schedule_rules is required' }, 400)
}
if (data.schedule_rules) {
const rv = validateRules(data.schedule_rules)
if (!rv.ok) return c.json({ error: 'invalid_schedule_rules', detail: rv.error }, 400)
}
const effectiveCron = resolveCronExpr(
{ cron_expr: data.cron_expr, schedule_rules: data.schedule_rules as ScheduleRule[] | undefined },
data.timezone,
)
if (!effectiveCron) {
return c.json({ error: 'schedule_required' }, 400)
}

const v = validateInputs({
cron_expr: data.cron_expr,
cron_expr: effectiveCron,
timezone: data.timezone,
target_kind: data.target_kind,
target_id: data.target_id,
Expand All @@ -246,7 +304,7 @@ scheduledTasks.post('/', async (c) => {
target_kind: data.target_kind,
target_id: data.target_id ?? null,
payload: data.payload ?? {},
cron_expr: data.cron_expr,
cron_expr: effectiveCron,
}, data.name_suffix, data.name)

const task = await createTaskV2({
Expand All @@ -256,17 +314,18 @@ scheduledTasks.post('/', async (c) => {
target_kind: data.target_kind,
target_id: data.target_id ?? null,
payload: data.payload ?? {},
cron_expr: data.cron_expr,
cron_expr: effectiveCron,
timezone: data.timezone,
catchup_policy: data.catchup_policy ?? 'skip',
max_concurrent: data.max_concurrent ?? 1,
enabled: data.enabled ?? true,
post_run_actions: v.actions,
session_id: sessionId,
cron_expression: data.cron_expr,
cron_expression: effectiveCron,
prompt: typeof data.payload?.prompt === 'string' ? data.payload.prompt : '',
name_prefix: built.prefix || null,
name_suffix: built.suffix || null,
schedule_rules: data.schedule_rules ?? null,
})

registry.register(task)
Expand All @@ -286,20 +345,29 @@ scheduledTasks.patch('/:id', async (c) => {
}
const data = parsed.data

// Merge effective values for validation.
if (data.schedule_rules) {
const rv = validateRules(data.schedule_rules)
if (!rv.ok) return c.json({ error: 'invalid_schedule_rules', detail: rv.error }, 400)
}

// Merge effective values for validation. If client sent schedule_rules,
// derive a cron from rule[0]; otherwise fall back to explicit cron_expr.
const effectiveTimezone = data.timezone ?? existing.timezone
const derivedFromRules = data.schedule_rules
? resolveCronExpr({ schedule_rules: data.schedule_rules as ScheduleRule[] }, effectiveTimezone)
: null
const effective = {
cron_expr: data.cron_expr ?? existing.cron_expr ?? existing.cron_expression,
timezone: data.timezone ?? existing.timezone,
cron_expr: data.cron_expr ?? derivedFromRules ?? existing.cron_expr ?? existing.cron_expression,
timezone: effectiveTimezone,
target_kind: data.target_kind ?? existing.target_kind,
target_id:
data.target_id !== undefined ? data.target_id : existing.target_id,
post_run_actions:
data.post_run_actions !== undefined ? data.post_run_actions : existing.post_run_actions,
}
const v = validateInputs({
// Only re-validate fields the caller actually changed; on partial PATCH
// we still want to reject if e.g. only the cron was sent and it's bad.
cron_expr: data.cron_expr,
// Validate the (possibly derived) effective cron when rules changed too.
cron_expr: data.cron_expr ?? derivedFromRules ?? undefined,
timezone: data.timezone,
target_kind: data.target_kind,
// target_id pairing depends on the effective kind, so always check it:
Expand Down Expand Up @@ -356,11 +424,12 @@ scheduledTasks.patch('/:id', async (c) => {
target_kind: data.target_kind,
target_id: data.target_id !== undefined ? data.target_id ?? null : undefined,
payload: data.payload,
cron_expr: data.cron_expr,
cron_expr: data.cron_expr ?? derivedFromRules ?? undefined,
timezone: data.timezone,
catchup_policy: data.catchup_policy,
max_concurrent: data.max_concurrent,
post_run_actions: data.post_run_actions !== undefined ? v.actions : undefined,
schedule_rules: data.schedule_rules !== undefined ? (data.schedule_rules as any[]) : undefined,
})
if (!updated) return c.json({ error: 'not_found' }, 404)

Expand Down
21 changes: 19 additions & 2 deletions hub/src/db/scheduled-tasks-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ export interface ScheduledTask {
// authoritative for back-compat — DAL writes keep all three in sync.
name_prefix: string | null
name_suffix: string | null
// Simpler-cron picker: structured rules. NULL on legacy rows (UI falls
// back to deriving a single rule from `cron_expr`). Array of
// `{interval, unit, start_at}`.
schedule_rules: any[] | null
// Derived from latest finalized run via LATERAL JOIN in listTasksForUser/getTask.
last_run_cost_usd?: number | null
last_run_duration_ms?: number | null
Expand Down Expand Up @@ -152,6 +156,12 @@ function normalize(row: any): ScheduledTask {
typeof row.post_run_actions === 'string'
? JSON.parse(row.post_run_actions)
: (row.post_run_actions ?? [])
const schedule_rules =
row.schedule_rules == null
? null
: typeof row.schedule_rules === 'string'
? JSON.parse(row.schedule_rules)
: row.schedule_rules
// node-postgres returns NUMERIC as string; coerce when present. EXTRACT(EPOCH ...)
// comes back as a number (double precision) but normalize defensively.
const last_run_cost_usd =
Expand All @@ -167,6 +177,7 @@ function normalize(row: any): ScheduledTask {
on_complete,
payload,
post_run_actions,
schedule_rules,
last_run_cost_usd,
last_run_duration_ms,
}
Expand Down Expand Up @@ -207,13 +218,14 @@ export async function createTaskV2(input: {
prompt?: string
name_prefix?: string | null
name_suffix?: string | null
schedule_rules?: any[] | null
}): Promise<ScheduledTask> {
const rows = await sql<ScheduledTask[]>`
INSERT INTO scheduled_tasks (
user_id, session_id, name, cron_expression, prompt, enabled,
task_type, target_kind, target_id, payload, cron_expr, timezone,
catchup_policy, max_concurrent, post_run_actions,
name_prefix, name_suffix
name_prefix, name_suffix, schedule_rules
) VALUES (
${input.user_id}, ${input.session_id}, ${input.name},
${input.cron_expression ?? input.cron_expr}, ${input.prompt ?? ''},
Expand All @@ -223,7 +235,8 @@ export async function createTaskV2(input: {
${input.timezone}, ${input.catchup_policy ?? 'skip'},
${input.max_concurrent ?? 1},
${sql.json((input.post_run_actions ?? []) as any)},
${input.name_prefix ?? null}, ${input.name_suffix ?? null}
${input.name_prefix ?? null}, ${input.name_suffix ?? null},
${input.schedule_rules ? sql.json(input.schedule_rules as any) : null}
)
RETURNING *
`
Expand All @@ -247,6 +260,7 @@ export async function updateTaskV2(
post_run_actions: PostRunAction[]
name_prefix: string | null
name_suffix: string | null
schedule_rules: any[] | null
}>,
): Promise<ScheduledTask | null> {
const sets: any[] = []
Expand All @@ -268,6 +282,9 @@ export async function updateTaskV2(
if (fields.post_run_actions !== undefined) {
sets.push(sql`post_run_actions = ${sql.json(fields.post_run_actions as any)}`)
}
if (fields.schedule_rules !== undefined) {
sets.push(sql`schedule_rules = ${fields.schedule_rules ? sql.json(fields.schedule_rules as any) : null}`)
}
if (sets.length === 0) return getTask(id, userId)
sets.push(sql`updated_at = now()`)

Expand Down
8 changes: 8 additions & 0 deletions hub/src/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ ALTER TABLE scheduled_tasks ADD COLUMN IF NOT EXISTS post_run_actions JSONB NOT
ALTER TABLE scheduled_tasks ADD COLUMN IF NOT EXISTS name_prefix TEXT;
ALTER TABLE scheduled_tasks ADD COLUMN IF NOT EXISTS name_suffix TEXT;

-- Simpler-cron picker (feat/simpler-cron-ui): structured rules array. Each
-- rule shape: { interval: int, unit: 'hours'|'days'|'weeks', start_at: ISO }.
-- Legacy `cron_expr`/`cron_expression` columns are still populated from
-- rule[0] on write for back-compat with the croner engine. Multiple rules
-- arm multiple cron registrations; fires from any rule route through the
-- same dispatcher.fire(task.id).
ALTER TABLE scheduled_tasks ADD COLUMN IF NOT EXISTS schedule_rules JSONB;

-- W2/T8: drop legacy NOT NULL on session_id so fan-out tasks
-- (all_agents/all_supervisors) and supervisor-targeted tasks can omit it.
-- Idempotent — Postgres no-ops if the column is already nullable.
Expand Down
Loading
Loading