diff --git a/.gitignore b/.gitignore index ec5676d..c3b286f 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,8 @@ Cargo.lock.bak !/sql/pg_ask--0.5.4--0.5.5.sql !/sql/pg_ask--0.5.5--0.5.6.sql !/sql/pg_ask--0.5.6--0.5.7.sql +!/sql/pg_ask--0.5.7--0.5.8.sql +!/sql/pg_ask--0.5.8--0.5.9.sql /target-* # Packaging build outputs (deb/rpm/apk staging + artefacts) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1eab02..534a962 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,201 @@ treats internal Rust modules as private regardless of `pub` visibility. Upgrade scripts ship as `sql/pg_ask----.sql` and run automatically under `ALTER EXTENSION pg_ask UPDATE`. +## [0.5.9] — 2026-06-08 — Async job queue (`ask.ask_async`) + +Adds an asynchronous job queue (ADR-0018) so a question can be submitted +without blocking the calling backend on the LLM round-trip. `ask.ask_async()` +enqueues a row to `ask._jobs` and returns a job id immediately; a background +worker runs the agent loop in its own backend and writes the answer back. +Upgrade with `ALTER EXTENSION pg_ask UPDATE TO '0.5.9';`. + +This is the only correct shape for async work in PostgreSQL: a backend is +single-threaded and SPI is not thread-safe, so "async" means handing the +work to a separate process (a background worker), never running it off-thread +in the caller. The synchronous `ask.ask()` path is unchanged. + +### Added + +- **`ask.ask_async(question[, kind])` → uuid**: enqueue a job and return its + id immediately (NULL when `pg_ask.jobs_enabled = off`, the default). + `kind` is `'ask'` (full agent loop) or `'sql'` (generate-only). +- **Result accessors** (owner-scoped, NotFound == Unauthorized collapse): + `ask.job_status(id)`, `ask.job_result(id)`, `ask.job_error(id)`, + `ask.cancel_job(id)`. +- **`ask.run_pending_jobs()`**: synchronous drain of up to + `pg_ask.jobs_batch` jobs in the current database, for installs without the + background worker (e.g. driven by `pg_cron`). Recovers orphans first. +- **`ask.prune_jobs(''[, batch])`**: batched retention of terminal + jobs; operator-only. +- **Background workers** (when loaded via `shared_preload_libraries`): a + launcher process discovers every pg_ask-enabled database and spawns one + dynamic per-database worker, which drains `ask._jobs` on a poll interval. + Re-reconciles so a `CREATE EXTENSION pg_ask` in a new database gets a + worker without a restart. +- **Durable state machine** (`ask._jobs.status`): pending → running → + done/failed, with retry (`pg_ask.jobs_max_attempts`) on transient failure + and orphan recovery (`pg_ask.jobs_orphan_timeout_ms`) so a crashed + worker's in-flight job is re-queued, never lost. Claims use + `FOR UPDATE SKIP LOCKED` so concurrent workers never collide. +- **GUCs**: `jobs_enabled` (off), `jobs_max_attempts` (3), + `jobs_orphan_timeout_ms` (300000), `jobs_batch` (10), + `jobs_poll_interval_ms` (5000). + +### Hardening (post-review) + +- **Real per-job durability**: the worker now commits the `claim` (running) + transition in its OWN transaction before the agent loop, then commits + `complete`/`fail` in another. A crash mid-loop leaves a committed `running` + row that orphan recovery reclaims — previously claim+execute+complete shared + one transaction, so `running` was never visible and orphan recovery was + effectively dead code. +- **Worker respawn**: the launcher keeps each worker's handle and checks + liveness (`pid()`) every reconcile, respawning a worker that died. The + previous grow-only "spawned" set meant a single worker crash stalled that + database's queue until a full instance restart. +- **Poison-pill fairness**: the synchronous `run_pending_jobs` drain tracks + ids attempted in the pass, so a permanently-failing job that re-queues + itself can't monopolise the batch and starve other pending jobs. +- **Privilege tiers**: the worker-path helpers (`_job_claim`, `_job_complete`, + `_job_fail`, `_job_recover_orphans`) and `ask.run_pending_jobs()` are now + operator-only (revoked from PUBLIC) since they act on jobs regardless of + owner; only the owner-scoped `ask.ask_async` / `ask.job_*` / `ask.cancel_job` + stay public. The background worker connects as superuser and is unaffected. + +### Hardening (six-model review pass) + +Findings from six parallel reviewer models, each empirically verified: + +- **Privilege isolation**: the worker now runs each job's agent loop under + the role that ENQUEUED it (`SET LOCAL ROLE` via `set_config`), so async + SQL/tool execution has exactly the caller's privileges — not the worker's + superuser rights. `_job_claim` returns the owner; the role is reset before + the trusted state-machine writes. +- **Double-execution race closed**: `_job_complete` / `_job_fail` / + `_job_release` now require `worker_pid = pg_backend_pid()`, so a + slow-but-alive worker whose job was orphan-recovered and re-claimed by + another worker can no longer clobber the new attempt with a stale result. +- **dblink conninfo bug**: the launcher's DB-discovery probe used + `quote_ident` (produces `dbname="MyDb"`, which libpq reads as a DB named + `"MyDb"` with the quotes and fails) — switched to `quote_literal` + (`dbname='MyDb'`). Without this, databases with uppercase/special names + were silently never given a worker. +- **Launcher restart leak**: on restart the launcher's old dynamic workers + keep running; it now checks `pg_stat_activity` for an existing + `pg_ask worker: {db}` before spawning (no duplicate per restart) and + terminates its workers on clean shutdown. +- **`run_pending_jobs` honours `jobs_enabled`**: returns 0 immediately and + skips orphan recovery when the queue is disabled (matching the worker and + its own docstring). +- **Orphan timeout default raised 5 min → 1 hour** so a slow-but-legitimate + job (up to `max_iterations * http_total_timeout_ms`) isn't falsely + reclaimed. +- **Indexes**: `_jobs_pending_idx` / `_jobs_running_idx` now lead with `db` + (multi-database claim/recovery scans), and a new partial + `_jobs_terminal_idx (finished_at) WHERE status IN (done,failed,cancelled)` + serves `prune_jobs`. +- **RLS on `ask._jobs`**: a direct `SELECT` is scoped to the caller's own + rows (`_jobs_owner_select`), matching `_traces`; the bgworker (superuser) + bypasses it. +- **Poison-pill drain**: a re-claimed retry is now `_job_release`d back to + `pending` instead of being left stuck in `running` until orphan recovery. +- **FIFO tie-break**: `_job_claim` orders by `(ts, id)` for deterministic + ordering of same-timestamp jobs. + +### Notes + +- A background worker cannot `LISTEN` (PostgreSQL restricts it to regular + client backends), so workers are poll-driven; the enqueue path still fires + `pg_notify('pg_ask_jobs', id)` for any external listener. +- SIGTERM is checked between jobs, so shutdown takes effect within at most + one job's runtime; a shutdown arriving mid-agent-loop still waits for that + job's LLM call to return or hit `http_total_timeout_ms` (a bgworker's + SIGTERM doesn't trip the agent loop's interrupt check). Keep + `http_total_timeout_ms` modest for a tight shutdown bound. +- Enable the worker by adding `pg_ask` to `shared_preload_libraries` and + setting `pg_ask.jobs_enabled = on`. Without the preload, async still works + via `ask.run_pending_jobs()`. +- The launcher's per-database worker is the clean-architecture seam: + claim/run/complete lives once in `src/jobs`, reused by both the worker and + the synchronous drain. + +## [0.5.8] — 2026-06-08 — Event outbox production hardening (`ask.emit`) + +Production-hardens the ADR-0017 event outbox without touching its +consumer-facing contract: the `pg_ask_events` channel, the `ask._outbox` +columns, the pending-row query (`WHERE processed_at IS NULL ORDER BY ts`), +and the `ask._outbox_emit(text,jsonb,text)` / `ask._outbox_mark_processed` +/ `ask.emit` signatures are all unchanged, so an existing +`LISTEN pg_ask_events` consumer keeps working across the upgrade. Upgrade +with `ALTER EXTENSION pg_ask UPDATE TO '0.5.8';`. + +### Security + +- **Bypass closed (`ask._outbox_emit`)**: validation, the `events_enabled` + switch, and the `pg_notify` wake-up now ALL live inside the SECURITY + DEFINER `ask._outbox_emit`, which is the single authority for writing an + event. Previously these lived only in the Rust `ask.emit` wrapper, so a + role with `EXECUTE` on the (PUBLIC-granted) helper could call it directly + to write a newline-laced event name, a multi-megabyte payload, or a row + while events were globally disabled — and without firing a NOTIFY. The + Rust layer is now pure defense-in-depth. + +### Added + +- **Input validation on `ask.emit`**, owned solely by `ask._outbox_emit` + (the single authority — the Rust layer does no size/charset checks, so + there is nothing to drift): event names must be non-empty, `<= 127` chars, + and match `[A-Za-z0-9][A-Za-z0-9._:-]*` (rejects whitespace / control + chars that could corrupt the durable log or a listener's routing). + `summary` is capped at 8192 **bytes** (`octet_length`, exact for + multi-byte text). Payload size is capped by the new + `pg_ask.events_max_payload_bytes` GUC (default 64 KiB; `0` disables), + measured as serialized-JSON bytes. Violations raise + `invalid_parameter_value` *before* any row is written. +- **Flood control** via two opt-in GUCs, both default `0` (off): + `pg_ask.events_max_per_minute` (per-`(emitter, event)` rate cap) and + `pg_ask.events_dedup_window_ms` (collapse identical + `(emitter, event, payload)` repeats, compared via `md5(payload::text)`). + Both are plain integers (e.g. `60000`), not unit-suffixed — the SQL writer + reads them via `current_setting()::int`. + A suppressed emit is a silent no-op returning `NULL` — it never raises, + so a trigger's transaction is never rolled back, and emits a + `RAISE DEBUG` line so operators can observe suppression under + `log_min_messages = debug1`. Checks run atomically inside + `ask._outbox_emit` under a transaction-scoped advisory lock to avoid a + check-then-insert race. The lock uses the two-argument + `pg_advisory_xact_lock(domain, key)` form (domain = `'pg_ask._outbox'`), + so it shares no key space with the int8 session lock and can never + collide across lock domains. +- **Indexes** for the new hot paths: `_outbox_rate_idx (emitter, event, ts)` + so the rate-limit / dedup checks never full-scan the outbox on emit, and + the partial `_outbox_processed_idx (processed_at) WHERE processed_at IS + NOT NULL` so retention prunes hit an index instead of a seq scan. +- **Retention**: `ask.prune_events(''[, batch_size])` (e.g. + `'30 days'`) plus the SECURITY DEFINER `ask._outbox_prune(interval, int)` + helper. Deletes only already-delivered rows (`processed_at IS NOT NULL`) + older than the interval, **in batches** (default 10000; `0` = single + DELETE) to bound each DELETE's per-statement memory, lock acquisition, + and dead-tuple churn. Note: as a plpgsql function the whole loop runs in + the caller's single transaction, so batching does NOT shorten lock + lifetime or split WAL across commits — for that, call it repeatedly from + separate transactions. Pending rows are never removed. Locked to + operators (not granted to PUBLIC). + +### Changed + +- `pg_notify` is fired inside `ask._outbox_emit` (was a separate Rust SPI + call) and is `pg_catalog`-qualified, so a hostile `search_path` cannot + shadow it and a direct helper call can't write a row without waking + listeners. +- Dedup now compares `md5(payload::text)` instead of the `jsonb =` operator + — stable for logically-equal payloads (jsonb normalizes key order / + whitespace before the text cast) and far cheaper than an equality scan + over large jsonb values. +- `ask.emit` doc-comments and SQL comments are now consumer-agnostic + ("an external `LISTEN pg_ask_events` consumer") rather than naming a + specific downstream; the orchestrator coupling lives only in ADR-0017. + ## [0.5.7] — 2026-06-06 — Security hardening pass A security / code-review pass closing several ways a low-privilege role diff --git a/Cargo.lock b/Cargo.lock index 5ccfb64..b1fcf87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1107,7 +1107,7 @@ dependencies = [ [[package]] name = "pg_ask" -version = "0.5.7" +version = "0.5.9" dependencies = [ "pgrx", "pgrx-tests", diff --git a/Cargo.toml b/Cargo.toml index 0bd8dd2..e17c338 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_ask" -version = "0.5.7" +version = "0.5.9" edition = "2021" rust-version = "1.82" # ^ Minimum Supported Rust Version. pgrx 0.18 requires 1.82+. diff --git a/Dockerfile b/Dockerfile index 393303a..89184b4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -90,11 +90,12 @@ RUN mkdir -p /staging/lib /staging/extension \ && cp /usr/share/postgresql/${PG_MAJOR}/extension/pg_ask.control \ /usr/share/postgresql/${PG_MAJOR}/extension/pg_ask*.sql \ /staging/extension/ \ - && (cp sql/pg_ask--0.5.5--0.5.6.sql /staging/extension/ 2>/dev/null || true) \ - && (cp sql/pg_ask--0.5.4--0.5.5.sql /staging/extension/ 2>/dev/null || true) \ - && (cp sql/pg_ask--0.5.3--0.5.4.sql /staging/extension/ 2>/dev/null || true) \ - && (cp sql/pg_ask--0.5.2--0.5.3.sql /staging/extension/ 2>/dev/null || true) \ - && (cp sql/pg_ask--0.5.1--0.5.2.sql /staging/extension/ 2>/dev/null || true) + && cp sql/pg_ask--*--*.sql /staging/extension/ 2>/dev/null || true +# Note: the cp above bundles EVERY hand-written upgrade script via glob +# (not a hardcoded list) so any older install can step through with +# ALTER EXTENSION UPDATE. The previous explicit list silently omitted new +# paths (e.g. 0.5.6->0.5.7, 0.5.7->0.5.8); globbing keeps this correct as +# new migrations land, matching the deb/apk/rpm packaging scripts. # ────────────────────────────────────────────────────────────────────────────── # Stage 2: runtime diff --git a/docker/initdb/01-configure.sh b/docker/initdb/01-configure.sh index 91b7178..0865eb1 100755 --- a/docker/initdb/01-configure.sh +++ b/docker/initdb/01-configure.sh @@ -24,6 +24,19 @@ run_sql() { configured=0 +# Async job launcher support: the background-worker launcher runs in the +# 'postgres' maintenance DB and uses dblink to discover which databases have +# pg_ask installed (so it only spawns workers where there is work). Install +# dblink there if available; harmless when the async queue is unused. Without +# it the launcher falls back to probing every database via a short-lived +# worker (noisier, but still correct). +if psql --username "$POSTGRES_USER" --dbname postgres -tAc \ + "SELECT 1 FROM pg_available_extensions WHERE name='dblink'" | grep -q 1; then + echo "[pg_ask] installing dblink in 'postgres' DB for the async job launcher" + psql --username "$POSTGRES_USER" --dbname postgres \ + -c "CREATE EXTENSION IF NOT EXISTS dblink;" || true +fi + if [ -n "${PG_ASK_PROVIDER:-}" ]; then echo "[pg_ask] setting provider = $PG_ASK_PROVIDER" run_sql "SELECT ask.config('provider', '$PG_ASK_PROVIDER');" diff --git a/pg_ask.control b/pg_ask.control index d9c9729..06a4599 100644 --- a/pg_ask.control +++ b/pg_ask.control @@ -1,5 +1,5 @@ comment = 'Agent-driven natural language interface for PostgreSQL' -default_version = '0.5.7' +default_version = '0.5.9' module_pathname = '$libdir/pg_ask' # Do NOT set `schema = 'ask'` here. pgrx already emits an explicit # `CREATE SCHEMA IF NOT EXISTS ask` plus fully-qualified `ask.*` diff --git a/sql/bootstrap.sql b/sql/bootstrap.sql index db8cffd..6d51820 100644 --- a/sql/bootstrap.sql +++ b/sql/bootstrap.sql @@ -278,10 +278,11 @@ CREATE INDEX IF NOT EXISTS _sql_audit_ts_idx ON ask._sql_audit (ts DESC); CREATE INDEX IF NOT EXISTS _sql_audit_caller_idx ON ask._sql_audit (caller, ts DESC); -- --------------------------------------------------------------------------- --- Event outbox (ADR-0017: pg_ask -> senti reverse notifications). +-- Event outbox (ADR-0017: in-database reverse notifications). -- -- `ask.emit(event, payload)` appends a durable row here and fires --- `pg_notify('pg_ask_events', )`. An external orchestrator (senti) +-- `pg_notify('pg_ask_events', )`. An external orchestrator (any +-- process holding a LISTEN pg_ask_events connection) -- LISTENs on that channel and drains unprocessed rows, marking -- `processed_at`. The durable table is the source of truth; NOTIFY is -- only a low-latency wake-up, so no event is lost if the listener is @@ -309,6 +310,112 @@ CREATE TABLE IF NOT EXISTS ask._outbox ( -- index keeps it tiny even when the table accumulates processed history. CREATE INDEX IF NOT EXISTS _outbox_pending_idx ON ask._outbox (ts) WHERE processed_at IS NULL; +-- Flood-control hot path (v0.5.8): the rate-limit and dedup checks in +-- ask._outbox_emit filter by (emitter, event, ts). Without this index those +-- guards degrade to a full scan of the whole outbox (including delivered +-- history) on every emit — turning the DoS protection into a DoS amplifier +-- on a large table. Cheap to maintain; only matters when a guard is on. +-- Not partial: the dedup/rate checks must see recent rows regardless of +-- processed status, so the index necessarily covers history too and grows +-- with the unpruned outbox — another reason to run ask.prune_events(). +CREATE INDEX IF NOT EXISTS _outbox_rate_idx + ON ask._outbox (emitter, event, ts); +-- Retention hot path (v0.5.8): ask._outbox_prune deletes delivered rows by +-- `processed_at < cutoff`. A partial index on the delivered rows serves the +-- prune scan directly (the pending partial index above has the opposite +-- predicate and can't help) and stays small — it only indexes rows that are +-- candidates for deletion. +CREATE INDEX IF NOT EXISTS _outbox_processed_idx + ON ask._outbox (processed_at) WHERE processed_at IS NOT NULL; + +-- --------------------------------------------------------------------------- +-- Async job queue (v0.5.9 / ADR-0018). +-- +-- `ask.ask_async(question)` enqueues a row here and returns its id +-- immediately, so the calling backend is never blocked on an LLM round-trip. +-- A background worker (or a manual ask.run_pending_jobs() / pg_cron call) +-- claims pending rows, runs the agent loop in its OWN backend, and writes +-- the answer back. This is the only correct shape for async work in +-- PostgreSQL: a backend is single-threaded and SPI is not thread-safe, so +-- "async" means "hand the work to a separate process", never "run it off- +-- thread in the caller". +-- +-- Durable state machine (status): +-- pending -> running -> done (success) +-- -> failed (terminal: attempts exhausted) +-- -> pending (retry: transient failure, attempts left) +-- pending/running -> cancelled (owner cancels) +-- +-- Crash safety: a worker that dies mid-job leaves the row in 'running' with +-- a stale `started_at`. ask._jobs_recover_orphans() (called by the worker on +-- startup and periodically) returns such rows to 'pending' once they exceed +-- pg_ask.jobs_orphan_timeout_ms, so no job is lost to a crash. The durable +-- table is the source of truth; pg_notify('pg_ask_jobs', id) is only a low- +-- latency wake-up for the worker, and pg_notify('pg_ask_jobs_done', id) the +-- same for a client awaiting the result. +-- +-- Like _outbox: readable by PUBLIC's owner-scoped views only via the helper +-- SRFs; all writes funnel through SECURITY DEFINER helpers that stamp +-- session_user as the owner. +CREATE TABLE IF NOT EXISTS ask._jobs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + ts timestamptz NOT NULL DEFAULT now(), + owner name NOT NULL DEFAULT session_user, + db name NOT NULL DEFAULT current_database(), + -- 'ask' | 'sql' — which agent mode to run (mirrors TraceKind). + kind text NOT NULL DEFAULT 'ask', + question text NOT NULL, + status text NOT NULL DEFAULT 'pending' + CHECK (status IN ('pending','running','done','failed','cancelled')), + attempts int NOT NULL DEFAULT 0, + -- Worker lifecycle stamps. + started_at timestamptz, + finished_at timestamptz, + -- Result columns, filled on completion. + answer text, + error text, + prompt_tokens bigint, + completion_tokens bigint, + -- Which backend PID claimed the job (diagnostics / orphan detection). + worker_pid int +); +-- Worker hot path: "oldest pending job in THIS database, FIFO". The claim +-- and orphan-recovery scans both also filter on db = current_database(), so +-- the partial indexes lead with `db` to give an exact index scan in +-- multi-database clusters (M3) instead of skipping over other DBs' rows. +-- Partial predicate keeps them tiny even as done/failed history grows. +CREATE INDEX IF NOT EXISTS _jobs_pending_idx + ON ask._jobs (db, ts) WHERE status = 'pending'; +-- Orphan recovery scans running rows by (db, started_at). +CREATE INDEX IF NOT EXISTS _jobs_running_idx + ON ask._jobs (db, started_at) WHERE status = 'running'; +-- Owner-scoped result lookups (ask.job_status / ask.job_result). +CREATE INDEX IF NOT EXISTS _jobs_owner_idx + ON ask._jobs (owner, ts); +-- Retention hot path (H4): ask._jobs_prune deletes terminal rows by +-- finished_at. A partial index on the terminal rows serves the prune scan +-- directly and stays small (mirrors _outbox_processed_idx). +CREATE INDEX IF NOT EXISTS _jobs_terminal_idx + ON ask._jobs (finished_at) WHERE status IN ('done','failed','cancelled'); + +-- Row-level security (M2): defence-in-depth so a direct SELECT on ask._jobs +-- only ever returns the caller's own jobs, matching the owner scoping the +-- job_status/job_result/job_error functions already enforce. Mirrors the +-- _traces RLS policy. The table owner (extension superuser) and the +-- background worker (connects as superuser) bypass RLS, so the worker drain +-- path is unaffected; only ordinary roles doing a direct SELECT are scoped. +ALTER TABLE ask._jobs ENABLE ROW LEVEL SECURITY; +DO $jobs_rls$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_policy WHERE polname = '_jobs_owner_select' + AND polrelid = 'ask._jobs'::regclass + ) THEN + CREATE POLICY _jobs_owner_select ON ask._jobs + FOR SELECT USING (owner = session_user); + END IF; +END +$jobs_rls$; -- --------------------------------------------------------------------------- -- SECURITY DEFINER helpers for internal-table writes. @@ -391,23 +498,228 @@ BEGIN END $$; --- Event outbox writer (ADR-0017). Appends one row and returns its id so --- the caller can fire pg_notify('pg_ask_events', id). SECURITY DEFINER so --- ordinary roles can emit without direct INSERT on ask._outbox; the row is --- stamped with session_user as the emitter (survives SECURITY DEFINER, see --- the _sql_audit_insert rationale above). +-- Event outbox writer (ADR-0017). The SINGLE authority for appending an +-- event: it validates input, honours the on/off switch, enforces flood +-- control, writes the durable row, AND fires the NOTIFY — all atomically in +-- one SECURITY DEFINER call. Returns the new id, or NULL when the emit was a +-- no-op (events disabled, or suppressed by a guard). +-- +-- Why everything lives here, not in the Rust caller (v0.5.8 B2 fix): this +-- function is GRANTed to PUBLIC so ordinary roles can emit without direct +-- INSERT on ask._outbox. If validation / the enabled-check / NOTIFY lived +-- only in ask.emit (the #[pg_extern]), a caller could bypass all of it by +-- invoking ask._outbox_emit() directly — writing a newline-laced event name, +-- a multi-megabyte payload, or a row while events are globally disabled. By +-- making the helper self-contained, BOTH entry points are protected and the +-- Rust layer is pure defense-in-depth (nicer error messages, early exit). +-- session_user survives SECURITY DEFINER, so the row is still billed to the +-- original connecting role (see _sql_audit_insert). +-- +-- Flood control. Two optional, GUC-driven guards run BEFORE the INSERT: +-- * pg_ask.events_max_per_minute — per-(emitter,event) rate cap. +-- * pg_ask.events_dedup_window_ms — collapse identical +-- (emitter,event,payload) repeats. +-- Both default to 0 (off). A suppressed emit returns NULL WITHOUT writing a +-- row and does NOT raise: emit runs inside the caller's (often a trigger's) +-- transaction, and an ERROR here would roll back the very INSERT/UPDATE that +-- fired it. Suppressions are surfaced via RAISE DEBUG so an operator can see +-- them with log_min_messages=debug1 without paying anything in production. +-- +-- Atomicity: a plain count-then-insert is racy under concurrency (two +-- backends both pass the cap before either commits, since uncommitted rows +-- aren't mutually visible). We serialize same-key emitters with a +-- transaction-scoped advisory lock keyed on (emitter,event); it is released +-- automatically at commit/rollback and is only taken when a guard is active, +-- so the unguarded fast path pays nothing. +-- +-- Validation is owned entirely here (the single authority); the Rust caller +-- does no size/charset checks, so there is nothing to drift out of sync. +-- event: non-empty, <= 127 chars, ^[A-Za-z0-9][A-Za-z0-9._:-]*$ (ASCII, so +-- char count == byte count). summary: <= 8192 BYTES (octet_length, so the +-- ceiling is exact for multi-byte text). payload: <= the +-- pg_ask.events_max_payload_bytes serialized-JSON bytes (0 disables). CREATE OR REPLACE FUNCTION ask._outbox_emit( event text, payload jsonb, summary text ) RETURNS uuid -LANGUAGE sql +LANGUAGE plpgsql SECURITY DEFINER SET search_path = pg_catalog, pg_temp AS $$ +DECLARE + enabled bool := COALESCE(current_setting('pg_ask.events_enabled', true)::bool, false); + max_per_min int := COALESCE(NULLIF(current_setting('pg_ask.events_max_per_minute', true), '')::int, 0); + dedup_ms int := COALESCE(NULLIF(current_setting('pg_ask.events_dedup_window_ms', true), '')::int, 0); + max_payload int := COALESCE(NULLIF(current_setting('pg_ask.events_max_payload_bytes', true), '')::int, 65536); + trimmed text := btrim(event); + norm_payload jsonb := COALESCE(payload, '{}'::jsonb); + payload_len int; + new_id uuid; +BEGIN + -- Global off switch (also enforced in Rust; duplicated so a direct + -- _outbox_emit call can't write while events are disabled). + IF NOT enabled THEN + RETURN NULL; + END IF; + + -- ---- Validation (caller bugs → hard error, before any write) -------- + IF trimmed IS NULL OR trimmed = '' THEN + RAISE EXCEPTION 'event name must not be empty' + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF length(trimmed) > 127 THEN + RAISE EXCEPTION 'event name must be <= 127 chars, got %', length(trimmed) + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF trimmed !~ '^[A-Za-z0-9][A-Za-z0-9._:-]*$' THEN + RAISE EXCEPTION 'event name must start alphanumeric and contain only [A-Za-z0-9._:-]' + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF summary IS NOT NULL AND octet_length(summary) > 8192 THEN + RAISE EXCEPTION 'summary must be <= 8192 bytes, got %', octet_length(summary) + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF max_payload > 0 THEN + payload_len := octet_length(norm_payload::text); + IF payload_len > max_payload THEN + RAISE EXCEPTION 'payload must be <= % bytes, got %', max_payload, payload_len + USING ERRCODE = 'invalid_parameter_value'; + END IF; + END IF; + + -- ---- Flood control -------------------------------------------------- + IF max_per_min > 0 OR dedup_ms > 0 THEN + -- Serialize same-key emitters so the checks below are not subject to + -- a check-then-insert race. Transaction-scoped advisory lock, freed + -- automatically at commit/rollback. + -- + -- Two-argument (int4, int4) form: the FIRST arg is a fixed domain + -- ('pg_ask._outbox') and the SECOND is the (emitter, event) key. + -- Postgres keeps the two-arg lock space wholly separate from the + -- one-arg int8 space, so this can never collide with the int8 + -- session lock in ask._session_lock_for_append. Within this domain a + -- collision only happens for the same (emitter, event) — exactly the + -- pairs we mean to serialize. + -- + -- hashtextextended returns bigint; the two-arg lock takes int4, so we + -- mask to the low 31 bits (& 0x7fffffff) to land in a non-negative + -- int4 instead of casting (which raises "integer out of range" for + -- hashes outside int4's range). + PERFORM pg_advisory_xact_lock( + (hashtextextended('pg_ask._outbox', 0) & 2147483647)::int, + (hashtextextended(session_user::text || '|' || trimmed, 0) & 2147483647)::int + ); + END IF; + + -- Dedup: identical (emitter,event,payload) within the window → no-op. + -- Compared via md5(payload::text) rather than the jsonb `=` operator: + -- jsonb normalizes (key order / whitespace) before text-casting, so the + -- hash is stable for logically-equal payloads, and hashing a short text + -- digest is far cheaper than an equality scan over large jsonb values. + IF dedup_ms > 0 THEN + IF EXISTS ( + SELECT 1 FROM ask._outbox o + WHERE o.emitter = session_user + AND o.event = trimmed + AND o.ts > now() - make_interval(secs => dedup_ms / 1000.0) + AND md5(o.payload::text) = md5(norm_payload::text) + ) THEN + RAISE DEBUG 'pg_ask: emit deduped (emitter=%, event=%)', session_user, trimmed; + RETURN NULL; + END IF; + END IF; + + -- Rate limit: per (emitter,event) over the last rolling minute. + IF max_per_min > 0 THEN + IF (SELECT count(*) FROM ask._outbox o + WHERE o.emitter = session_user + AND o.event = trimmed + AND o.ts > now() - interval '1 minute') >= max_per_min THEN + RAISE DEBUG 'pg_ask: emit rate-limited (emitter=%, event=%, cap=%/min)', + session_user, trimmed, max_per_min; + RETURN NULL; + END IF; + END IF; + + -- ---- Durable append + low-latency wake-up (atomic) ------------------ INSERT INTO ask._outbox (emitter, event, payload, summary) - VALUES (session_user, event, COALESCE(payload, '{}'::jsonb), summary) - RETURNING id; + VALUES (session_user, trimmed, norm_payload, summary) + RETURNING id INTO new_id; + + -- NOTIFY carries only the id (pg_notify's payload is 8 KB capped); the + -- listener reads the full row from the outbox. Fired here so a direct + -- _outbox_emit caller can't write a row without waking listeners. + PERFORM pg_notify('pg_ask_events', new_id::text); + + RETURN new_id; +END +$$; + +-- Retention helper (v0.5.8). Deletes outbox rows that have ALREADY been +-- delivered (processed_at IS NOT NULL) and are older than `older_than`. +-- Pending rows are never touched, so a slow/offline consumer can't lose +-- undelivered events to a prune. Returns the number of rows removed. +-- +-- Batched (H4): the first prune on a long-neglected outbox could otherwise +-- delete millions of rows in ONE statement. We instead delete in chunks of +-- `batch_size`, which bounds the size of each individual DELETE — capping +-- per-statement memory, lock acquisition, and dead-tuple churn, and letting +-- autovacuum interleave more easily. +-- +-- HONEST LIMITATION: this is a plpgsql function, whose whole body runs in +-- the caller's single transaction. The loop therefore does NOT commit +-- between batches — all chunks become durable together at the outer COMMIT, +-- so total WAL volume and the lifetime of the accumulated row locks are the +-- same as one big DELETE. To get per-batch commits (genuinely shorter locks +-- and incremental WAL flush) the caller must invoke this repeatedly from +-- separate transactions, or a future version must expose a PROCEDURE that +-- can COMMIT mid-loop. The batching here is still worthwhile for the +-- per-statement bounds above. `batch_size <= 0` falls back to a single +-- unbounded DELETE. +-- SECURITY DEFINER so an operator/maintenance role can prune without owning +-- the table; exposed to callers via ask.prune_events(interval, int). +CREATE OR REPLACE FUNCTION ask._outbox_prune( + older_than interval, + batch_size int DEFAULT 10000 +) RETURNS bigint +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + total bigint := 0; + removed bigint; +BEGIN + IF batch_size IS NULL OR batch_size <= 0 THEN + WITH del AS ( + DELETE FROM ask._outbox + WHERE processed_at IS NOT NULL + AND processed_at < now() - older_than + RETURNING 1 + ) + SELECT count(*) INTO total FROM del; + RETURN total; + END IF; + + LOOP + WITH cand AS ( + SELECT id FROM ask._outbox + WHERE processed_at IS NOT NULL + AND processed_at < now() - older_than + LIMIT batch_size + ), del AS ( + DELETE FROM ask._outbox o + USING cand + WHERE o.id = cand.id + RETURNING 1 + ) + SELECT count(*) INTO removed FROM del; + total := total + removed; + EXIT WHEN removed = 0; + END LOOP; + RETURN total; +END $$; -- Consumer-side: stamp a delivered row as processed. Idempotent (only @@ -415,8 +727,8 @@ $$; -- listener can tell a fresh delivery from a duplicate wake-up. SECURITY -- DEFINER because the designated consumer role need not own the table. -- --- Deliberately NOT filtered by emitter: the consumer (e.g. a senti --- listener) almost always connects as a DIFFERENT role than the one that +-- Deliberately NOT filtered by emitter: the consumer (a LISTEN +-- pg_ask_events drainer) almost always connects as a DIFFERENT role than the one that -- emitted the event (a trigger fires as the app role; the listener uses a -- dedicated reader DSN). An `emitter = session_user` filter would make the -- consumer unable to stamp those rows, causing infinite re-delivery. The @@ -443,6 +755,286 @@ AS $$ SELECT EXISTS (SELECT 1 FROM upd); $$; +-- =========================================================================== +-- Async job queue helpers (v0.5.9 / ADR-0018). All SECURITY DEFINER so an +-- ordinary role can enqueue/inspect its own jobs without direct DML on +-- ask._jobs; ownership is stamped + enforced from session_user inside each +-- body (survives SECURITY DEFINER, see _sql_audit_insert rationale). +-- =========================================================================== + +-- Enqueue a job. Honours pg_ask.jobs_enabled (no-op returning NULL when off, +-- mirroring ask.emit) so an install that doesn't use async pays nothing. +-- Validates kind + non-empty question. Fires pg_notify('pg_ask_jobs', id) so +-- a listening worker wakes immediately; the durable row is the source of +-- truth if no worker is up yet. +CREATE OR REPLACE FUNCTION ask._job_submit( + kind text, + question text +) RETURNS uuid +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + enabled bool := COALESCE(current_setting('pg_ask.jobs_enabled', true)::bool, false); + norm text := btrim(question); + new_id uuid; +BEGIN + IF NOT enabled THEN + RETURN NULL; + END IF; + IF kind IS NULL OR kind NOT IN ('ask','sql') THEN + RAISE EXCEPTION 'job kind must be ''ask'' or ''sql'', got %', kind + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF norm IS NULL OR norm = '' THEN + RAISE EXCEPTION 'job question must not be empty' + USING ERRCODE = 'invalid_parameter_value'; + END IF; + INSERT INTO ask._jobs (owner, kind, question) + VALUES (session_user, kind, norm) + RETURNING id INTO new_id; + PERFORM pg_notify('pg_ask_jobs', new_id::text); + RETURN new_id; +END +$$; + +-- Claim the oldest pending job for THIS database, atomically. Uses +-- FOR UPDATE SKIP LOCKED so concurrent workers never claim the same row and +-- never block each other. Flips pending -> running, stamps started_at / +-- worker_pid, bumps attempts. Returns the full job row (or no row when the +-- queue is empty). Scoped to current_database() so a worker only ever runs +-- jobs from the DB it is connected to (a bgworker binds to one DB). +CREATE OR REPLACE FUNCTION ask._job_claim() +RETURNS TABLE (id uuid, kind text, question text, attempts int, owner name) +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +BEGIN + RETURN QUERY + WITH next AS ( + SELECT j.id FROM ask._jobs j + WHERE j.status = 'pending' + AND j.db = current_database() + ORDER BY j.ts, j.id + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + UPDATE ask._jobs j + SET status = 'running', + started_at = now(), + finished_at = NULL, + worker_pid = pg_backend_pid(), + attempts = j.attempts + 1 + FROM next + WHERE j.id = next.id + RETURNING j.id, j.kind, j.question, j.attempts, j.owner; +END +$$; + +-- Mark a claimed job done with its answer + token usage. Only transitions a +-- row that is still 'running' (a cancel mid-flight wins). Fires +-- pg_notify('pg_ask_jobs_done', id) so a waiting client wakes. +CREATE OR REPLACE FUNCTION ask._job_complete( + job_id uuid, + p_answer text, + p_prompt bigint, + p_completion bigint +) RETURNS boolean +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n int; +BEGIN + -- The worker_pid guard (B1) makes completion claim-scoped: only the + -- backend that currently owns the running row can finish it. Without it, + -- a slow-but-alive worker A whose job was orphan-recovered and re-claimed + -- by worker B could complete B's fresh attempt with A's stale answer + -- (double-execution / wrong result). + UPDATE ask._jobs + SET status = 'done', + answer = p_answer, + error = NULL, + prompt_tokens = p_prompt, + completion_tokens = p_completion, + finished_at = now() + WHERE id = job_id AND status = 'running' AND worker_pid = pg_backend_pid(); + GET DIAGNOSTICS n = ROW_COUNT; + IF n > 0 THEN + PERFORM pg_notify('pg_ask_jobs_done', job_id::text); + END IF; + RETURN n > 0; +END +$$; + +-- Mark a claimed job failed. If attempts remain (< max_attempts) the job is +-- returned to 'pending' for retry; otherwise it is terminal 'failed'. Only +-- acts on a 'running' row. `max_attempts` is passed by the caller (read from +-- the GUC in Rust) so the policy lives in one place. Notifies done-channel +-- only on terminal failure so a client awaiting a result isn't woken for a +-- transient retry. +CREATE OR REPLACE FUNCTION ask._job_fail( + job_id uuid, + p_error text, + max_attempts int +) RETURNS text +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + cur_attempts int; + new_status text; +BEGIN + -- worker_pid guard (B1): only the backend that owns the running claim may + -- fail it, so a re-claimed job isn't failed/retried by a ghost worker. + SELECT attempts INTO cur_attempts + FROM ask._jobs + WHERE id = job_id AND status = 'running' AND worker_pid = pg_backend_pid() + FOR UPDATE; + IF cur_attempts IS NULL THEN + RETURN NULL; -- not ours / not running (done/cancelled/re-claimed); no-op + END IF; + IF cur_attempts >= max_attempts THEN + new_status := 'failed'; + UPDATE ask._jobs + SET status = 'failed', error = p_error, finished_at = now() + WHERE id = job_id; + PERFORM pg_notify('pg_ask_jobs_done', job_id::text); + ELSE + new_status := 'pending'; + UPDATE ask._jobs + SET status = 'pending', error = p_error, + started_at = NULL, worker_pid = NULL + WHERE id = job_id; + PERFORM pg_notify('pg_ask_jobs', job_id::text); -- re-wake a worker + END IF; + RETURN new_status; +END +$$; + +-- Crash recovery: return 'running' jobs whose started_at is older than +-- `timeout_ms` back to 'pending' (their worker presumably died). Scoped to +-- current_database(). Returns the number of jobs recovered. Idempotent. +CREATE OR REPLACE FUNCTION ask._job_recover_orphans( + timeout_ms int +) RETURNS bigint +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n bigint; +BEGIN + WITH revived AS ( + UPDATE ask._jobs + SET status = 'pending', started_at = NULL, worker_pid = NULL + WHERE status = 'running' + AND db = current_database() + AND started_at < now() - make_interval(secs => timeout_ms / 1000.0) + RETURNING id + ) + SELECT count(*) INTO n FROM revived; + RETURN n; +END +$$; + +-- Owner-scoped cancel. A job that is pending or running flips to +-- 'cancelled'; the completion helpers refuse to transition a non-running +-- row, so an in-flight worker's result is discarded. Returns true if it +-- changed anything. Filtered by owner = session_user so a role can only +-- cancel its own jobs. +CREATE OR REPLACE FUNCTION ask._job_cancel( + job_id uuid +) RETURNS boolean +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n int; +BEGIN + UPDATE ask._jobs + SET status = 'cancelled', finished_at = now() + WHERE id = job_id + AND owner = session_user + AND status IN ('pending','running'); + GET DIAGNOSTICS n = ROW_COUNT; + RETURN n > 0; +END +$$; + +-- Release a running job back to 'pending' WITHOUT touching attempts — used by +-- the synchronous drain (M1) when it re-claims a job it already ran this +-- pass (a retry it just re-queued). Returns it to pending so the row doesn't +-- sit in 'running' until orphan recovery. PID-scoped like the completion +-- helpers so it only releases our own claim. +CREATE OR REPLACE FUNCTION ask._job_release( + job_id uuid +) RETURNS boolean +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n int; +BEGIN + UPDATE ask._jobs + SET status = 'pending', started_at = NULL, worker_pid = NULL + WHERE id = job_id AND status = 'running' AND worker_pid = pg_backend_pid(); + GET DIAGNOSTICS n = ROW_COUNT; + RETURN n > 0; +END +$$; + +-- Delete terminal (done/failed/cancelled) jobs older than `older_than`, +-- in batches — same retention pattern as ask._outbox_prune. Pending/running +-- jobs are never touched. Operator-only. +CREATE OR REPLACE FUNCTION ask._jobs_prune( + older_than interval, + batch_size int DEFAULT 10000 +) RETURNS bigint +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + total bigint := 0; + removed bigint; +BEGIN + IF batch_size IS NULL OR batch_size <= 0 THEN + WITH del AS ( + DELETE FROM ask._jobs + WHERE status IN ('done','failed','cancelled') + AND finished_at < now() - older_than + RETURNING 1 + ) + SELECT count(*) INTO total FROM del; + RETURN total; + END IF; + LOOP + WITH cand AS ( + SELECT id FROM ask._jobs + WHERE status IN ('done','failed','cancelled') + AND finished_at < now() - older_than + LIMIT batch_size + ), del AS ( + DELETE FROM ask._jobs j USING cand + WHERE j.id = cand.id + RETURNING 1 + ) + SELECT count(*) INTO removed FROM del; + total := total + removed; + EXIT WHEN removed = 0; + END LOOP; + RETURN total; +END +$$; + + -- Insert a memory row owned by the calling role. The Rust caller passes -- the embedding as a text-encoded vector literal so we don't have to -- mention the pgvector type in this function's signature — keeps the @@ -715,6 +1307,7 @@ GRANT SELECT ON ask._traces TO PUBLIC; GRANT SELECT ON ask._tools TO PUBLIC; GRANT SELECT ON ask._sql_audit TO PUBLIC; GRANT SELECT ON ask._outbox TO PUBLIC; +GRANT SELECT ON ask._jobs TO PUBLIC; DO $grant_memory_select$ BEGIN IF EXISTS (SELECT 1 FROM pg_class @@ -733,6 +1326,21 @@ REVOKE ALL ON FUNCTION ask._sql_audit_insert(text, int, bool, text) REVOKE ALL ON FUNCTION ask._sql_audit_finish(uuid, bigint, text) FROM PUBLIC; REVOKE ALL ON FUNCTION ask._outbox_emit(text, jsonb, text) FROM PUBLIC; REVOKE ALL ON FUNCTION ask._outbox_mark_processed(uuid) FROM PUBLIC; +-- _outbox_prune deletes delivered rows; kept operator-only (like the config +-- surface). NOT granted to PUBLIC — operators grant it to a maintenance role +-- after CREATE EXTENSION. See finalize.sql for the matching ask.prune_events. +REVOKE ALL ON FUNCTION ask._outbox_prune(interval, int) FROM PUBLIC; +-- Async job helpers: enqueue / claim / complete / fail / recover / cancel are +-- owner-scoped inside their bodies, so EXECUTE to PUBLIC is safe. _jobs_prune +-- is destructive → operator-only (like _outbox_prune). +REVOKE ALL ON FUNCTION ask._job_submit(text, text) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_claim() FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_complete(uuid, text, bigint, bigint) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_fail(uuid, text, int) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_recover_orphans(int) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_release(uuid) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_cancel(uuid) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._jobs_prune(interval, int) FROM PUBLIC; REVOKE ALL ON FUNCTION ask._memory_insert(text, text, jsonb, text) FROM PUBLIC; REVOKE ALL ON FUNCTION ask._memory_delete_owned(uuid) FROM PUBLIC; REVOKE ALL ON FUNCTION ask._tool_register(text, jsonb, text) FROM PUBLIC; @@ -764,6 +1372,21 @@ GRANT EXECUTE ON FUNCTION ask._session_append_message(uuid, text, text, text, te GRANT EXECUTE ON FUNCTION ask._session_touch(uuid) TO PUBLIC; GRANT EXECUTE ON FUNCTION ask._session_clear_messages(uuid) TO PUBLIC; GRANT EXECUTE ON FUNCTION ask._config_get(text) TO PUBLIC; +-- Async job helpers. Two visibility tiers: +-- +-- * User-facing, owner-scoped (safe for PUBLIC): _job_submit stamps +-- session_user as owner; _job_cancel filters by owner = session_user. +-- These back ask.ask_async / ask.cancel_job. +-- * Worker-path, NOT owner-filtered (operator-only): _job_claim, +-- _job_complete, _job_fail, _job_recover_orphans run jobs regardless of +-- who enqueued them — a malicious role could otherwise claim/complete/ +-- fail another tenant's job by calling them directly. They are NOT +-- granted to PUBLIC. The background worker connects as a superuser and +-- is unaffected; ask.run_pending_jobs() (also operator-only) is the only +-- other caller. An operator who wants a dedicated drain role grants +-- EXECUTE on these four to it. See docs/SECURITY.md. +GRANT EXECUTE ON FUNCTION ask._job_submit(text, text) TO PUBLIC; +GRANT EXECUTE ON FUNCTION ask._job_cancel(uuid) TO PUBLIC; -- Config-surface lockdown (C6) lives in a finalize SQL block in -- `sql/finalize.sql` because pgrx emits the #[pg_extern] config diff --git a/sql/finalize.sql b/sql/finalize.sql index 35173f9..d447faa 100644 --- a/sql/finalize.sql +++ b/sql/finalize.sql @@ -26,3 +26,44 @@ -- after CREATE EXTENSION. REVOKE ALL ON FUNCTION ask.config(text, text) FROM PUBLIC; REVOKE ALL ON FUNCTION ask.get_config(text) FROM PUBLIC; + +-- --------------------------------------------------------------------------- +-- ask.prune_events(interval-text) deletes delivered outbox rows. It is a +-- destructive maintenance operation, so — like the config surface — it is +-- locked to operators rather than left at pgrx's default EXECUTE TO PUBLIC. +-- Grant it explicitly to whatever role runs your retention job: +-- GRANT EXECUTE ON FUNCTION ask.prune_events(text, int) TO maintenance_role; +-- --------------------------------------------------------------------------- +REVOKE ALL ON FUNCTION ask.prune_events(text, int) FROM PUBLIC; + +-- --------------------------------------------------------------------------- +-- ask.prune_jobs(interval-text, int) deletes terminal async jobs. Same +-- destructive-maintenance rationale as prune_events → operator-only. +-- GRANT EXECUTE ON FUNCTION ask.prune_jobs(text, int) TO maintenance_role; +-- --------------------------------------------------------------------------- +REVOKE ALL ON FUNCTION ask.prune_jobs(text, int) FROM PUBLIC; + +-- --------------------------------------------------------------------------- +-- ask.run_pending_jobs() drains the async queue synchronously: it claims and +-- runs jobs regardless of who enqueued them (it calls the operator-only +-- worker-path helpers). That cross-owner reach makes it an operator action, +-- not something to expose to every role — so it is locked down alongside the +-- worker-path helpers it depends on. Grant it to your pg_cron / maintenance +-- role explicitly: +-- GRANT EXECUTE ON FUNCTION ask.run_pending_jobs() TO maintenance_role; +-- The background worker (connects as superuser) does not need this grant. +-- +-- IMPORTANT (H2): run_pending_jobs() is SECURITY INVOKER and internally calls +-- the operator-only worker-path helpers (_job_claim / _job_complete / +-- _job_fail / _job_recover_orphans / _job_release). A NON-superuser +-- maintenance role therefore needs EXECUTE on those too, or it will get +-- "permission denied" mid-drain. Grant the full set: +-- GRANT EXECUTE ON FUNCTION ask._job_claim(), +-- ask._job_complete(uuid, text, bigint, bigint), +-- ask._job_fail(uuid, text, int), +-- ask._job_recover_orphans(int), +-- ask._job_release(uuid) +-- TO maintenance_role; +-- A superuser cron role needs none of these grants. +-- --------------------------------------------------------------------------- +REVOKE ALL ON FUNCTION ask.run_pending_jobs() FROM PUBLIC; diff --git a/sql/pg_ask--0.5.7--0.5.8.sql b/sql/pg_ask--0.5.7--0.5.8.sql new file mode 100644 index 0000000..2c79cae --- /dev/null +++ b/sql/pg_ask--0.5.7--0.5.8.sql @@ -0,0 +1,193 @@ +-- pg_ask 0.5.7 → 0.5.8 upgrade. +-- +-- Production-hardens the event outbox (ADR-0017) without changing the +-- consumer-facing contract: the `pg_ask_events` channel, the `ask._outbox` +-- columns, the pending-row query (`WHERE processed_at IS NULL ORDER BY ts`), +-- and the `ask._outbox_emit(text,jsonb,text)` / `ask._outbox_mark_processed` +-- / `ask.emit` signatures are all unchanged, so a LISTEN pg_ask_events +-- consumer keeps working across the upgrade. +-- +-- Changes: +-- 1. New (emitter, event, ts) index so the flood-control checks in +-- ask._outbox_emit don't full-scan the outbox on every emit. +-- 2. ask._outbox_emit becomes the single authority: it re-checks +-- events_enabled, validates input (event name regex/length, summary +-- length, payload bytes), enforces the rate-limit / dedup guards, +-- INSERTs, AND fires pg_notify — all atomically. This closes the bypass +-- where calling _outbox_emit directly skipped the Rust-side checks. +-- Suppressed emits return NULL (silent no-op) and never raise; caller +-- bugs (bad name / oversized payload) raise invalid_parameter_value. +-- Dedup compares md5(payload::text) instead of jsonb equality. +-- 3. New ask._outbox_prune(interval, int) + ask.prune_events(text, int) +-- retention helper. Deletes ONLY already-delivered rows older than the +-- interval, in batches, so the first prune of a neglected outbox isn't +-- one giant transaction. Pending rows are never removed. +-- 4. New GUCs (events_max_payload_bytes, events_max_per_minute, +-- events_dedup_window_ms) ship in the new .so. +-- +-- Idempotent: CREATE OR REPLACE preserves grants/ownership; re-running is +-- safe. + +-- ── Flood-control + retention indexes ─────────────────────────────────────── +CREATE INDEX IF NOT EXISTS _outbox_rate_idx + ON ask._outbox (emitter, event, ts); +CREATE INDEX IF NOT EXISTS _outbox_processed_idx + ON ask._outbox (processed_at) WHERE processed_at IS NOT NULL; + +-- ── Single-authority writer: validation + enabled + flood control + NOTIFY ── +CREATE OR REPLACE FUNCTION ask._outbox_emit( + event text, + payload jsonb, + summary text +) RETURNS uuid +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + enabled bool := COALESCE(current_setting('pg_ask.events_enabled', true)::bool, false); + max_per_min int := COALESCE(NULLIF(current_setting('pg_ask.events_max_per_minute', true), '')::int, 0); + dedup_ms int := COALESCE(NULLIF(current_setting('pg_ask.events_dedup_window_ms', true), '')::int, 0); + max_payload int := COALESCE(NULLIF(current_setting('pg_ask.events_max_payload_bytes', true), '')::int, 65536); + trimmed text := btrim(event); + norm_payload jsonb := COALESCE(payload, '{}'::jsonb); + payload_len int; + new_id uuid; +BEGIN + IF NOT enabled THEN + RETURN NULL; + END IF; + + IF trimmed IS NULL OR trimmed = '' THEN + RAISE EXCEPTION 'event name must not be empty' + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF length(trimmed) > 127 THEN + RAISE EXCEPTION 'event name must be <= 127 chars, got %', length(trimmed) + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF trimmed !~ '^[A-Za-z0-9][A-Za-z0-9._:-]*$' THEN + RAISE EXCEPTION 'event name must start alphanumeric and contain only [A-Za-z0-9._:-]' + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF summary IS NOT NULL AND octet_length(summary) > 8192 THEN + RAISE EXCEPTION 'summary must be <= 8192 bytes, got %', octet_length(summary) + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF max_payload > 0 THEN + payload_len := octet_length(norm_payload::text); + IF payload_len > max_payload THEN + RAISE EXCEPTION 'payload must be <= % bytes, got %', max_payload, payload_len + USING ERRCODE = 'invalid_parameter_value'; + END IF; + END IF; + + IF max_per_min > 0 OR dedup_ms > 0 THEN + PERFORM pg_advisory_xact_lock( + (hashtextextended('pg_ask._outbox', 0) & 2147483647)::int, + (hashtextextended(session_user::text || '|' || trimmed, 0) & 2147483647)::int + ); + END IF; + + IF dedup_ms > 0 THEN + IF EXISTS ( + SELECT 1 FROM ask._outbox o + WHERE o.emitter = session_user + AND o.event = trimmed + AND o.ts > now() - make_interval(secs => dedup_ms / 1000.0) + AND md5(o.payload::text) = md5(norm_payload::text) + ) THEN + RAISE DEBUG 'pg_ask: emit deduped (emitter=%, event=%)', session_user, trimmed; + RETURN NULL; + END IF; + END IF; + + IF max_per_min > 0 THEN + IF (SELECT count(*) FROM ask._outbox o + WHERE o.emitter = session_user + AND o.event = trimmed + AND o.ts > now() - interval '1 minute') >= max_per_min THEN + RAISE DEBUG 'pg_ask: emit rate-limited (emitter=%, event=%, cap=%/min)', + session_user, trimmed, max_per_min; + RETURN NULL; + END IF; + END IF; + + INSERT INTO ask._outbox (emitter, event, payload, summary) + VALUES (session_user, trimmed, norm_payload, summary) + RETURNING id INTO new_id; + + PERFORM pg_notify('pg_ask_events', new_id::text); + + RETURN new_id; +END +$$; + +-- ── Batched retention helper ──────────────────────────────────────────────── +CREATE OR REPLACE FUNCTION ask._outbox_prune( + older_than interval, + batch_size int DEFAULT 10000 +) RETURNS bigint +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + total bigint := 0; + removed bigint; +BEGIN + IF batch_size IS NULL OR batch_size <= 0 THEN + WITH del AS ( + DELETE FROM ask._outbox + WHERE processed_at IS NOT NULL + AND processed_at < now() - older_than + RETURNING 1 + ) + SELECT count(*) INTO total FROM del; + RETURN total; + END IF; + + LOOP + WITH cand AS ( + SELECT id FROM ask._outbox + WHERE processed_at IS NOT NULL + AND processed_at < now() - older_than + LIMIT batch_size + ), del AS ( + DELETE FROM ask._outbox o + USING cand + WHERE o.id = cand.id + RETURNING 1 + ) + SELECT count(*) INTO removed FROM del; + total := total + removed; + EXIT WHEN removed = 0; + END LOOP; + RETURN total; +END +$$; + +-- ── Grants ────────────────────────────────────────────────────────────────── +-- _outbox_emit keeps its existing PUBLIC EXECUTE (CREATE OR REPLACE preserved +-- it). _outbox_prune is destructive → operator-only. +REVOKE ALL ON FUNCTION ask._outbox_prune(interval, int) FROM PUBLIC; + +-- ask.prune_events(text, int) is a NEW #[pg_extern] in 0.5.8. Unlike the SQL +-- helpers above (which we CREATE OR REPLACE by hand), pgrx does NOT emit +-- CREATE FUNCTION DDL for #[pg_extern]s into an ALTER EXTENSION UPDATE script +-- — that DDL only lands in the base-install pg_ask--.sql. So a new +-- C-language entry point must be created here explicitly, or it is simply +-- missing after an upgrade. This mirrors the pgrx-generated definition in +-- pg_ask--0.5.8.sql (LANGUAGE c, MODULE_PATHNAME, prune_events_wrapper). +CREATE OR REPLACE FUNCTION ask."prune_events"( + "older_than" text, + "batch_size" int DEFAULT 10000 +) RETURNS bigint +STRICT VOLATILE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'prune_events_wrapper'; + +-- Destructive (deletes outbox rows), so lock it to operators rather than +-- leaving pgrx's default EXECUTE TO PUBLIC. The finalize step that does this +-- on fresh installs does not run on ALTER EXTENSION UPDATE, so we repeat it. +REVOKE ALL ON FUNCTION ask.prune_events(text, int) FROM PUBLIC; diff --git a/sql/pg_ask--0.5.8--0.5.9.sql b/sql/pg_ask--0.5.8--0.5.9.sql new file mode 100644 index 0000000..f340395 --- /dev/null +++ b/sql/pg_ask--0.5.8--0.5.9.sql @@ -0,0 +1,426 @@ +-- pg_ask 0.5.8 → 0.5.9 upgrade. +-- +-- Adds the async job queue (ADR-0018): ask.ask_async() enqueues a question +-- to ask._jobs and returns immediately; a background worker (or a manual +-- ask.run_pending_jobs() / pg_cron call) runs the agent loop in its own +-- backend and writes the answer back. This is the only correct shape for +-- async work in PostgreSQL — a backend is single-threaded and SPI is not +-- thread-safe, so "async" means handing the work to a separate process. +-- +-- Everything here is additive (new table, indexes, helpers, #[pg_extern]s, +-- GUCs in the new .so) and idempotent. Existing surface is untouched, so +-- nothing breaks across the upgrade. +-- +-- Function bodies below are character-identical to sql/bootstrap.sql (verified +-- by diff in CI); keep the two in sync. + +-- ── Job queue table + indexes ─────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS ask._jobs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + ts timestamptz NOT NULL DEFAULT now(), + owner name NOT NULL DEFAULT session_user, + db name NOT NULL DEFAULT current_database(), + kind text NOT NULL DEFAULT 'ask', + question text NOT NULL, + status text NOT NULL DEFAULT 'pending' + CHECK (status IN ('pending','running','done','failed','cancelled')), + attempts int NOT NULL DEFAULT 0, + started_at timestamptz, + finished_at timestamptz, + answer text, + error text, + prompt_tokens bigint, + completion_tokens bigint, + worker_pid int +); +-- Drop-and-recreate the hot-path indexes so any install that created the +-- earlier (ts)/(started_at) forms picks up the (db, ...) leading column +-- (CREATE INDEX IF NOT EXISTS alone won't alter an existing index). +DROP INDEX IF EXISTS ask._jobs_pending_idx; +DROP INDEX IF EXISTS ask._jobs_running_idx; +CREATE INDEX IF NOT EXISTS _jobs_pending_idx + ON ask._jobs (db, ts) WHERE status = 'pending'; +CREATE INDEX IF NOT EXISTS _jobs_running_idx + ON ask._jobs (db, started_at) WHERE status = 'running'; +CREATE INDEX IF NOT EXISTS _jobs_owner_idx + ON ask._jobs (owner, ts); +CREATE INDEX IF NOT EXISTS _jobs_terminal_idx + ON ask._jobs (finished_at) WHERE status IN ('done','failed','cancelled'); + +-- Row-level security (mirrors bootstrap.sql): a direct SELECT on ask._jobs is +-- scoped to the caller's own rows. Superuser (table owner, bgworker) bypasses. +ALTER TABLE ask._jobs ENABLE ROW LEVEL SECURITY; +DO $jobs_rls$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_policy WHERE polname = '_jobs_owner_select' + AND polrelid = 'ask._jobs'::regclass + ) THEN + CREATE POLICY _jobs_owner_select ON ask._jobs + FOR SELECT USING (owner = session_user); + END IF; +END +$jobs_rls$; + +-- ── SECURITY DEFINER state-machine helpers ────────────────────────────────── +-- (bodies mirror sql/bootstrap.sql) + +CREATE OR REPLACE FUNCTION ask._job_submit( + kind text, + question text +) RETURNS uuid +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + enabled bool := COALESCE(current_setting('pg_ask.jobs_enabled', true)::bool, false); + norm text := btrim(question); + new_id uuid; +BEGIN + IF NOT enabled THEN + RETURN NULL; + END IF; + IF kind IS NULL OR kind NOT IN ('ask','sql') THEN + RAISE EXCEPTION 'job kind must be ''ask'' or ''sql'', got %', kind + USING ERRCODE = 'invalid_parameter_value'; + END IF; + IF norm IS NULL OR norm = '' THEN + RAISE EXCEPTION 'job question must not be empty' + USING ERRCODE = 'invalid_parameter_value'; + END IF; + INSERT INTO ask._jobs (owner, kind, question) + VALUES (session_user, kind, norm) + RETURNING id INTO new_id; + PERFORM pg_notify('pg_ask_jobs', new_id::text); + RETURN new_id; +END +$$; + +-- Claim the oldest pending job for THIS database, atomically. Uses +-- FOR UPDATE SKIP LOCKED so concurrent workers never claim the same row and +-- never block each other. Flips pending -> running, stamps started_at / +-- worker_pid, bumps attempts. Returns the full job row (or no row when the +-- queue is empty). Scoped to current_database() so a worker only ever runs +-- jobs from the DB it is connected to (a bgworker binds to one DB). +CREATE OR REPLACE FUNCTION ask._job_claim() +RETURNS TABLE (id uuid, kind text, question text, attempts int, owner name) +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +BEGIN + RETURN QUERY + WITH next AS ( + SELECT j.id FROM ask._jobs j + WHERE j.status = 'pending' + AND j.db = current_database() + ORDER BY j.ts, j.id + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + UPDATE ask._jobs j + SET status = 'running', + started_at = now(), + finished_at = NULL, + worker_pid = pg_backend_pid(), + attempts = j.attempts + 1 + FROM next + WHERE j.id = next.id + RETURNING j.id, j.kind, j.question, j.attempts, j.owner; +END +$$; + +-- Mark a claimed job done with its answer + token usage. Only transitions a +-- row that is still 'running' (a cancel mid-flight wins). Fires +-- pg_notify('pg_ask_jobs_done', id) so a waiting client wakes. +CREATE OR REPLACE FUNCTION ask._job_complete( + job_id uuid, + p_answer text, + p_prompt bigint, + p_completion bigint +) RETURNS boolean +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n int; +BEGIN + -- The worker_pid guard (B1) makes completion claim-scoped: only the + -- backend that currently owns the running row can finish it. Without it, + -- a slow-but-alive worker A whose job was orphan-recovered and re-claimed + -- by worker B could complete B's fresh attempt with A's stale answer + -- (double-execution / wrong result). + UPDATE ask._jobs + SET status = 'done', + answer = p_answer, + error = NULL, + prompt_tokens = p_prompt, + completion_tokens = p_completion, + finished_at = now() + WHERE id = job_id AND status = 'running' AND worker_pid = pg_backend_pid(); + GET DIAGNOSTICS n = ROW_COUNT; + IF n > 0 THEN + PERFORM pg_notify('pg_ask_jobs_done', job_id::text); + END IF; + RETURN n > 0; +END +$$; + +-- Mark a claimed job failed. If attempts remain (< max_attempts) the job is +-- returned to 'pending' for retry; otherwise it is terminal 'failed'. Only +-- acts on a 'running' row. `max_attempts` is passed by the caller (read from +-- the GUC in Rust) so the policy lives in one place. Notifies done-channel +-- only on terminal failure so a client awaiting a result isn't woken for a +-- transient retry. +CREATE OR REPLACE FUNCTION ask._job_fail( + job_id uuid, + p_error text, + max_attempts int +) RETURNS text +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + cur_attempts int; + new_status text; +BEGIN + -- worker_pid guard (B1): only the backend that owns the running claim may + -- fail it, so a re-claimed job isn't failed/retried by a ghost worker. + SELECT attempts INTO cur_attempts + FROM ask._jobs + WHERE id = job_id AND status = 'running' AND worker_pid = pg_backend_pid() + FOR UPDATE; + IF cur_attempts IS NULL THEN + RETURN NULL; -- not ours / not running (done/cancelled/re-claimed); no-op + END IF; + IF cur_attempts >= max_attempts THEN + new_status := 'failed'; + UPDATE ask._jobs + SET status = 'failed', error = p_error, finished_at = now() + WHERE id = job_id; + PERFORM pg_notify('pg_ask_jobs_done', job_id::text); + ELSE + new_status := 'pending'; + UPDATE ask._jobs + SET status = 'pending', error = p_error, + started_at = NULL, worker_pid = NULL + WHERE id = job_id; + PERFORM pg_notify('pg_ask_jobs', job_id::text); -- re-wake a worker + END IF; + RETURN new_status; +END +$$; + +-- Crash recovery: return 'running' jobs whose started_at is older than +-- `timeout_ms` back to 'pending' (their worker presumably died). Scoped to +-- current_database(). Returns the number of jobs recovered. Idempotent. +CREATE OR REPLACE FUNCTION ask._job_recover_orphans( + timeout_ms int +) RETURNS bigint +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n bigint; +BEGIN + WITH revived AS ( + UPDATE ask._jobs + SET status = 'pending', started_at = NULL, worker_pid = NULL + WHERE status = 'running' + AND db = current_database() + AND started_at < now() - make_interval(secs => timeout_ms / 1000.0) + RETURNING id + ) + SELECT count(*) INTO n FROM revived; + RETURN n; +END +$$; + +-- Owner-scoped cancel. A job that is pending or running flips to +-- 'cancelled'; the completion helpers refuse to transition a non-running +-- row, so an in-flight worker's result is discarded. Returns true if it +-- changed anything. Filtered by owner = session_user so a role can only +-- cancel its own jobs. +CREATE OR REPLACE FUNCTION ask._job_cancel( + job_id uuid +) RETURNS boolean +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n int; +BEGIN + UPDATE ask._jobs + SET status = 'cancelled', finished_at = now() + WHERE id = job_id + AND owner = session_user + AND status IN ('pending','running'); + GET DIAGNOSTICS n = ROW_COUNT; + RETURN n > 0; +END +$$; + +CREATE OR REPLACE FUNCTION ask._job_release( + job_id uuid +) RETURNS boolean +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + n int; +BEGIN + UPDATE ask._jobs + SET status = 'pending', started_at = NULL, worker_pid = NULL + WHERE id = job_id AND status = 'running' AND worker_pid = pg_backend_pid(); + GET DIAGNOSTICS n = ROW_COUNT; + RETURN n > 0; +END +$$; + +-- Delete terminal (done/failed/cancelled) jobs older than `older_than`, +-- in batches — same retention pattern as ask._outbox_prune. Pending/running +-- jobs are never touched. Operator-only. +CREATE OR REPLACE FUNCTION ask._jobs_prune( + older_than interval, + batch_size int DEFAULT 10000 +) RETURNS bigint +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $$ +DECLARE + total bigint := 0; + removed bigint; +BEGIN + IF batch_size IS NULL OR batch_size <= 0 THEN + WITH del AS ( + DELETE FROM ask._jobs + WHERE status IN ('done','failed','cancelled') + AND finished_at < now() - older_than + RETURNING 1 + ) + SELECT count(*) INTO total FROM del; + RETURN total; + END IF; + LOOP + WITH cand AS ( + SELECT id FROM ask._jobs + WHERE status IN ('done','failed','cancelled') + AND finished_at < now() - older_than + LIMIT batch_size + ), del AS ( + DELETE FROM ask._jobs j USING cand + WHERE j.id = cand.id + RETURNING 1 + ) + SELECT count(*) INTO removed FROM del; + total := total + removed; + EXIT WHEN removed = 0; + END LOOP; + RETURN total; +END +$$; + + +-- ── Grants ────────────────────────────────────────────────────────────────── +-- Two tiers (mirrors bootstrap.sql): +-- * user-facing owner-scoped → PUBLIC: _job_submit, _job_cancel. +-- * worker-path, NOT owner-filtered → operator-only (NOT granted to +-- PUBLIC): _job_claim, _job_complete, _job_fail, _job_recover_orphans. +-- A malicious role could otherwise claim/complete/fail another tenant's +-- job. The worker connects as superuser; grant these to a dedicated +-- drain role if you use one. +-- * destructive prune → operator-only. +GRANT SELECT ON ask._jobs TO PUBLIC; +REVOKE ALL ON FUNCTION ask._job_submit(text, text) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_claim() FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_complete(uuid, text, bigint, bigint) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_fail(uuid, text, int) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_recover_orphans(int) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_release(uuid) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._job_cancel(uuid) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask._jobs_prune(interval, int) FROM PUBLIC; +GRANT EXECUTE ON FUNCTION ask._job_submit(text, text) TO PUBLIC; +GRANT EXECUTE ON FUNCTION ask._job_cancel(uuid) TO PUBLIC; + +-- ── Public #[pg_extern] entry points ──────────────────────────────────────── +-- pgrx does NOT emit CREATE FUNCTION DDL for #[pg_extern]s into an +-- ALTER EXTENSION UPDATE script (only into the base-install file), so each +-- new C-language entry point must be created explicitly here, mirroring the +-- pgrx-generated definitions in pg_ask--0.5.9.sql. (Lesson from the 0.5.8 +-- prune_events fix.) MODULE_PATHNAME expands to $libdir/pg_ask in the +-- extension-script context. +CREATE OR REPLACE FUNCTION ask."ask_async"( + "question" text, + "kind" text DEFAULT 'ask' +) RETURNS uuid +STRICT VOLATILE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'ask_async_wrapper'; + +CREATE OR REPLACE FUNCTION ask."job_status"( + "job_id" uuid +) RETURNS text +STRICT STABLE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'job_status_wrapper'; + +CREATE OR REPLACE FUNCTION ask."job_result"( + "job_id" uuid +) RETURNS text +STRICT STABLE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'job_result_wrapper'; + +CREATE OR REPLACE FUNCTION ask."job_error"( + "job_id" uuid +) RETURNS text +STRICT STABLE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'job_error_wrapper'; + +CREATE OR REPLACE FUNCTION ask."cancel_job"( + "job_id" uuid +) RETURNS bool +STRICT VOLATILE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'cancel_job_wrapper'; + +CREATE OR REPLACE FUNCTION ask."run_pending_jobs"() RETURNS bigint +VOLATILE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'run_pending_jobs_wrapper'; + +CREATE OR REPLACE FUNCTION ask."prune_jobs"( + "older_than" text, + "batch_size" int DEFAULT 10000 +) RETURNS bigint +STRICT VOLATILE PARALLEL UNSAFE +LANGUAGE c +AS 'MODULE_PATHNAME', 'prune_jobs_wrapper'; + +-- prune_jobs AND run_pending_jobs are operator-only (match finalize.sql on +-- fresh installs). run_pending_jobs claims/runs jobs regardless of owner via +-- the operator-only worker-path helpers, so it must not be PUBLIC. +REVOKE ALL ON FUNCTION ask.prune_jobs(text, int) FROM PUBLIC; +REVOKE ALL ON FUNCTION ask.run_pending_jobs() FROM PUBLIC; + +-- ── parallel-safety fix for pre-existing SPI-reading functions ────────────── +-- status / list_tools / list_memories / list_namespaces were created +-- PARALLEL SAFE in earlier versions, but they all read via SPI — which is +-- forbidden inside a parallel worker ("cannot start commands during a +-- parallel operation"). pgrx does NOT change a function's proparallel flag on +-- ALTER EXTENSION UPDATE (only fresh-install base SQL reflects the new +-- annotation), so an upgraded install keeps the wrong flag unless we fix it +-- explicitly here. (job_status / job_result / job_error are new in 0.5.9 and +-- are already created PARALLEL UNSAFE above.) +ALTER FUNCTION ask.status() PARALLEL UNSAFE; +ALTER FUNCTION ask.list_tools() PARALLEL UNSAFE; +ALTER FUNCTION ask.list_memories(text, int, int) PARALLEL UNSAFE; +ALTER FUNCTION ask.list_namespaces() PARALLEL UNSAFE; diff --git a/src/api/emit.rs b/src/api/emit.rs index 2628a53..b65a013 100644 --- a/src/api/emit.rs +++ b/src/api/emit.rs @@ -2,8 +2,10 @@ //! //! Thin `#[pg_extern]` wrapper over [`crate::infra::events::emit`]. Appends //! a durable row to `ask._outbox` and fires `pg_notify('pg_ask_events', id)` -//! so an orchestrator (senti) can react. No-op (returns NULL) unless -//! `pg_ask.events_enabled = on`. +//! so any process holding a `LISTEN pg_ask_events` connection can react. +//! No-op (returns NULL) unless `pg_ask.events_enabled = on`, and also a +//! no-op when an emit is suppressed by the optional rate-limit / dedup +//! guards (see [`crate::infra::events`]). //! //! Intended to be called from triggers or scheduled jobs that have already //! decided a condition is worth reporting. Keep the *threshold* logic in @@ -16,7 +18,9 @@ use pgrx::prelude::*; use pgrx::{JsonB, Uuid}; /// Emit an event to the outbox and notify listeners. Returns the new row's -/// id, or NULL when the events layer is disabled. +/// id, or NULL when the emit was a no-op (events disabled, or suppressed by +/// the rate-limit / dedup window). Raises only on caller bugs: an invalid +/// event name, or a payload/summary over the configured size ceilings. /// /// ```sql /// SELECT ask.emit('inventory.critical', @@ -38,3 +42,26 @@ fn emit( Err(e) => raise_as_pg_error(&e), } } + +/// Prune already-delivered outbox rows older than `older_than` (a Postgres +/// interval literal, e.g. `'7 days'`), in batches of `batch_size` (default +/// 10000; pass `0` for a single unbounded DELETE). Pending (undelivered) +/// rows are never touched. Returns the number of rows removed. +/// +/// Maintenance helper for operators — the outbox is otherwise append-only +/// and grows unbounded as events are processed. Batching keeps the first +/// prune of a long-neglected outbox from running as one giant transaction +/// (huge WAL, long locks, replication stall). Not granted to PUBLIC by +/// default (see finalize.sql); grant it to a maintenance role explicitly. +/// +/// ```sql +/// SELECT ask.prune_events('30 days'); -- default batch size +/// SELECT ask.prune_events('30 days', 5000); -- custom batch size +/// ``` +#[pg_extern(schema = "ask", volatile, parallel_unsafe)] +fn prune_events(older_than: &str, batch_size: default!(i32, 10000)) -> i64 { + match events::prune(older_than, batch_size) { + Ok(n) => n, + Err(e) => raise_as_pg_error(&e), + } +} diff --git a/src/api/jobs.rs b/src/api/jobs.rs new file mode 100644 index 0000000..73ce922 --- /dev/null +++ b/src/api/jobs.rs @@ -0,0 +1,157 @@ +//! Async job queue SQL surface (v0.5.9 / ADR-0018). +//! +//! Thin `#[pg_extern]` wrappers. Enqueue is one SECURITY DEFINER call; +//! status/result are owner-scoped SELECTs; the drain entry point delegates +//! to [`crate::jobs`] (the use-case layer). No business logic here. + +use crate::infra::config::{RuntimeConfig, JOBS_BATCH, JOBS_ENABLED}; +use crate::infra::errors::raise_as_pg_error; +use crate::jobs; +use pgrx::prelude::*; +use pgrx::Uuid; + +/// Enqueue a question for asynchronous execution. Returns the new job id +/// immediately (the agent loop runs later in a worker), or NULL when the job +/// queue is disabled (`pg_ask.jobs_enabled = off`, the default). +/// +/// `kind` is `'ask'` (full agent loop, default) or `'sql'` (generate-only). +/// Poll the result with `ask.job_status(id)` / `ask.job_result(id)`, or +/// `LISTEN pg_ask_jobs_done` for a low-latency wake-up. +/// +/// ```sql +/// SELECT ask.ask_async('top 5 customers by revenue'); +/// SELECT ask.ask_async('count orders today', 'sql'); +/// ``` +#[pg_extern(schema = "ask", volatile, parallel_unsafe)] +fn ask_async(question: &str, kind: default!(&str, "'ask'")) -> Option { + match Spi::get_one_with_args::( + "SELECT ask._job_submit($1, $2)", + &[kind.into(), question.into()], + ) { + Ok(id) => id, + Err(e) => raise_as_pg_error(&crate::infra::errors::AskError::from(e)), + } +} + +/// Current status of a job you own: `pending` / `running` / `done` / +/// `failed` / `cancelled`, or NULL if no such job belongs to you (the +/// NotFound == Unauthorized collapse the rest of pg_ask uses, so id-space +/// probing leaks nothing). +// parallel_unsafe: this reads via SPI (Spi::get_one_with_args). A function +// the planner believes is parallel-safe may be invoked inside a parallel +// worker, where SPI is forbidden ("cannot start commands during a parallel +// operation"). STABLE keeps it inlinable in the leader; UNSAFE just bars the +// worker path. +#[pg_extern(schema = "ask", stable, parallel_unsafe)] +fn job_status(job_id: Uuid) -> Option { + // Scalar subquery so the outer SELECT always returns exactly one row + // (the value or NULL). A bare `SELECT ... WHERE` returns zero rows when + // nothing matches, which Spi::get_one surfaces as a "positioned before + // the start" error instead of the clean None we want for the + // NotFound == Unauthorized collapse. + match Spi::get_one_with_args::( + "SELECT (SELECT status FROM ask._jobs WHERE id = $1 AND owner = session_user)", + &[job_id.into()], + ) { + Ok(s) => s, + Err(e) => raise_as_pg_error(&crate::infra::errors::AskError::from(e)), + } +} + +/// The answer text of a completed job you own. NULL while the job is still +/// pending/running, if it failed (use `ask.job_error`), or if no such job +/// belongs to you. +// parallel_unsafe: reads via SPI (see job_status). +#[pg_extern(schema = "ask", stable, parallel_unsafe)] +fn job_result(job_id: Uuid) -> Option { + match Spi::get_one_with_args::( + "SELECT (SELECT answer FROM ask._jobs \ + WHERE id = $1 AND owner = session_user AND status = 'done')", + &[job_id.into()], + ) { + Ok(s) => s, + Err(e) => raise_as_pg_error(&crate::infra::errors::AskError::from(e)), + } +} + +/// The error text of a failed job you own, or NULL if it didn't fail / isn't +/// yours. Separate from `job_result` so a caller can tell "no answer yet" +/// from "answer is empty". +// parallel_unsafe: reads via SPI (see job_status). +#[pg_extern(schema = "ask", stable, parallel_unsafe)] +fn job_error(job_id: Uuid) -> Option { + match Spi::get_one_with_args::( + "SELECT (SELECT error FROM ask._jobs \ + WHERE id = $1 AND owner = session_user AND status = 'failed')", + &[job_id.into()], + ) { + Ok(s) => s, + Err(e) => raise_as_pg_error(&crate::infra::errors::AskError::from(e)), + } +} + +/// Cancel a job you own that is still pending or running. Returns true if it +/// was cancelled, false if it was already terminal or not yours. An +/// in-flight worker's result is discarded (the completion helpers refuse to +/// transition a non-running row). +#[pg_extern(schema = "ask", volatile, parallel_unsafe)] +fn cancel_job(job_id: Uuid) -> bool { + match Spi::get_one_with_args::("SELECT ask._job_cancel($1)", &[job_id.into()]) { + Ok(ok) => ok.unwrap_or(false), + Err(e) => raise_as_pg_error(&crate::infra::errors::AskError::from(e)), + } +} + +/// Synchronously drain up to `pg_ask.jobs_batch` pending jobs in the current +/// database, running each agent loop in this transaction, and return how +/// many were processed. Intended for installs without the background worker +/// (e.g. driven by pg_cron) or for tests. Returns 0 immediately when the +/// queue is disabled. +/// +/// Also recovers orphaned `running` jobs (from a crashed worker) first, so a +/// pg_cron-only deployment still gets crash recovery. +/// +/// Operator-only: this claims and runs jobs regardless of who enqueued them +/// (it goes through the operator-only worker-path helpers), so it is REVOKEd +/// from PUBLIC in finalize.sql. Grant it to your pg_cron / maintenance role. +/// +/// ```sql +/// SELECT cron.schedule('pg_ask-drain', '10 seconds', $$SELECT ask.run_pending_jobs()$$); +/// ``` +#[pg_extern(schema = "ask", volatile, parallel_unsafe)] +fn run_pending_jobs() -> i64 { + // Honour the master switch, mirroring the background worker (H1/H2): when + // the queue is disabled this returns 0 immediately and, crucially, does + // NOT run orphan recovery — so a paused queue is truly inert. + if !JOBS_ENABLED.get() { + return 0; + } + let cfg = match RuntimeConfig::load() { + Ok(c) => c, + Err(e) => raise_as_pg_error(&e), + }; + // Recover orphans first so a pg_cron-only deployment self-heals. + if let Err(e) = jobs::recover_orphans() { + raise_as_pg_error(&e); + } + let max = JOBS_BATCH.get().max(1) as u32; + match jobs::drain(&cfg, max) { + Ok(n) => n as i64, + Err(e) => raise_as_pg_error(&e), + } +} + +/// Prune terminal (done/failed/cancelled) jobs older than `older_than` +/// (interval literal, e.g. `'7 days'`), in batches of `batch_size` +/// (default 10000; `0` = single DELETE). Pending/running jobs are never +/// removed. Operator-only (not granted to PUBLIC; see finalize.sql). +#[pg_extern(schema = "ask", volatile, parallel_unsafe)] +fn prune_jobs(older_than: &str, batch_size: default!(i32, 10000)) -> i64 { + match Spi::get_one_with_args::( + "SELECT ask._jobs_prune($1::interval, $2)", + &[older_than.into(), batch_size.into()], + ) { + Ok(n) => n.unwrap_or(0), + Err(e) => raise_as_pg_error(&crate::infra::errors::AskError::from(e)), + } +} diff --git a/src/api/memory.rs b/src/api/memory.rs index 5d7aab1..49c1925 100644 --- a/src/api/memory.rs +++ b/src/api/memory.rs @@ -79,7 +79,10 @@ fn forget(id: Uuid) -> bool { /// Browse memories the caller owns. Optional `namespace` filter; defaults /// to NULL (all namespaces). Newest-first, `limit_n` capped at 200. -#[pg_extern(schema = "ask", stable, parallel_safe)] +/// +/// `parallel_unsafe`: reads `ask._memories` via SPI, forbidden in a parallel +/// worker. +#[pg_extern(schema = "ask", stable, parallel_unsafe)] fn list_memories( namespace: default!(Option<&str>, "NULL"), limit_n: default!(i32, "50"), @@ -115,7 +118,10 @@ fn list_memories( /// Enumerate namespaces the caller has populated, with row counts. /// Ordered by row count desc — a good "what is in here?" probe. -#[pg_extern(schema = "ask", stable, parallel_safe)] +/// +/// `parallel_unsafe`: reads `ask._memories` via SPI, forbidden in a parallel +/// worker. +#[pg_extern(schema = "ask", stable, parallel_unsafe)] fn list_namespaces() -> TableIterator<'static, (name!(namespace, String), name!(n, i64))> { let rows = match memory::namespaces() { Ok(r) => r, diff --git a/src/api/mod.rs b/src/api/mod.rs index 910a032..8dd0e37 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -13,6 +13,7 @@ pub mod ask; pub mod chat; pub mod config; pub mod emit; +pub mod jobs; pub mod memory; pub mod preview; pub mod status; diff --git a/src/api/status.rs b/src/api/status.rs index f835eae..102ddd3 100644 --- a/src/api/status.rs +++ b/src/api/status.rs @@ -14,10 +14,13 @@ use pgrx::prelude::*; /// Self-describing capability + configuration document for this install. /// -/// `STABLE` (reads config + catalog, no writes) and `parallel_safe`. Safe -/// to `GRANT EXECUTE ... TO PUBLIC`: the document reports -/// `provider_configured` as a boolean and never returns the api_key. -#[pg_extern(schema = "ask", stable, parallel_safe)] +/// `STABLE` (reads config + catalog, no writes). `parallel_unsafe` because +/// `status::snapshot()` reads via SPI (`has_schema_privilege`, `to_regclass` +/// probes); a parallel-safe function may be invoked inside a parallel worker +/// where SPI is forbidden ("cannot start commands during a parallel +/// operation"). Still safe to `GRANT EXECUTE ... TO PUBLIC`: the document +/// reports `provider_configured` as a boolean and never returns the api_key. +#[pg_extern(schema = "ask", stable, parallel_unsafe)] fn status() -> pgrx::Json { pgrx::Json(status::snapshot()) } diff --git a/src/api/tools.rs b/src/api/tools.rs index 3cfe877..3b8624f 100644 --- a/src/api/tools.rs +++ b/src/api/tools.rs @@ -63,7 +63,10 @@ fn do_unregister(name: &str) -> Result { } /// List user-defined tools for the current role. -#[pg_extern(schema = "ask", stable, parallel_safe)] +/// +/// `parallel_unsafe`: reads `ask._tools` via SPI (`Spi::connect`), which is +/// forbidden inside a parallel worker. +#[pg_extern(schema = "ask", stable, parallel_unsafe)] fn list_tools() -> TableIterator<'static, (name!(name, String), name!(spec, pgrx::Json))> { let rows = match do_list() { Ok(r) => r, diff --git a/src/bgworker.rs b/src/bgworker.rs index 6c2a0b7..c4e905a 100644 --- a/src/bgworker.rs +++ b/src/bgworker.rs @@ -1,48 +1,423 @@ -//! Background worker prototype (v0.5). +//! Background workers for the async job queue (v0.5.9 / ADR-0018). //! -//! Decouples long-running agent work from the calling backend by running -//! the loop in a separate background worker process. The caller will -//! eventually submit a question to `ask._jobs` and poll -//! `ask._job_results` for the answer. This is a stub — the full loop -//! wiring lands in v0.6. +//! PostgreSQL has no in-backend async: a backend is single-threaded and SPI +//! is not thread-safe. The only correct way to run work "in the background" +//! is a separate process — a `BackgroundWorker`. This module implements the +//! two-tier shape that lets ONE extension serve EVERY database: +//! +//! ```text +//! launcher (1 process, started from shared_preload_libraries) +//! │ connects to the 'postgres' maintenance DB +//! │ periodically lists databases that have pg_ask installed +//! ▼ +//! per-DB worker (1 dynamic process per pg_ask-enabled database) +//! connects to THAT database, drains ask._jobs in a loop: +//! recover_orphans → claim → run agent loop → complete/fail +//! ``` +//! +//! Why two tiers: a single bgworker binds to exactly one database for its +//! whole life (`connect_worker_to_spi(dbname)`), but pg_ask can be installed +//! in many. The launcher discovers them and spawns a dynamic worker per DB, +//! re-reconciling on an interval so a `CREATE EXTENSION pg_ask` in a new +//! database picks up a worker without a restart. +//! +//! Everything is opt-in: workers only register when the library is in +//! `shared_preload_libraries`, and each per-DB worker no-ops unless +//! `pg_ask.jobs_enabled = on` in its database. An install that doesn't use +//! async pays only the launcher's idle reconcile loop. +use crate::infra::config::{RuntimeConfig, JOBS_BATCH, JOBS_ENABLED, JOBS_POLL_INTERVAL_MS}; use pgrx::bgworkers::*; use pgrx::prelude::*; +use std::collections::HashMap; use std::time::Duration; -/// Register the background worker. Called from `_PG_init` when the -/// extension is loaded via `shared_preload_libraries`. +/// Maintenance DB the launcher connects to in order to enumerate databases. +const LAUNCHER_DB: &str = "postgres"; + +/// How often the launcher re-scans for pg_ask-enabled databases (ms). +const LAUNCHER_RECONCILE_MS: u64 = 30_000; + +/// Register the launcher. Called from `_PG_init` when the extension is +/// loaded via `shared_preload_libraries`. If loaded dynamically +/// (`LOAD 'pg_ask'`), registration is skipped — there is no postmaster +/// slot to attach to, and the synchronous `ask.run_pending_jobs()` path +/// remains available. pub fn register() { - // Workers can only be registered while Postgres is processing - // shared_preload_libraries. If the extension is loaded dynamically - // (e.g. `LOAD 'pg_ask'`), we silently skip registration. if unsafe { !pgrx::pg_sys::process_shared_preload_libraries_in_progress } { return; } - BackgroundWorkerBuilder::new("pg_ask worker") - .set_function("pg_ask_worker_main") + BackgroundWorkerBuilder::new("pg_ask launcher") + .set_function("pg_ask_launcher_main") .set_library("pg_ask") + .set_start_time(BgWorkerStartTime::RecoveryFinished) .enable_spi_access() + // Restart after 10s if it ever exits unexpectedly. + .set_restart_time(Some(Duration::from_secs(10))) .load(); } -/// Main entry point for the background worker process. +/// Launcher entry point. Connects to the maintenance DB, then loops: +/// discover pg_ask-enabled databases and ensure each has a running per-DB +/// worker. Dynamic workers self-terminate if their database loses the +/// extension; the launcher simply stops tracking them. #[pg_guard] #[no_mangle] -pub extern "C-unwind" fn pg_ask_worker_main(_arg: pg_sys::Datum) { +pub extern "C-unwind" fn pg_ask_launcher_main(_arg: pg_sys::Datum) { BackgroundWorker::attach_signal_handlers(SignalWakeFlags::SIGHUP | SignalWakeFlags::SIGTERM); - BackgroundWorker::connect_worker_to_spi(Some("postgres"), None); + BackgroundWorker::connect_worker_to_spi(Some(LAUNCHER_DB), None); + log!("pg_ask launcher started"); - log!("pg_ask worker started"); + // Per-database worker handles, kept so we can check liveness each + // reconcile and RESPAWN a worker that has died. A dynamic worker is + // created with set_restart_time(None) (the postmaster won't restart it), + // so respawning is the launcher's job: if `handle.pid()` no longer + // reports `Started`, the worker exited (crash, OOM, or extension drop) + // and we drop the handle so the spawn loop below recreates it. + // + // Restart safety (B3): when the postmaster restarts THIS launcher, the + // old launcher's dynamic workers keep running (Postgres does not + // parent-kill them) but this fresh process starts with an empty map. To + // avoid spawning a duplicate worker per database on every launcher + // restart, the spawn loop also checks `pg_stat_activity` for an existing + // 'pg_ask worker: {db}' backend and skips databases that already have a + // live worker. On a clean shutdown we additionally terminate our own + // workers so they don't outlive the launcher. + let mut workers: HashMap = HashMap::new(); - while BackgroundWorker::wait_latch(Some(Duration::from_secs(5))) { + while BackgroundWorker::wait_latch(Some(Duration::from_millis(LAUNCHER_RECONCILE_MS))) { if BackgroundWorker::sighup_received() { - // In v0.6: reload GUCs / config here. + // Nothing cached from GUCs here; reconcile picks up changes. + } + + // Drop handles for workers that are no longer running so they get + // respawned below. `pid()` returns Ok(Started) only while alive. + workers.retain(|db, handle| { + let alive = handle.pid().is_ok(); + if !alive { + log!("pg_ask launcher: worker for '{db}' is gone; will respawn"); + } + alive + }); + + let dbs = match list_pgask_databases() { + Ok(dbs) => dbs, + Err(e) => { + log!("pg_ask launcher: database scan failed: {e}"); + continue; + } + }; + + for db in dbs { + if workers.contains_key(&db) { + continue; // a live worker (ours) already owns this database + } + // B3: a worker from a previous launcher lifetime may still be + // running this database. Don't spawn a duplicate. + match db_worker_running(&db) { + Ok(true) => continue, + Ok(false) => {} + Err(e) => { + log!("pg_ask launcher: worker-presence check for '{db}' failed: {e}"); + continue; // be conservative: skip this round, retry next + } + } + match spawn_db_worker(&db) { + Ok(handle) => { + log!("pg_ask launcher: spawned worker for database '{db}'"); + workers.insert(db, handle); + } + Err(()) => { + log!("pg_ask launcher: failed to spawn worker for '{db}' (will retry)"); + } + } } - // v0.5 stub: heartbeat only. v0.6 will poll ask._jobs, - // run agent::run() against pending rows, and write results back. - log!("pg_ask worker heartbeat"); } - log!("pg_ask worker shutting down"); + // Clean shutdown (SIGTERM): terminate our dynamic workers so they don't + // outlive the launcher and get duplicated by the next launcher instance. + for (db, handle) in workers.drain() { + log!("pg_ask launcher: terminating worker for '{db}'"); + let _ = handle.terminate(); + } + log!("pg_ask launcher shutting down"); +} + +/// Is there already a live 'pg_ask worker: {db}' backend? Used to avoid +/// spawning a duplicate when this launcher restarted while the previous +/// launcher's dynamic workers are still running (B3). Matches the bgw name +/// we set in `spawn_db_worker`. +fn db_worker_running(db: &str) -> Result { + BackgroundWorker::transaction(|| { + let present: Option = Spi::get_one_with_args( + "SELECT EXISTS (SELECT 1 FROM pg_stat_activity \ + WHERE backend_type = $1)", + &[format!("pg_ask worker: {db}").into()], + ) + .map_err(|e| e.to_string())?; + Ok::<_, String>(present.unwrap_or(false)) + }) +} + +/// List databases that allow connections AND have the pg_ask extension +/// installed. Run in the launcher's maintenance-DB connection. +fn list_pgask_databases() -> Result, String> { + BackgroundWorker::transaction(|| { + // The launcher runs in the 'postgres' maintenance DB, where the + // pg_ask extension (and the `ask` schema) is NOT installed — so we + // CANNOT call any `ask.*` helper here. We discover pg_ask-enabled + // databases inline: + // + // * pg_extension is per-database, so we use dblink (standard + // contrib) to probe each connectable database's catalog and keep + // only the ones with pg_ask. This stops the launcher from + // endlessly respawning a short-lived worker in a database that + // will never have the extension (e.g. 'postgres' itself). + // * Each probe runs in its own subtransaction so an unreachable + // database (permissions, conn cap) is skipped, not fatal. + // * If dblink is not installed in this maintenance DB we cannot + // probe; fall back to every connectable database and rely on the + // per-DB worker's own extension_present() check (noisier: + // respawn churn for non-pg_ask DBs, but correct). + // + // dblink must be installed in the launcher's database (CREATE + // EXTENSION dblink in 'postgres'); the Docker image's initdb hook + // does this automatically. + Spi::connect(|client| { + let have_dblink = client + .select( + "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'dblink')", + Some(1), + &[], + )? + .first() + .get::(1)? + == Some(true); + + let rows = client.select( + "SELECT datname::text FROM pg_database \ + WHERE datallowconn AND NOT datistemplate ORDER BY datname", + None, + &[], + )?; + let mut candidates = Vec::new(); + for row in rows { + if let Some(name) = row.get::(1)? { + candidates.push(name); + } + } + + // No dblink: return all candidates (worker self-check is backstop). + if !have_dblink { + return Ok::<_, pgrx::spi::SpiError>(candidates); + } + + // dblink available: keep only databases that truly have pg_ask. + let mut installed = Vec::new(); + for db in candidates { + // Each probe in its own subtransaction so a connect failure + // doesn't abort the whole scan. + let probed = + crate::infra::subtxn::run_in_subtransaction(Some("pgask_probe"), || { + // quote_LITERAL, not quote_ident: this is a libpq + // conninfo value, not a SQL identifier. quote_ident + // produces dbname="MyDb" which libpq reads as a DB + // literally named '"MyDb"' (quotes included) and fails + // — silently skipping every DB whose name has uppercase + // or special chars. quote_literal yields dbname='MyDb', + // the correct conninfo form. + let ok: Option = Spi::get_one_with_args( + "SELECT ok FROM dblink('dbname=' || quote_literal($1), \ + 'SELECT EXISTS (SELECT 1 FROM pg_extension \ + WHERE extname = ''pg_ask'')') AS t(ok bool)", + &[db.clone().into()], + )?; + Ok(ok.unwrap_or(false)) + }) + .unwrap_or(false); + if probed { + installed.push(db); + } + } + Ok::<_, pgrx::spi::SpiError>(installed) + }) + }) + .map_err(|e| e.to_string()) +} + +/// Spawn a dynamic per-database worker bound to `db`. The database name is +/// passed via `set_extra` so the worker knows where to connect. Returns the +/// handle so the launcher can check liveness and respawn on the next +/// reconcile if the worker dies. +fn spawn_db_worker(db: &str) -> Result { + BackgroundWorkerBuilder::new(&format!("pg_ask worker: {db}")) + .set_function("pg_ask_db_worker_main") + .set_library("pg_ask") + .set_extra(db) + .enable_spi_access() + // No auto-restart: the launcher owns respawn (it checks pid() each + // reconcile), which lets us also stop respawning a DB that has + // dropped the extension. + .set_restart_time(None) + .load_dynamic() + .map_err(|_| ()) +} + +/// Per-database worker entry point. Connects to the database named in +/// `get_extra()`, verifies pg_ask is installed, then drains the job queue +/// in a loop until the extension goes away or a SIGTERM arrives. +#[pg_guard] +#[no_mangle] +pub extern "C-unwind" fn pg_ask_db_worker_main(_arg: pg_sys::Datum) { + BackgroundWorker::attach_signal_handlers(SignalWakeFlags::SIGHUP | SignalWakeFlags::SIGTERM); + let db = BackgroundWorker::get_extra().to_string(); + if db.is_empty() { + log!("pg_ask worker: no database in extra; exiting"); + return; + } + BackgroundWorker::connect_worker_to_spi(Some(&db), None); + + // The launcher spawns a worker for every connectable database (it can't + // see other DBs' catalogs to pre-filter). Databases without pg_ask — e.g. + // the 'postgres' maintenance DB — exit immediately and quietly here, so a + // worker only runs where the extension actually lives. + match extension_present() { + Ok(true) => {} + Ok(false) => return, + Err(e) => { + log!("pg_ask worker for '{db}': extension check failed: {e}; exiting"); + return; + } + } + log!("pg_ask worker for '{db}' started"); + + // NB: a background worker CANNOT `LISTEN` — Postgres rejects it with + // "cannot execute LISTEN within a background process" (async.c gates + // LISTEN to regular client backends). So the worker is poll-driven: it + // wakes every `jobs_poll_interval_ms` and drains the queue. The enqueue + // path still fires pg_notify('pg_ask_jobs', id) — harmless here, and used + // by any external LISTENer — but the worker's own latency floor is the + // poll interval (default 5s). Keep that interval modest for snappier + // async; it is a single indexed query per wake when the queue is empty. + loop { + let poll_ms = poll_interval_ms(); + let latched = BackgroundWorker::wait_latch(Some(Duration::from_millis(poll_ms))); + if !latched { + // wait_latch returns false only on SIGTERM (shutdown requested). + break; + } + if BackgroundWorker::sighup_received() { + // GUCs are re-read each pass from the snapshot, nothing to cache. + } + + // If the extension is later dropped from this DB, the worker exits. + match extension_present() { + Ok(true) => {} + Ok(false) => { + log!("pg_ask worker for '{db}': extension dropped; exiting"); + break; + } + Err(e) => { + log!("pg_ask worker for '{db}': extension check failed: {e}"); + continue; + } + } + + if let Err(e) = drain_once() { + log!("pg_ask worker for '{db}': drain error: {e}"); + } + } + + log!("pg_ask worker for '{db}' shutting down"); +} + +/// Read the poll interval GUC (per pass, so a SIGHUP change is picked up). +/// Falls back to the default if the value is somehow out of range. +fn poll_interval_ms() -> u64 { + let v = JOBS_POLL_INTERVAL_MS.get(); + if v > 0 { + v as u64 + } else { + 5_000 + } +} + +/// Is the pg_ask extension installed in the worker's current database? +/// +/// Uses `SELECT EXISTS(...)` so the query ALWAYS returns exactly one row +/// (true/false). A bare `SELECT true FROM pg_extension WHERE ...` returns +/// zero rows when the extension is absent, which `Spi::get_one` surfaces as +/// "SpiTupleTable positioned before the start or after the end" rather than +/// a clean `None`. +fn extension_present() -> Result { + BackgroundWorker::transaction(|| { + let present: Option = + Spi::get_one("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_ask')") + .map_err(|e| e.to_string())?; + Ok::<_, String>(present.unwrap_or(false)) + }) +} + +/// One drain pass. The three phases each get their OWN transaction so the +/// durability guarantees are real (see the `src/jobs` module docs): +/// +/// 1. one txn: check the switch, recover orphans, snapshot config +/// 2. per job, in SEPARATE transactions: +/// a. claim — commits `running` (durable + visible) before slow work +/// b. execute — agent loop (no _jobs lock held), then complete/fail +/// +/// Splitting claim (2a) from execute (2b) is what makes orphan recovery +/// meaningful: a crash during the agent loop leaves a committed `running` +/// row that `_job_recover_orphans` can return to `pending`. A combined +/// claim+execute txn would roll the claim back on crash, so `running` would +/// never be visible and recovery would be dead code. +/// +/// SIGTERM responsiveness: the flag is checked between every job (before each +/// claim), so a shutdown takes effect within at most ONE job's runtime. A +/// shutdown that arrives mid-agent-loop still waits for that single job's +/// LLM call to return or hit `pg_ask.http_total_timeout_ms` — a background +/// worker's SIGTERM sets `ShutdownRequestPending`, which does NOT trip the +/// `check_for_interrupts!()` path the agent loop uses, so we cannot abort the +/// in-flight HTTP call cleanly. Operators who need a hard upper bound on +/// shutdown latency should keep `http_total_timeout_ms` modest. No-op +/// (cheap) when jobs are disabled. +fn drain_once() -> Result<(), String> { + // Step 1: cheap preamble in its own transaction. + let cfg = BackgroundWorker::transaction(|| { + if !JOBS_ENABLED.get() { + return Ok::<_, String>(None); + } + let _recovered = crate::jobs::recover_orphans().map_err(|e| e.to_string())?; + let cfg = RuntimeConfig::load().map_err(|e| e.to_string())?; + Ok(Some(cfg)) + })?; + + let Some(cfg) = cfg else { + return Ok(()); // jobs disabled + }; + + // Step 2: each job in its own claim txn + execute txn. + let max = JOBS_BATCH.get().max(1); + for _ in 0..max { + // Stop promptly on shutdown, leaving the rest pending. + if BackgroundWorker::sigterm_received() { + break; + } + + // 2a. Claim in its own transaction so `running` is COMMITTED before + // the agent loop runs. Returns None when the queue is empty. + let claimed = + BackgroundWorker::transaction(|| crate::jobs::claim_one().map_err(|e| e.to_string()))?; + let Some(job) = claimed else { + break; // queue drained + }; + + // 2b. Execute + complete/fail in a separate transaction. If the + // worker crashes here, the committed `running` row from 2a is + // reclaimed by orphan recovery on the next pass / restart. + BackgroundWorker::transaction(|| { + crate::jobs::execute_claimed(&cfg, &job).map_err(|e| e.to_string()) + })?; + } + Ok(()) } diff --git a/src/infra/config.rs b/src/infra/config.rs index db69c00..4d47bea 100644 --- a/src/infra/config.rs +++ b/src/infra/config.rs @@ -36,10 +36,63 @@ pub static TRACE_ENABLED: GucSetting = GucSetting::::new(true); /// Master switch for the event outbox (ADR-0017). When `false`, /// `ask.emit()` is a no-op returning NULL, so an install that doesn't use /// reverse notifications pays nothing and exposes no channel. Default -/// `false` (opt-in): emitting events is only useful when a listener (senti) -/// is actually consuming them. +/// `false` (opt-in): emitting events is only useful when a downstream +/// `LISTEN pg_ask_events` consumer is actually draining them. pub static EVENTS_ENABLED: GucSetting = GucSetting::::new(false); +/// Max serialized-JSON bytes accepted for an `ask.emit()` payload. Guards +/// the outbox against a runaway trigger writing multi-megabyte rows. `<= 0` +/// disables the check (operator opt-out). Default 64 KiB — comfortably +/// above any sane event body, well below a memory/IO concern. +pub static EVENTS_MAX_PAYLOAD_BYTES: GucSetting = GucSetting::::new(65_536); + +/// Per-(emitter, event) rate limit for `ask.emit()`, in emits per rolling +/// minute. Beyond this the emit is a silent no-op (the row is NOT written), +/// so a trigger flooding on every INSERT can't blow up the outbox. `<= 0` +/// disables the limit (default). Enforced atomically in `ask._outbox_emit`. +pub static EVENTS_MAX_PER_MINUTE: GucSetting = GucSetting::::new(0); + +/// Dedup window for `ask.emit()`, in milliseconds. Within the window, a +/// second emit with the same (emitter, event, payload) is a silent no-op, +/// collapsing duplicate alerts from a chatty trigger. `<= 0` disables +/// dedup (default). Enforced atomically in `ask._outbox_emit`. +pub static EVENTS_DEDUP_WINDOW_MS: GucSetting = GucSetting::::new(0); + +// ---------- Async job queue (v0.5.9 / ADR-0018) ---------- + +/// Master switch for the async job queue. When `false`, `ask.ask_async()` +/// is a no-op returning NULL (mirroring `events_enabled`), so an install +/// that doesn't use async pays nothing. Default `false` (opt-in). +pub static JOBS_ENABLED: GucSetting = GucSetting::::new(false); + +/// How many times a failed job is retried before it is marked `failed`. A +/// transient failure (provider 5xx, timeout) returns the job to `pending`; +/// once `attempts` reaches this, it is terminal. Minimum 1 (one attempt, no +/// retry). Default 3. +pub static JOBS_MAX_ATTEMPTS: GucSetting = GucSetting::::new(3); + +/// A `running` job whose `started_at` is older than this is considered +/// orphaned (its worker died) and returned to `pending` by +/// `ask._job_recover_orphans`. Must comfortably exceed the worst-case +/// agent-loop runtime so a slow-but-alive job isn't reclaimed under it: with +/// the defaults a single job can run up to roughly +/// `max_iterations * http_total_timeout_ms` (24 * 120s ≈ 48 min), so the +/// default here is 1 hour. The `worker_pid` guard in `_job_complete` / +/// `_job_fail` prevents a falsely-reclaimed job from being clobbered even if +/// this is set too low, but a too-low value still wastes an attempt on a +/// live job — keep it above your real worst case. Plain integer ms. +pub static JOBS_ORPHAN_TIMEOUT_MS: GucSetting = GucSetting::::new(3_600_000); + +/// Maximum jobs a single `ask.run_pending_jobs()` / worker drain pass +/// processes before returning, so one pass can't monopolise a worker. The +/// worker loops again immediately if more remain. Default 10. +pub static JOBS_BATCH: GucSetting = GucSetting::::new(10); + +/// Background-worker poll interval in ms: how often each per-database worker +/// wakes to drain the queue and recover orphans even without a NOTIFY (a +/// safety net against a missed wake-up). Default 5000. Plain integer ms. +pub static JOBS_POLL_INTERVAL_MS: GucSetting = GucSetting::::new(5_000); + /// Soft cap on the schema dump injected into the system prompt, measured /// in characters (a rough proxy for tokens at ~4 chars/token). /// @@ -316,6 +369,14 @@ const KNOWN_KEYS: &[&str] = &[ "tool_max_rows", "trace_enabled", "events_enabled", + "events_max_payload_bytes", + "events_max_per_minute", + "events_dedup_window_ms", + "jobs_enabled", + "jobs_max_attempts", + "jobs_orphan_timeout_ms", + "jobs_batch", + "jobs_poll_interval_ms", "schema_char_budget", "embedding_provider", "embedding_api_key", diff --git a/src/infra/events.rs b/src/infra/events.rs index 8122dc7..e335bd9 100644 --- a/src/infra/events.rs +++ b/src/infra/events.rs @@ -1,31 +1,72 @@ -//! Event outbox emission (ADR-0017: pg_ask -> senti reverse notifications). +//! Event outbox emission (ADR-0017: in-database reverse notifications). //! //! `ask.emit(event, payload, summary)` appends a durable row to //! `ask._outbox` and fires `pg_notify('pg_ask_events', )`. An external -//! orchestrator (senti) LISTENs on that channel, reads the row, and routes -//! it to the owning agent. The durable table is the source of truth; the -//! NOTIFY is only a low-latency wake-up, so nothing is lost if the listener -//! is offline (it drains the backlog on reconnect). +//! orchestrator (any process holding a `LISTEN pg_ask_events` connection) +//! reads the row and routes it onward. The durable table is the source of +//! truth; the NOTIFY is only a low-latency wake-up, so nothing is lost if +//! the listener is offline (it drains the backlog on reconnect). //! -//! pg_ask itself knows nothing about senti — it only deposits an event in -//! its own database. Who listens (and therefore which tenant/agent owns the -//! event) is decided entirely on the consumer side; see the ADR for why -//! that makes multi-tenant isolation automatic. +//! pg_ask itself knows nothing about who consumes these events — it only +//! deposits a row in its own database. Who listens (and therefore which +//! tenant/agent owns the event) is decided entirely on the consumer side; +//! see ADR-0017 for why that makes multi-tenant isolation automatic. +//! +//! ## Single authority: `ask._outbox_emit` +//! +//! The SECURITY DEFINER `ask._outbox_emit` SQL function is the *one* place +//! that decides whether and how an event is written. It re-checks +//! `events_enabled`, validates the input (event-name charset/length, summary +//! length, payload byte ceiling), enforces the rate-limit / dedup guards, +//! writes the durable row, AND fires the NOTIFY — all atomically. That +//! helper is GRANTed to PUBLIC, so a caller could invoke it directly; +//! keeping every rule in SQL means both entry points (`ask.emit` and a +//! direct `ask._outbox_emit` call) enforce exactly the same contract. +//! +//! This Rust layer is a thin adapter. It deliberately does **no** size or +//! charset validation of its own: duplicating those checks here once caused +//! silent drift (Rust counts bytes via `str::len`; SQL `length()` counts +//! characters; `serde_json` emits compact JSON while `jsonb::text` inserts +//! spaces — so the two layers disagreed on multi-byte summaries and on +//! payloads near the byte ceiling). The only thing Rust does is a cheap +//! `events_enabled` short-circuit to avoid an SPI round-trip on the common +//! disabled path; correctness lives entirely in SQL. +//! +//! ## Why suppression never raises +//! +//! `emit` is meant to be called from triggers and scheduled jobs, often on +//! the hot path of an application write. When an emit is suppressed by the +//! rate-limit or dedup window (or because events are disabled) the SQL +//! authority returns NULL and we surface it as `Ok(None)` — a silent no-op. +//! Raising there would roll back the surrounding INSERT/UPDATE that fired +//! the trigger, a cure far worse than a dropped duplicate alert. Only caller +//! bugs (invalid event name, oversized payload/summary) raise, and that +//! `RAISE` originates in `ask._outbox_emit` with a precise SQLSTATE. use crate::infra::config::EVENTS_ENABLED; -use crate::infra::errors::{AskError, Result}; +use crate::infra::errors::Result; use pgrx::prelude::*; use pgrx::{JsonB, Uuid}; /// NOTIFY channel name. Fixed (not configurable) so listeners have a stable /// contract; the per-event detail lives in the outbox row, not the channel. +/// The actual NOTIFY is fired inside `ask._outbox_emit` (see the SQL writer); +/// this constant documents the channel-name contract and is asserted by +/// `event_channel_constant_is_stable` so a rename is caught at test time. +#[cfg_attr(not(any(test, feature = "pg_test")), allow(dead_code))] pub const EVENT_CHANNEL: &str = "pg_ask_events"; /// Append an event to `ask._outbox` and notify listeners. /// -/// Returns `Some(id)` of the new row, or `None` when the events layer is -/// disabled (`pg_ask.events_enabled = off`, the default) — a no-op so an -/// install that doesn't use reverse notifications pays nothing. +/// Returns `Some(id)` of the new row, or `None` when the emit was a no-op +/// (events disabled, or suppressed by the rate-limit / dedup window). All of +/// that — plus input validation and the NOTIFY — is decided by the SQL +/// authority `ask._outbox_emit`; this function only short-circuits the +/// disabled case and forwards the call. +/// +/// Hard errors (`Err`, surfaced from a SQL `RAISE`) are reserved for caller +/// bugs: an invalid event name, or a payload/summary that exceeds the +/// configured size ceilings. /// /// `payload` defaults to `{}` when `None`. `summary` is an optional /// human-readable line (e.g. an `ask.ask()` result) kept separate from the @@ -35,34 +76,41 @@ pub fn emit( payload: Option, summary: Option<&str>, ) -> Result> { + // Cheap short-circuit: skip the SPI round-trip on the common disabled + // path. The SQL authority re-checks this flag, so skipping here is a pure + // optimization, not the gate. if !EVENTS_ENABLED.get() { return Ok(None); } - if event.trim().is_empty() { - return Err(AskError::InvalidConfig { - key: "emit", - message: "event name must not be empty".to_string(), - }); - } let payload_json = JsonB(payload.unwrap_or_else(|| serde_json::json!({}))); - // Phase 1: durable append via the SECURITY DEFINER writer, capturing the - // id. Routed through the helper (not a direct INSERT) for the same - // reason as _sql_audit: PUBLIC has no INSERT on ask._outbox. + // The SECURITY DEFINER writer is the single authority: it validates, + // re-checks the enabled flag, enforces the rate-limit / dedup guards, + // INSERTs the durable row, and fires pg_notify('pg_ask_events', id) — + // all atomically. It returns the new id, or NULL when the emit was a + // no-op (disabled / suppressed). We surface NULL as Ok(None): a silent + // no-op, never an error, so a trigger's transaction is never aborted. let id: Option = Spi::get_one_with_args( "SELECT ask._outbox_emit($1, $2, $3)", &[event.into(), payload_json.into(), summary.into()], )?; - let id = id.ok_or_else(|| AskError::Sql("ask._outbox_emit returned no id".to_string()))?; - // Phase 2: low-latency wake-up. pg_notify's payload is capped at 8 KB, - // so we send ONLY the id — the listener reads the full row from the - // outbox. This is the correct pattern regardless of payload size. - Spi::run_with_args( - "SELECT pg_notify($1, $2)", - &[EVENT_CHANNEL.into(), id.to_string().into()], - )?; + Ok(id) +} - Ok(Some(id)) +/// Delete already-delivered outbox rows older than `older_than` (a Postgres +/// interval literal such as `'7 days'`), in batches of `batch_size`. +/// Pending (undelivered) rows are never removed. Returns the number of rows +/// pruned. +/// +/// Thin wrapper over the SECURITY DEFINER `ask._outbox_prune`. The interval +/// is passed as text and cast in SQL so we don't have to bind pgrx's +/// `Interval` type here; an invalid literal surfaces as a normal SQL error. +pub fn prune(older_than: &str, batch_size: i32) -> Result { + let n: Option = Spi::get_one_with_args( + "SELECT ask._outbox_prune($1::interval, $2)", + &[older_than.into(), batch_size.into()], + )?; + Ok(n.unwrap_or(0)) } diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs new file mode 100644 index 0000000..09e9625 --- /dev/null +++ b/src/jobs/mod.rs @@ -0,0 +1,290 @@ +//! Async job queue domain layer (v0.5.9 / ADR-0018). +//! +//! This is the single home of the "claim → run → complete" business logic +//! for `ask._jobs`. Both drivers reuse it unchanged: +//! +//! * `ask.run_pending_jobs()` (a `#[pg_extern]`) — synchronous manual / +//! pg_cron-style drain, runs inside the caller's transaction. +//! * the background worker (`crate::bgworker`) — wraps each job in its own +//! `BackgroundWorker::transaction(...)` so a long LLM round-trip never +//! holds one transaction open across the whole batch. +//! +//! Keeping the logic here (not in either driver) is the clean-architecture +//! point: the SQL state machine (`ask._job_*` helpers) is the data layer, +//! this module is the use-case layer, and the two drivers are thin +//! delivery mechanisms. Neither driver duplicates a claim/complete rule. +//! +//! ## Transaction shape +//! +//! Each job goes through three SECURITY DEFINER calls: +//! 1. `_job_claim()` — atomically pending → running (FOR UPDATE SKIP LOCKED) +//! 2. agent loop — the LLM round-trip + tool SPI (no _jobs lock held) +//! 3. `_job_complete()` / `_job_fail()` — running → done / failed / pending +//! +//! These three are deliberately split across SEPARATE transactions by the +//! background-worker driver, which is what makes the durability story real: +//! +//! * **claim** commits on its own, so the `running` transition (with +//! `started_at`) is durable and visible to other backends *before* the +//! slow agent loop begins; +//! * the **agent loop** runs holding no `_jobs` lock; +//! * **complete/fail** commits on its own. +//! +//! If the worker crashes during the agent loop, the job stays `running` with +//! a stale `started_at`, and `_job_recover_orphans` (which only ever sees a +//! *committed* `running` row) returns it to `pending`. This is why claim must +//! commit separately — in a single combined transaction a crash would roll +//! the claim back too, the `running` state would never be visible, and +//! orphan recovery would be unreachable dead code. +//! +//! The synchronous `ask.run_pending_jobs()` driver cannot commit between +//! steps (a plain SQL function runs inside the caller's transaction and may +//! not `COMMIT`), so it runs the whole bounded batch in one transaction. The +//! trade-off there (a re-queued retry must not be re-claimed in the same +//! pass) is handled by [`drain`] tracking the ids it has already attempted. +//! For per-job durability, prefer the background worker; `run_pending_jobs` +//! is the pg_cron / no-preload fallback. +//! +//! This module stays transaction-agnostic: it only issues SPI calls. The +//! background-worker driver wraps [`claim_one`] and [`execute_claimed`] in +//! their own `BackgroundWorker::transaction(...)` blocks; the SQL function +//! driver calls [`drain`] inside the one transaction it already has. + +use crate::agent::{self, AgentMode}; +use crate::infra::config::{RuntimeConfig, JOBS_MAX_ATTEMPTS, JOBS_ORPHAN_TIMEOUT_MS}; +use crate::infra::errors::Result; +use crate::telemetry::{self, TraceKind, TraceRecord}; +use pgrx::prelude::*; +use pgrx::Uuid; + +/// A job claimed off the queue, ready to run. +pub struct ClaimedJob { + pub id: Uuid, + pub kind: AgentMode, + pub question: String, + /// The role that enqueued the job. The worker runs the agent loop under + /// this role (`SET LOCAL ROLE`) so async work has exactly the same + /// privileges the caller would have had synchronously — not the worker's + /// superuser rights. + pub owner: String, +} + +/// Atomically claim the oldest pending job for the current database, if any. +/// +/// Thin wrapper over `ask._job_claim()` (which does the FOR UPDATE SKIP +/// LOCKED + pending → running transition). Returns `None` when the queue is +/// empty. Maps the textual `kind` column to [`AgentMode`]; an unknown value +/// defaults to `Execute` (the `ask` mode) rather than failing the drain. +pub fn claim_one() -> Result> { + Spi::connect(|client| { + // _job_claim() also returns `attempts`; we deliberately select only + // the three columns the use-case layer needs. The retry policy lives + // entirely in _job_fail (which reads attempts in SQL), so the Rust + // layer never needs the count. Selecting a subset is forward- + // compatible: adding columns to _job_claim won't break this. + // _job_claim() returns (id, kind, question, attempts, owner). We read + // id/kind/question/owner; attempts stays in the SQL retry policy. + let tup = client + .select( + "SELECT id, kind, question, owner::text FROM ask._job_claim()", + Some(1), + &[], + )? + .first(); + if tup.is_empty() { + return Ok(None); + } + let id: Option = tup.get(1)?; + let kind_txt: Option = tup.get(2)?; + let question: Option = tup.get(3)?; + let owner: Option = tup.get(4)?; + let id = id.ok_or_else(|| crate::infra::errors::AskError::Sql("claim: null id".into()))?; + let question = question + .ok_or_else(|| crate::infra::errors::AskError::Sql("claim: null question".into()))?; + let owner = + owner.ok_or_else(|| crate::infra::errors::AskError::Sql("claim: null owner".into()))?; + let kind = match kind_txt.as_deref() { + Some("sql") => AgentMode::GenerateOnly, + _ => AgentMode::Execute, + }; + Ok(Some(ClaimedJob { + id, + kind, + question, + owner, + })) + }) +} + +/// Run a claimed job to completion: execute the agent loop, then record the +/// outcome via `_job_complete` (success) or `_job_fail` (which retries or +/// marks terminal per `jobs_max_attempts`). +/// +/// Never returns `Err` for an agent-level failure — a failed agent run is a +/// normal job outcome routed to `_job_fail`, not a driver error. `Err` is +/// reserved for an SPI failure while recording the outcome, which the driver +/// surfaces. A trace row is written for the run just like a synchronous +/// `ask.ask`, so async jobs are as observable as sync calls. +pub fn execute_claimed(cfg: &RuntimeConfig, job: &ClaimedJob) -> Result<()> { + let kind = match job.kind { + AgentMode::Execute => TraceKind::Ask, + AgentMode::GenerateOnly => TraceKind::Sql, + }; + + // Privilege isolation: run the agent loop under the role that ENQUEUED + // the job, not the worker's superuser identity. `SET LOCAL ROLE` switches + // current_user for the rest of this transaction, so every tool SPI query + // (sql_query, sample_table, …) is checked against the owner's privileges + // exactly as a synchronous ask.ask() by that role would be. We restore + // the worker role before recording the outcome so the SECURITY DEFINER + // completion helpers run as the worker. RESET runs on every path, + // including the agent-error path, via the explicit reset below. + set_local_role(&job.owner)?; + + // Run the agent loop with a trace row, mirroring the synchronous path. + let mut rec = TraceRecord::start(kind, cfg, &job.question); + let run = agent::run_with_cfg(cfg, &job.question, Vec::new(), job.kind); + + // Back to the worker role for the trusted state-machine writes. + reset_role(); + + match run { + Ok(outcome) => { + rec.iterations = outcome.iterations; + rec.tool_calls = outcome.tool_calls.clone(); + rec.final_text = Some(outcome.text.clone()); + if outcome.prompt_tokens > 0 || outcome.completion_tokens > 0 { + rec.prompt_tokens = Some(outcome.prompt_tokens); + rec.completion_tokens = Some(outcome.completion_tokens); + } + telemetry::write(&rec); + complete( + job.id, + &outcome.text, + outcome.prompt_tokens, + outcome.completion_tokens, + )?; + Ok(()) + } + Err(e) => { + rec.error = Some(e.to_string()); + telemetry::write(&rec); + // Route to the retry/terminal state machine. The returned status + // ("pending" = will retry, "failed" = terminal) is informational. + let _status = fail(job.id, &e.to_string())?; + Ok(()) + } + } +} + +/// Switch the current transaction's role to `owner` for the duration of the +/// agent loop (privilege isolation). The role name is escaped with +/// `quote_ident` in SQL so it can't break out of the `SET ROLE` statement. +/// `SET LOCAL` scopes the change to the current transaction; it is undone by +/// [`reset_role`] or automatically on transaction end. +fn set_local_role(owner: &str) -> Result<()> { + // Use set_config('role', , is_local=true) rather than building a + // `SET ROLE ` string: the role name is passed as a bound PARAMETER + // (a value, not spliced SQL text), so there is no identifier-injection + // surface at all — set_config takes the raw role name and validates it + // against pg_authid itself. true = SET LOCAL (transaction-scoped). + Spi::run_with_args( + "SELECT pg_catalog.set_config('role', $1, true)", + &[owner.into()], + )?; + Ok(()) +} + +/// Restore the worker's own role after the agent loop. Best-effort: a +/// failure here is non-fatal because the surrounding transaction (worker) or +/// statement (sync drain) will reset `role` on its own boundary anyway. +fn reset_role() { + let _ = Spi::run("RESET role"); +} + +/// Mark a running job done. Wrapper over `ask._job_complete`. +fn complete(id: Uuid, answer: &str, prompt_tokens: i64, completion_tokens: i64) -> Result { + let ok: Option = Spi::get_one_with_args( + "SELECT ask._job_complete($1, $2, $3, $4)", + &[ + id.into(), + answer.into(), + prompt_tokens.into(), + completion_tokens.into(), + ], + )?; + Ok(ok.unwrap_or(false)) +} + +/// Mark a running job failed (retry or terminal). Wrapper over +/// `ask._job_fail`; passes `jobs_max_attempts` so the retry policy lives in +/// one place. Returns the resulting status text, or `None` if the row was no +/// longer running (e.g. cancelled mid-flight). +fn fail(id: Uuid, error: &str) -> Result> { + let max_attempts = JOBS_MAX_ATTEMPTS.get(); + let status: Option = Spi::get_one_with_args( + "SELECT ask._job_fail($1, $2, $3)", + &[id.into(), error.into(), max_attempts.into()], + )?; + Ok(status) +} + +/// Return a running job we just re-claimed back to `pending` WITHOUT +/// consuming an attempt. Wrapper over `ask._job_release`. Used by the +/// synchronous drain's poison-pill guard so a re-queued retry doesn't sit +/// in `running` until orphan recovery. +fn release(id: Uuid) -> Result { + let ok: Option = Spi::get_one_with_args("SELECT ask._job_release($1)", &[id.into()])?; + Ok(ok.unwrap_or(false)) +} + +/// Return orphaned `running` jobs (worker died) to `pending`. Wrapper over +/// `ask._job_recover_orphans`, passing the configured orphan timeout. +/// Returns how many were recovered. +pub fn recover_orphans() -> Result { + let timeout_ms = JOBS_ORPHAN_TIMEOUT_MS.get(); + let n: Option = + Spi::get_one_with_args("SELECT ask._job_recover_orphans($1)", &[timeout_ms.into()])?; + Ok(n.unwrap_or(0)) +} + +/// Drain up to `max` pending jobs, each claimed-then-executed in sequence. +/// Returns the number of jobs processed (claimed and run, regardless of +/// success/failure outcome). Stops early when the queue is empty. +/// +/// This is the synchronous batch used by `ask.run_pending_jobs()`. The whole +/// batch runs in the caller's transaction; `max` (from `jobs_batch`) bounds +/// how long that transaction stays open. +/// +/// Poison-pill guard: because this runs in ONE transaction, a job that +/// `_job_fail` returns to `pending` (a transient failure with retries left) +/// is immediately visible to the next `claim_one` in this same loop — a +/// permanently-failing job could otherwise re-claim itself and burn the +/// whole batch budget, starving other pending jobs. We track the ids we've +/// already attempted this pass and skip a re-queued one, so each distinct +/// job is attempted at most once per drain; its retry happens on the next +/// drain (next worker poll / cron tick), giving fairness to other jobs. +pub fn drain(cfg: &RuntimeConfig, max: u32) -> Result { + let mut processed = 0u32; + let mut attempted: std::collections::HashSet = std::collections::HashSet::new(); + for _ in 0..max { + match claim_one()? { + None => break, + Some(job) => { + if !attempted.insert(job.id) { + // Re-claimed a job we already ran this pass (it was + // requeued by a retry). `claim_one` already flipped it + // back to 'running'; release it to 'pending' (M1) so it + // doesn't stall in 'running' until orphan recovery, then + // stop — it and anything behind it are picked up on the + // next drain. Continuing would just spin on this job. + release(job.id)?; + break; + } + execute_claimed(cfg, &job)?; + processed += 1; + } + } + } + Ok(processed) +} diff --git a/src/lib.rs b/src/lib.rs index a9ac0d4..a8afb61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,6 +54,7 @@ mod api; mod bgworker; mod embeddings; mod infra; +mod jobs; mod memory; mod planner; mod providers; @@ -208,6 +209,90 @@ pub extern "C-unwind" fn _PG_init() { GucContext::Userset, GucFlags::default(), ); + GucRegistry::define_int_guc( + c"pg_ask.events_max_payload_bytes", + c"Max serialized-JSON bytes for an ask.emit() payload (0 = unlimited)", + c"Oversized payloads are rejected before any outbox row is written.", + &EVENTS_MAX_PAYLOAD_BYTES, + 0, + 1_073_741_824, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_int_guc( + c"pg_ask.events_max_per_minute", + c"Per-(emitter,event) ask.emit() rate limit per rolling minute (0 = off)", + c"Beyond the limit the emit is a silent no-op; the surrounding \ + trigger transaction is never aborted.", + &EVENTS_MAX_PER_MINUTE, + 0, + 1_000_000, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_int_guc( + c"pg_ask.events_dedup_window_ms", + c"ask.emit() dedup window in ms for identical (emitter,event,payload) (0 = off)", + c"Within the window a duplicate emit is a silent no-op. Plain integer \ + milliseconds (e.g. 60000), NOT a unit-suffixed value: the SQL-side \ + writer reads this via current_setting()::int, which a UNIT_MS GUC \ + would break by normalizing 60000 to '1min'.", + &EVENTS_DEDUP_WINDOW_MS, + 0, + 86_400_000, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_bool_guc( + c"pg_ask.jobs_enabled", + c"Enable ask.ask_async(): enqueue jobs to ask._jobs for a worker to run", + c"", + &JOBS_ENABLED, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_int_guc( + c"pg_ask.jobs_max_attempts", + c"Times a failed async job is retried before it is marked failed", + c"A transient failure returns the job to pending; once attempts reach \ + this it is terminal. Minimum 1 (no retry).", + &JOBS_MAX_ATTEMPTS, + 1, + 100, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_int_guc( + c"pg_ask.jobs_orphan_timeout_ms", + c"A running job older than this (ms) is reclaimed to pending (worker died)", + c"Must exceed the worst-case agent-loop runtime. Plain integer ms; the \ + SQL recovery helper reads it via current_setting()::int.", + &JOBS_ORPHAN_TIMEOUT_MS, + 1_000, + 86_400_000, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_int_guc( + c"pg_ask.jobs_batch", + c"Max jobs processed per ask.run_pending_jobs() / worker drain pass", + c"", + &JOBS_BATCH, + 1, + 10_000, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_int_guc( + c"pg_ask.jobs_poll_interval_ms", + c"Background worker queue-poll interval in ms (safety net vs missed NOTIFY)", + c"Plain integer ms.", + &JOBS_POLL_INTERVAL_MS, + 100, + 3_600_000, + GucContext::Userset, + GucFlags::default(), + ); GucRegistry::define_int_guc( c"pg_ask.schema_char_budget", c"Soft cap on schema dump injected into the system prompt (characters).", @@ -435,6 +520,602 @@ mod tests { assert_eq!(second, Some(false)); } + #[pg_test] + fn emit_rejects_invalid_event_name() { + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + // Whitespace / illegal chars are rejected before any write. + let res = std::panic::catch_unwind(|| { + Spi::get_one::("SELECT ask.emit('bad name!', '{}'::jsonb)") + }); + assert!(res.is_err(), "event name with space/!@ must raise"); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(0), "no row written on validation failure"); + } + + #[pg_test] + fn emit_summary_ceiling_is_bytes_not_chars() { + // D1 regression: the summary ceiling is measured in BYTES on both + // layers (Rust dropped its own check; SQL uses octet_length). A + // multi-byte string just under 8192 chars but over 8192 bytes must + // be rejected consistently — no Rust/SQL drift. + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + // 'ş' is 2 bytes in UTF-8. 5000 of them = 10000 bytes, 5000 chars. + let res = std::panic::catch_unwind(|| { + Spi::get_one::("SELECT ask.emit('s.evt', '{}'::jsonb, repeat('ş', 5000))") + }); + assert!( + res.is_err(), + "summary over 8192 BYTES must raise even if < 8192 chars" + ); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(0), "no row written when summary too large"); + } + + #[pg_test] + fn emit_rejects_oversized_payload() { + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + Spi::run("SET pg_ask.events_max_payload_bytes = 64").unwrap(); + // A payload whose serialized form exceeds 64 bytes must raise. + let res = std::panic::catch_unwind(|| { + Spi::get_one::( + "SELECT ask.emit('x.y', jsonb_build_object('blob', repeat('a', 200)))", + ) + }); + assert!(res.is_err(), "oversized payload must raise"); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(0), "no row written when payload too large"); + } + + #[pg_test] + fn emit_dedup_window_collapses_duplicates() { + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + Spi::run("SET pg_ask.events_dedup_window_ms = 60000").unwrap(); + // First emit writes a row. + let first: Option = + Spi::get_one("SELECT ask.emit('dup.event', '{\"a\": 1}'::jsonb)").unwrap(); + assert!(first.is_some(), "first emit writes"); + // Identical (emitter,event,payload) within the window → silent no-op. + let second: Option = + Spi::get_one("SELECT ask.emit('dup.event', '{\"a\": 1}'::jsonb)").unwrap(); + assert!(second.is_none(), "duplicate within window is suppressed"); + // A different payload is NOT a duplicate → writes. + let third: Option = + Spi::get_one("SELECT ask.emit('dup.event', '{\"a\": 2}'::jsonb)").unwrap(); + assert!(third.is_some(), "distinct payload is not deduped"); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(2), "exactly two rows written"); + } + + #[pg_test] + fn emit_rate_limit_suppresses_excess() { + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + Spi::run("SET pg_ask.events_max_per_minute = 2").unwrap(); + // Distinct payloads so dedup (off here) is irrelevant; same event so + // they share the rate-limit key. + let a: Option = + Spi::get_one("SELECT ask.emit('rl.event', '{\"i\": 1}'::jsonb)").unwrap(); + let b: Option = + Spi::get_one("SELECT ask.emit('rl.event', '{\"i\": 2}'::jsonb)").unwrap(); + let c: Option = + Spi::get_one("SELECT ask.emit('rl.event', '{\"i\": 3}'::jsonb)").unwrap(); + assert!(a.is_some() && b.is_some(), "first two within limit"); + assert!(c.is_none(), "third over the per-minute cap is suppressed"); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(2), "only two rows survive the rate limit"); + } + + #[pg_test] + fn prune_events_removes_only_delivered() { + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + // One row we deliver+age, one we leave pending. + let delivered: pgrx::Uuid = Spi::get_one("SELECT ask.emit('p.delivered', '{}'::jsonb)") + .unwrap() + .unwrap(); + let _pending: pgrx::Uuid = Spi::get_one("SELECT ask.emit('p.pending', '{}'::jsonb)") + .unwrap() + .unwrap(); + // Mark the first processed and backdate it past the prune horizon. + Spi::get_one_with_args::( + "SELECT ask._outbox_mark_processed($1)", + &[delivered.into()], + ) + .unwrap(); + Spi::run_with_args( + "UPDATE ask._outbox SET processed_at = now() - interval '10 days' WHERE id = $1", + &[delivered.into()], + ) + .unwrap(); + // Prune everything delivered older than 7 days. + let removed: Option = Spi::get_one("SELECT ask.prune_events('7 days')").unwrap(); + assert_eq!(removed, Some(1), "one delivered+aged row pruned"); + // The pending row is untouched. + let remaining: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(remaining, Some(1), "pending row survives prune"); + let ev: Option = Spi::get_one("SELECT event FROM ask._outbox").unwrap(); + assert_eq!(ev.as_deref(), Some("p.pending")); + } + + #[pg_test] + fn outbox_emit_direct_call_respects_disabled_switch() { + // B2: the enabled-check must live in _outbox_emit, not only in the + // Rust wrapper — otherwise a direct helper call bypasses it. + Spi::run("SET pg_ask.events_enabled = off").unwrap(); + let id: Option = + Spi::get_one("SELECT ask._outbox_emit('direct.evt', '{}'::jsonb, NULL)").unwrap(); + assert!(id.is_none(), "direct _outbox_emit must no-op when disabled"); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(0), "no row written via direct call when disabled"); + } + + #[pg_test] + fn outbox_emit_direct_call_validates_event_name() { + // B2: validation must live in _outbox_emit so a direct call can't + // smuggle a newline-laced / illegal event name past the Rust checks. + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + let res = std::panic::catch_unwind(|| { + Spi::get_one::("SELECT ask._outbox_emit(E'bad\\nname', '{}'::jsonb, NULL)") + }); + assert!( + res.is_err(), + "direct call with newline event name must raise" + ); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!( + n, + Some(0), + "no row written on direct-call validation failure" + ); + } + + #[pg_test] + fn outbox_emit_direct_call_enforces_payload_ceiling() { + // B2: payload ceiling enforced in SQL, not just Rust. + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + Spi::run("SET pg_ask.events_max_payload_bytes = 64").unwrap(); + let res = std::panic::catch_unwind(|| { + Spi::get_one::( + "SELECT ask._outbox_emit('x.y', jsonb_build_object('b', repeat('a', 200)), NULL)", + ) + }); + assert!( + res.is_err(), + "direct call with oversized payload must raise" + ); + } + + #[pg_test] + fn emit_fires_notify_without_error() { + // The NOTIFY is now fired inside _outbox_emit. pgrx tests run in a + // single transaction, so the notification is never delivered to a + // client and can't be asserted directly — but the pg_notify call is + // exercised on every emit, and a failure there (e.g. a bad channel + // or a shadowed function) would surface as an ERROR from emit. A + // clean id return is the evidence the NOTIFY path ran. + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + Spi::run("LISTEN pg_ask_events").unwrap(); + let id: Option = + Spi::get_one("SELECT ask.emit('notify.evt', '{}'::jsonb)").unwrap(); + assert!( + id.is_some(), + "emit returns an id; NOTIFY path ran without error" + ); + } + + #[pg_test] + fn event_channel_constant_is_stable() { + // The consumer (senti listener) hard-codes 'pg_ask_events'. Guard the + // contract so a rename is caught at test time. + assert_eq!(crate::infra::events::EVENT_CHANNEL, "pg_ask_events"); + } + + #[pg_test] + fn prune_events_batched_removes_all_eligible() { + // H4: with a small batch size the loop must still remove every + // eligible row, not just one batch. + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + for i in 0..5 { + let id: pgrx::Uuid = Spi::get_one(&format!( + "SELECT ask.emit('b.evt', jsonb_build_object('i', {i}))" + )) + .unwrap() + .unwrap(); + Spi::get_one_with_args::("SELECT ask._outbox_mark_processed($1)", &[id.into()]) + .unwrap(); + } + // Age them all past the horizon. + Spi::run("UPDATE ask._outbox SET processed_at = now() - interval '10 days'").unwrap(); + // Batch size 2 over 5 rows → must still delete all 5. + let removed: Option = Spi::get_one("SELECT ask.prune_events('7 days', 2)").unwrap(); + assert_eq!(removed, Some(5), "batched prune removes every eligible row"); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(0)); + } + + #[pg_test] + fn emit_dedup_is_key_order_insensitive() { + // H1: dedup compares md5(payload::text); jsonb normalizes key order + // before the cast, so the same logical payload with reordered keys + // must be treated as a duplicate. + Spi::run("SET pg_ask.events_enabled = on").unwrap(); + Spi::run("SET pg_ask.events_dedup_window_ms = 60000").unwrap(); + let first: Option = + Spi::get_one("SELECT ask.emit('dk.evt', '{\"a\": 1, \"b\": 2}'::jsonb)").unwrap(); + assert!(first.is_some()); + // Same content, reversed key order → deduped. + let second: Option = + Spi::get_one("SELECT ask.emit('dk.evt', '{\"b\": 2, \"a\": 1}'::jsonb)").unwrap(); + assert!( + second.is_none(), + "reordered-key duplicate must be suppressed" + ); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._outbox").unwrap(); + assert_eq!(n, Some(1)); + } + + // ===================== Async job queue (v0.5.9) ===================== + + #[pg_test] + fn ask_async_is_noop_when_jobs_disabled() { + Spi::run("SET pg_ask.jobs_enabled = off").unwrap(); + let id: Option = Spi::get_one("SELECT ask.ask_async('anything')").unwrap(); + assert!(id.is_none(), "ask_async returns NULL when disabled"); + let n: Option = Spi::get_one("SELECT count(*) FROM ask._jobs").unwrap(); + assert_eq!(n, Some(0), "no job row when disabled"); + } + + #[pg_test] + fn ask_async_enqueues_pending_job() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: Option = + Spi::get_one("SELECT ask.ask_async('count rows', 'sql')").unwrap(); + assert!(id.is_some(), "ask_async returns a job id when enabled"); + let status: Option = + Spi::get_one("SELECT status FROM ask._jobs ORDER BY ts DESC LIMIT 1").unwrap(); + assert_eq!(status.as_deref(), Some("pending")); + let kind: Option = + Spi::get_one("SELECT kind FROM ask._jobs ORDER BY ts DESC LIMIT 1").unwrap(); + assert_eq!(kind.as_deref(), Some("sql")); + } + + #[pg_test] + fn ask_async_rejects_bad_kind() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let res = std::panic::catch_unwind(|| { + Spi::get_one::("SELECT ask.ask_async('q', 'bogus')") + }); + assert!(res.is_err(), "unknown kind must raise"); + } + + #[pg_test] + fn job_claim_is_atomic_and_fifo() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let first: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('first', 'sql')") + .unwrap() + .unwrap(); + let _second: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('second', 'sql')") + .unwrap() + .unwrap(); + // Both rows get ts = now() (the statement timestamp is fixed within a + // pg_test's single transaction), so `(ts, id)` would otherwise fall + // back to the random uuid for ordering. Make `first` unambiguously + // older so the FIFO assertion is deterministic. + Spi::run_with_args( + "UPDATE ask._jobs SET ts = ts - interval '1 second' WHERE id = $1", + &[first.into()], + ) + .unwrap(); + // Claim once → oldest (first) flips to running. + let claimed: Option = Spi::get_one("SELECT id FROM ask._job_claim()").unwrap(); + assert_eq!(claimed, Some(first), "claim returns the oldest pending job"); + let st: Option = Spi::get_one_with_args( + "SELECT status FROM ask._jobs WHERE id = $1", + &[first.into()], + ) + .unwrap(); + assert_eq!(st.as_deref(), Some("running")); + // attempts bumped to 1, started_at + worker_pid stamped. + let attempts: Option = Spi::get_one_with_args( + "SELECT attempts FROM ask._jobs WHERE id = $1", + &[first.into()], + ) + .unwrap(); + assert_eq!(attempts, Some(1)); + } + + #[pg_test] + fn job_complete_only_acts_on_running() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + // Complete a PENDING (not yet claimed) job → no-op (false). + let ok: Option = + Spi::get_one_with_args("SELECT ask._job_complete($1, 'ans', 0, 0)", &[id.into()]) + .unwrap(); + assert_eq!(ok, Some(false), "complete refuses a non-running row"); + // Claim then complete → succeeds. + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + let ok2: Option = Spi::get_one_with_args( + "SELECT ask._job_complete($1, 'the answer', 12, 34)", + &[id.into()], + ) + .unwrap(); + assert_eq!(ok2, Some(true)); + let st: Option = + Spi::get_one_with_args("SELECT status FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!(st.as_deref(), Some("done")); + } + + #[pg_test] + fn job_fail_retries_then_terminal() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + // max_attempts = 2: first claim+fail → back to pending (retry). + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + let s1: Option = + Spi::get_one_with_args("SELECT ask._job_fail($1, 'boom', 2)", &[id.into()]).unwrap(); + assert_eq!(s1.as_deref(), Some("pending"), "first failure retries"); + // second claim+fail → terminal failed (attempts now 2 >= 2). + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + let s2: Option = + Spi::get_one_with_args("SELECT ask._job_fail($1, 'boom again', 2)", &[id.into()]) + .unwrap(); + assert_eq!(s2.as_deref(), Some("failed"), "second failure is terminal"); + let err: Option = + Spi::get_one_with_args("SELECT error FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!(err.as_deref(), Some("boom again")); + } + + #[pg_test] + fn job_recover_orphans_revives_stale_running() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + // Backdate started_at so it looks orphaned. + Spi::run_with_args( + "UPDATE ask._jobs SET started_at = now() - interval '1 hour' WHERE id = $1", + &[id.into()], + ) + .unwrap(); + // Recover with a 60s timeout → the 1h-old running job is revived. + let n: Option = Spi::get_one("SELECT ask._job_recover_orphans(60000)").unwrap(); + assert_eq!(n, Some(1), "one orphan recovered"); + let st: Option = + Spi::get_one_with_args("SELECT status FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!(st.as_deref(), Some("pending"), "orphan back to pending"); + } + + #[pg_test] + fn job_cancel_blocks_completion() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + // Cancel the running job. + let cancelled: Option = + Spi::get_one_with_args("SELECT ask.cancel_job($1)", &[id.into()]).unwrap(); + assert_eq!(cancelled, Some(true)); + // A late completion must NOT resurrect a cancelled job. + let ok: Option = Spi::get_one_with_args( + "SELECT ask._job_complete($1, 'too late', 0, 0)", + &[id.into()], + ) + .unwrap(); + assert_eq!(ok, Some(false), "cancelled job can't be completed"); + let st: Option = + Spi::get_one_with_args("SELECT status FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!(st.as_deref(), Some("cancelled")); + } + + #[pg_test] + fn job_complete_requires_owning_worker_pid() { + // B1 regression: _job_complete only transitions a row whose + // worker_pid matches the caller's backend. We simulate a "ghost + // worker" by claiming (stamping our pid), then rewriting worker_pid + // to a different value, then completing — it must be a no-op. + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + // Pretend another backend now owns the running row. + Spi::run_with_args( + "UPDATE ask._jobs SET worker_pid = pg_backend_pid() + 1 WHERE id = $1", + &[id.into()], + ) + .unwrap(); + let ok: Option = + Spi::get_one_with_args("SELECT ask._job_complete($1, 'stale', 0, 0)", &[id.into()]) + .unwrap(); + assert_eq!(ok, Some(false), "complete from a non-owning pid is a no-op"); + let st: Option = + Spi::get_one_with_args("SELECT status FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!(st.as_deref(), Some("running"), "row stays running"); + } + + #[pg_test] + fn job_release_returns_running_to_pending() { + // M1: _job_release flips our running claim back to pending without + // consuming an attempt. + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + let attempts_before: Option = + Spi::get_one_with_args("SELECT attempts FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!(attempts_before, Some(1)); + let released: Option = + Spi::get_one_with_args("SELECT ask._job_release($1)", &[id.into()]).unwrap(); + assert_eq!(released, Some(true)); + let st: Option = + Spi::get_one_with_args("SELECT status FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!(st.as_deref(), Some("pending"), "released back to pending"); + // attempts NOT incremented by release. + let attempts_after: Option = + Spi::get_one_with_args("SELECT attempts FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!( + attempts_after, + Some(1), + "release does not consume an attempt" + ); + } + + #[pg_test] + fn run_pending_jobs_is_noop_when_disabled() { + // H1 regression: with jobs disabled, run_pending_jobs returns 0 and + // does NOT run orphan recovery (a running row stays running). + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + Spi::run("SELECT id FROM ask._job_claim()").unwrap(); + Spi::run_with_args( + "UPDATE ask._jobs SET started_at = now() - interval '1 hour' WHERE id = $1", + &[id.into()], + ) + .unwrap(); + // Now disable and drain. + Spi::run("SET pg_ask.jobs_enabled = off").unwrap(); + let processed: Option = Spi::get_one("SELECT ask.run_pending_jobs()").unwrap(); + assert_eq!(processed, Some(0), "disabled drain processes nothing"); + let st: Option = + Spi::get_one_with_args("SELECT status FROM ask._jobs WHERE id=$1", &[id.into()]) + .unwrap(); + assert_eq!( + st.as_deref(), + Some("running"), + "disabled drain does not run orphan recovery" + ); + } + + #[pg_test] + fn job_claim_returns_owner() { + // Privilege isolation: claim must expose the enqueuing owner so the + // worker can SET ROLE to it before running the agent loop. + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + Spi::get_one::("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + let owner: Option = + Spi::get_one("SELECT owner::text FROM ask._job_claim()").unwrap(); + // session_user in the test backend; just assert it's non-null and + // matches the job row's owner. + assert!(owner.is_some(), "claim returns the owner role"); + } + + #[pg_test] + fn run_pending_jobs_executes_end_to_end() { + // Full async path against the fixture provider: enqueue a 'sql' + // job, drain it synchronously, and assert the answer landed. + use_fixture("sql_only"); + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('count rows', 'sql')") + .unwrap() + .unwrap(); + let processed: Option = Spi::get_one("SELECT ask.run_pending_jobs()").unwrap(); + assert_eq!(processed, Some(1), "one job processed"); + let st: Option = + Spi::get_one_with_args("SELECT ask.job_status($1)", &[id.into()]).unwrap(); + assert_eq!(st.as_deref(), Some("done")); + let ans: Option = + Spi::get_one_with_args("SELECT ask.job_result($1)", &[id.into()]).unwrap(); + assert_eq!( + ans.as_deref(), + Some("SELECT count(*) FROM pg_class"), + "job answer matches the fixture's final text" + ); + } + + #[pg_test] + fn job_accessors_return_null_not_error_when_absent() { + // Regression: job_status/result/error use a scalar subquery so an + // unknown id (or another owner's job) yields a clean NULL, not a + // "SpiTupleTable positioned before the start" error. This is the + // NotFound == Unauthorized collapse the rest of pg_ask relies on. + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + let bogus = "00000000-0000-0000-0000-000000000000"; + let st: Option = + Spi::get_one(&format!("SELECT ask.job_status('{bogus}'::uuid)")).unwrap(); + assert!(st.is_none(), "unknown job_status is NULL, not an error"); + let ans: Option = + Spi::get_one(&format!("SELECT ask.job_result('{bogus}'::uuid)")).unwrap(); + assert!(ans.is_none(), "unknown job_result is NULL"); + let err: Option = + Spi::get_one(&format!("SELECT ask.job_error('{bogus}'::uuid)")).unwrap(); + assert!(err.is_none(), "unknown job_error is NULL"); + // A pending (not-done) job: status set, but result/error still NULL. + let id: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('q', 'sql')") + .unwrap() + .unwrap(); + let st2: Option = + Spi::get_one_with_args("SELECT ask.job_status($1)", &[id.into()]).unwrap(); + assert_eq!(st2.as_deref(), Some("pending")); + let ans2: Option = + Spi::get_one_with_args("SELECT ask.job_result($1)", &[id.into()]).unwrap(); + assert!(ans2.is_none(), "pending job has no result yet"); + } + + #[pg_test] + fn prune_jobs_removes_only_terminal() { + Spi::run("SET pg_ask.jobs_enabled = on").unwrap(); + Spi::run("DELETE FROM ask._jobs").unwrap(); + // One done+aged, one pending. + let done: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('a', 'sql')") + .unwrap() + .unwrap(); + let _pending: pgrx::Uuid = Spi::get_one("SELECT ask.ask_async('b', 'sql')") + .unwrap() + .unwrap(); + // Claim THIS specific job (don't assume claim order): the + // worker_pid + (ts,id) tie-break make "claim returns 'a'" unreliable, + // so claim whatever comes out and complete that exact id, then age it. + let claimed: pgrx::Uuid = Spi::get_one("SELECT id FROM ask._job_claim()") + .unwrap() + .unwrap(); + Spi::get_one_with_args::( + "SELECT ask._job_complete($1, 'x', 0, 0)", + &[claimed.into()], + ) + .unwrap(); + Spi::run_with_args( + "UPDATE ask._jobs SET finished_at = now() - interval '10 days' WHERE id = $1", + &[claimed.into()], + ) + .unwrap(); + let _ = done; + let removed: Option = Spi::get_one("SELECT ask.prune_jobs('7 days')").unwrap(); + assert_eq!(removed, Some(1), "only the terminal+aged job is pruned"); + let remaining: Option = Spi::get_one("SELECT count(*) FROM ask._jobs").unwrap(); + assert_eq!(remaining, Some(1), "pending job survives"); + } + /// Configure every fixture-driven test the same way: pick the /// fixture provider, point at a scenario, and turn telemetry off /// because the SECURITY DEFINER writer assumes the extension