Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions hub/src/api/account.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Hono } from 'hono';
import { z } from 'zod';
import { authMiddleware } from '../auth/middleware.ts';
import {
getUserCoolifyWebhookStatus,
Expand All @@ -7,7 +8,11 @@ import {
getUserCoolifyWebhookAllowedIps,
setUserCoolifyWebhookAllowedIps,
listCoolifyWebhookAttempts,
getUserClaudeThresholds,
setUserClaudeThresholds,
} from '../db/dal.ts';
import { getUsage } from '../usage/store.ts';
import { evaluateThreshold } from '../usage/threshold.ts';

export const accountRouter = new Hono();
export { accountRouter as account };
Expand Down Expand Up @@ -96,6 +101,78 @@ accountRouter.get('/coolify-webhook-allowed-ips', async (c) => {
}
});

// ── Claude usage thresholds ──────────────────────────────────────────────────
// GET /api/account/claude-thresholds → { session_pct, week_pct }
accountRouter.get('/claude-thresholds', async (c) => {
const userId = c.get('userId') as string;
try {
const t = await getUserClaudeThresholds(userId);
return c.json({
session_pct: t.claude_session_threshold_pct,
week_pct: t.claude_week_threshold_pct,
});
} catch (err: any) {
console.error('[account] claude-thresholds GET failed:', err?.code, err?.message);
return c.json({ error: 'internal_error', code: err?.code ?? null }, 500);
}
});

const ThresholdSchema = z.object({
session_pct: z.number().int().min(1).max(100).nullable(),
week_pct: z.number().int().min(1).max(100).nullable(),
}).strict();

// PUT /api/account/claude-thresholds body: { session_pct, week_pct }
accountRouter.put('/claude-thresholds', async (c) => {
const userId = c.get('userId') as string;
let body: any;
try { body = await c.req.json(); } catch { return c.json({ error: 'bad_json' }, 400); }
const parsed = ThresholdSchema.safeParse(body);
if (!parsed.success) {
return c.json({ error: 'invalid_body', detail: parsed.error.flatten() }, 400);
}
try {
const saved = await setUserClaudeThresholds(userId, {
claude_session_threshold_pct: parsed.data.session_pct,
claude_week_threshold_pct: parsed.data.week_pct,
});
return c.json({
session_pct: saved.claude_session_threshold_pct,
week_pct: saved.claude_week_threshold_pct,
});
} catch (err: any) {
console.error('[account] claude-thresholds PUT failed:', err?.code, err?.message);
return c.json({ error: 'internal_error', code: err?.code ?? null }, 500);
}
});

// GET /api/account/usage → { usage, thresholds, paused, reason, ... }
// Used by Layout.tsx on first paint before the WS event arrives.
accountRouter.get('/usage', async (c) => {
const userId = c.get('userId') as string;
try {
const t = await getUserClaudeThresholds(userId);
const snap = getUsage(userId);
const decision = evaluateThreshold(snap, t);
return c.json({
usage: snap?.usage ?? null,
updated_at: snap?.updated_at ?? null,
thresholds: {
session_pct: t.claude_session_threshold_pct,
week_pct: t.claude_week_threshold_pct,
},
paused: !decision.allowed,
reason: decision.reason ?? null,
utilization_pct: decision.utilization_pct ?? null,
threshold_pct: decision.threshold_pct ?? null,
resets_at: decision.resets_at ?? null,
});
} catch (err: any) {
console.error('[account] usage GET failed:', err?.code, err?.message);
return c.json({ error: 'internal_error', code: err?.code ?? null }, 500);
}
});

// PUT /api/account/coolify-webhook-allowed-ips { allowed_ips: string }
accountRouter.put('/coolify-webhook-allowed-ips', async (c) => {
const userId = c.get('userId') as string;
Expand Down
10 changes: 10 additions & 0 deletions hub/src/api/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ sessions.post('/heal', async (c) => {
excludeSupervisorIds: Array.from(exclude),
})

if (pick.kind === 'quota_blocked') {
return c.json({
error: 'quota_threshold_reached',
reason: pick.reason,
utilization_pct: pick.utilization_pct,
threshold_pct: pick.threshold_pct,
resets_at: pick.resets_at,
}, 503)
}

if (pick.kind === 'none') {
return c.json({ error: 'no_target_available' }, 503)
}
Expand Down
35 changes: 34 additions & 1 deletion hub/src/db/dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ export async function revokeAllUserCredentials(userId: string): Promise<{ revoke
// ── Users / Profiles ──────────────────────────────────────────────────────────

export async function getUserById(id: string) {
const rows = await sql`SELECT id, email, display_name, avatar_url, role, system_prompt, timezone, daily_cost_cap_usd, web_push_enabled, created_at, updated_at FROM users WHERE id = ${id}`;
const rows = await sql`SELECT id, email, display_name, avatar_url, role, system_prompt, timezone, daily_cost_cap_usd, web_push_enabled, claude_session_threshold_pct, claude_week_threshold_pct, created_at, updated_at FROM users WHERE id = ${id}`;
return rows[0] ?? null;
}

Expand All @@ -290,6 +290,39 @@ export async function getUserSystemPrompt(id: string): Promise<string | null> {
return (rows[0]?.system_prompt as string | null) ?? null;
}

// ── Claude usage thresholds ──────────────────────────────────────────────────
export interface ClaudeThresholds {
claude_session_threshold_pct: number | null;
claude_week_threshold_pct: number | null;
}

export async function getUserClaudeThresholds(userId: string): Promise<ClaudeThresholds> {
const rows = await sql<ClaudeThresholds[]>`
SELECT claude_session_threshold_pct, claude_week_threshold_pct
FROM users WHERE id = ${userId}
`;
const row = rows[0];
return {
claude_session_threshold_pct: row?.claude_session_threshold_pct ?? null,
claude_week_threshold_pct: row?.claude_week_threshold_pct ?? null,
};
}

export async function setUserClaudeThresholds(
userId: string,
thresholds: ClaudeThresholds,
): Promise<ClaudeThresholds> {
const rows = await sql<ClaudeThresholds[]>`
UPDATE users
SET claude_session_threshold_pct = ${thresholds.claude_session_threshold_pct},
claude_week_threshold_pct = ${thresholds.claude_week_threshold_pct},
updated_at = now()
WHERE id = ${userId}
RETURNING claude_session_threshold_pct, claude_week_threshold_pct
`;
return rows[0] ?? thresholds;
}

export type UserInstructions = {
claude_global_md: string | null;
codex_agents_md: string | null;
Expand Down
1 change: 1 addition & 0 deletions hub/src/db/scheduled-tasks-dal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export type RunStatus =
| 'success'
| 'failed'
| 'skipped'
| 'skipped_quota'
| 'cancelled'

export interface ScheduledTask {
Expand Down
28 changes: 28 additions & 0 deletions hub/src/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,31 @@ CREATE TABLE IF NOT EXISTS coolify_webhook_attempts (
CREATE INDEX IF NOT EXISTS idx_coolify_webhook_attempts_user_recv
ON coolify_webhook_attempts(user_id, received_at DESC);

-- ── feat/claude-usage-thresholds ─────────────────────────────────────────────
-- Per-user thresholds for the Anthropic OAuth usage gate. Compared against the
-- in-memory snapshot from agent/src/usage-poller.ts (utilization is already a
-- percentage 0-100). NULL = gate OFF (back-compat — existing users are not
-- silently opted in on deploy).
ALTER TABLE users ADD COLUMN IF NOT EXISTS claude_session_threshold_pct INTEGER;
ALTER TABLE users ADD COLUMN IF NOT EXISTS claude_week_threshold_pct INTEGER;

DO $$ BEGIN
ALTER TABLE users ADD CONSTRAINT users_claude_session_threshold_pct_range
CHECK (claude_session_threshold_pct IS NULL
OR (claude_session_threshold_pct BETWEEN 1 AND 100));
EXCEPTION WHEN duplicate_object THEN NULL; END $$;

DO $$ BEGIN
ALTER TABLE users ADD CONSTRAINT users_claude_week_threshold_pct_range
CHECK (claude_week_threshold_pct IS NULL
OR (claude_week_threshold_pct BETWEEN 1 AND 100));
EXCEPTION WHEN duplicate_object THEN NULL; END $$;

-- New run status: 'skipped_quota' — distinguishes threshold-gated skips from
-- daily-cost-cap skips so the run history drawer can filter them separately.
DO $$ BEGIN
ALTER TABLE scheduled_task_runs DROP CONSTRAINT IF EXISTS scheduled_task_runs_status_check;
ALTER TABLE scheduled_task_runs ADD CONSTRAINT scheduled_task_runs_status_check
CHECK (status IN ('running','success','failed','skipped','pending','in_flight','cancelled','skipped_quota'));
EXCEPTION WHEN others THEN NULL; END $$;

19 changes: 19 additions & 0 deletions hub/src/error-capture/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import * as queue from '../scheduler/session-queue.ts'
import { notifyThrottled } from './notify.ts'
import { buildErrorMessage } from './prompt.ts'
import { registerErrorRunForSession } from './run-lifecycle.ts'
import { checkUserThreshold } from '../usage/threshold.ts'

export type DispatchOutcome =
| { status: 'dispatched'; run_id: string }
Expand Down Expand Up @@ -66,6 +67,24 @@ export async function dispatchPendingError(errorId: string): Promise<DispatchOut

const sessionId = project.session_id
const userId = project.user_id

// Claude usage threshold gate — refuses dispatch with skip_reason that
// includes the breach details. Matches the cost-cap audit pattern: we
// persist the skip on the error row rather than silently dropping.
const threshold = await checkUserThreshold(userId)
if (!threshold.allowed) {
const skipReason = `quota_threshold_reached:${threshold.reason}`
await updateErrorDispatchStatus(errorId, 'skipped', skipReason)
broadcastErrorEvent(userId, {
type: 'error_skipped',
error_id: errorId,
project_id: project.id,
dispatch_status: 'skipped',
skip_reason: skipReason,
})
return { status: 'skipped', skip_reason: skipReason }
}

const channel = getChannel(sessionId)

// 3. Offline target → grace + skip.
Expand Down
35 changes: 35 additions & 0 deletions hub/src/scheduler/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import * as registry from './registry.ts'
import * as queue from './session-queue.ts'
import { broadcastScheduledRun, broadcastToUser } from '../ws/registry.ts'
import { reserveSessionSlot, getCapacitySnapshot } from '../sessions/budget.ts'
import { checkUserThreshold } from '../usage/threshold.ts'

const MAX_CHAIN_DEPTH = 5

Expand Down Expand Up @@ -106,6 +107,32 @@ async function fireTask(task: ScheduledTask, opts: FireOpts): Promise<void> {
const now = new Date()
const userId = task.user_id

// Claude usage threshold gate — sits in front of the cost-cap gate.
// Same shape, distinct status ('skipped_quota'). Persisting the run row
// (rather than silently dropping) is required for the run-history drawer
// and matches the cost-cap audit pattern.
const threshold = await checkUserThreshold(userId)
if (!threshold.allowed) {
const errMsg = `quota_threshold_reached:${threshold.reason}:${threshold.utilization_pct}>=${threshold.threshold_pct}`
const run = await insertRunV2({
task_id: task.id,
user_id: userId,
status: 'skipped_quota',
scheduled_for: now,
target_kind: task.target_kind,
target_id: task.target_id,
triggered_by_run_id: opts.triggeredByRunId ?? null,
error: errMsg,
})
broadcastScheduledRun(userId, {
type: 'scheduled_run_finished',
run_id: run.id, task_id: task.id, status: 'skipped_quota', error: errMsg,
})
void onRunFinalized(task, run.id, 'skipped_quota', errMsg)
if (!opts.skipCronUpdate) updateFireTimestamps(task.id, now)
return
}

if (await isOverCostCap(userId, task.timezone)) {
const run = await insertRunV2({
task_id: task.id,
Expand Down Expand Up @@ -437,6 +464,14 @@ export function init(): void {
if (!ctx) return
const task = await getTaskById(ctx.taskId)
if (!task) return
// Re-evaluate the threshold gate at waiter promotion — the user may have
// crossed the cap while the run was queued. Drop with skipped_quota.
const t = await checkUserThreshold(ctx.userId)
if (!t.allowed) {
const errMsg = `quota_threshold_reached:${t.reason}:${t.utilization_pct}>=${t.threshold_pct}`
await finalizeRun(runId, 'skipped_quota', errMsg)
return
}
void routeToSender(task, ctx).catch((err) =>
console.error(
`[scheduler.dispatcher] promoted send failed run=${runId} session=${sessionId}: ${err?.message}`,
Expand Down
8 changes: 8 additions & 0 deletions hub/src/scheduler/senders/triage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ export async function sendTriage(
payload: TriagePayload,
): Promise<void> {
const pick = await pickSessionTarget(ctx.userId)
if (pick.kind === 'quota_blocked') {
await finalizeRun(
ctx.runId,
'skipped_quota',
`quota_threshold_reached:${pick.reason}:${pick.utilization_pct}>=${pick.threshold_pct}`,
)
return
}
if (pick.kind === 'none') {
await finalizeRun(ctx.runId, 'failed', 'no_target_available')
return
Expand Down
22 changes: 22 additions & 0 deletions hub/src/sessions/routing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { sql } from '../db/postgres.ts'
import { reserveSessionSlot } from './budget.ts'
import { isSupervisorOnline } from '../ws/supervisor-registry.ts'
import { listOnlineAgentSessionsForUser } from '../ws/registry.ts'
import { checkUserThreshold } from '../usage/threshold.ts'

// Recency threshold for "supervisor is online". Belt-and-braces with the
// in-memory registry check — guards against zombie rows whose WS closed
Expand All @@ -34,6 +35,13 @@ export type PickedTarget =
| { kind: 'supervisor'; supervisor_id: string; running: number; cap: number }
| { kind: 'local_agent'; agent_session_id: string }
| { kind: 'none'; reason: string }
| {
kind: 'quota_blocked'
reason: 'session_threshold' | 'week_threshold'
utilization_pct: number
threshold_pct: number
resets_at: string
}

interface SupervisorRow {
id: string
Expand All @@ -53,6 +61,20 @@ export async function pickSessionTarget(
): Promise<PickedTarget> {
const exclude = new Set(opts.excludeSupervisorIds ?? [])

// Step 0: Claude usage threshold gate. Blocks new dispatch when the user
// is over their configured 5h or 7d cap. In-flight runs are not killed —
// only NEW target picks are refused. See review §3.
const t = await checkUserThreshold(userId)
if (!t.allowed) {
return {
kind: 'quota_blocked',
reason: t.reason!,
utilization_pct: t.utilization_pct ?? 0,
threshold_pct: t.threshold_pct ?? 0,
resets_at: t.resets_at ?? '',
}
}

// Step 1: preferred supervisor.
const userRows = await sql<{ preferred_supervisor_id: string | null }[]>`
SELECT preferred_supervisor_id FROM users WHERE id = ${userId}
Expand Down
Loading
Loading