feat(emit): production-harden the ADR-0017 event outbox (v0.5.8)#1
Merged
Conversation
Harden `ask.emit()` / the `ask._outbox` event outbox into a
production-grade primitive, without breaking the consumer (LISTEN
pg_ask_events) contract. Ships as v0.5.8.
Single authority (security):
- `ask._outbox_emit` is now the ONE place that decides whether/how an
event is written: it re-checks events_enabled, validates input,
enforces flood control, INSERTs, AND fires pg_notify — all atomically
in one SECURITY DEFINER call. Previously validation/enabled/NOTIFY
lived only in the Rust `ask.emit` wrapper, so a role with EXECUTE on
the PUBLIC-granted helper could call it directly to bypass every check
(newline-laced event name, multi-MB payload, write while disabled, no
NOTIFY). The Rust layer is now a thin defense-in-depth adapter.
Input validation (owned solely by SQL, so no Rust/SQL drift):
- event name: non-empty, <= 127 chars, ^[A-Za-z0-9][A-Za-z0-9._:-]*$
- summary: <= 8192 BYTES (octet_length — exact for multi-byte text)
- payload: <= pg_ask.events_max_payload_bytes serialized-JSON bytes
(new GUC, default 64 KiB, 0 disables)
- violations RAISE invalid_parameter_value before any row is written
Flood control (opt-in GUCs, both default 0 = off):
- pg_ask.events_max_per_minute — per-(emitter,event) rate cap
- pg_ask.events_dedup_window_ms — collapse identical
(emitter,event,payload) repeats, compared via md5(payload::text)
(key-order insensitive, cheap vs jsonb equality on large payloads)
- suppressed emits are a silent NULL no-op (never raise — emit runs in
the caller's trigger txn; raising would roll back the firing DML) and
log a RAISE DEBUG line for observability
- checks run atomically under a transaction-scoped advisory lock to kill
the check-then-insert race; the two-arg (domain,key) form keeps it in
a separate lock space from the int8 session lock (no cross-domain
collisions), masked to int4 range to avoid "integer out of range"
Indexes for the new hot paths:
- _outbox_rate_idx (emitter,event,ts) — rate/dedup checks
- _outbox_processed_idx (processed_at) WHERE NOT NULL — retention prune
Retention:
- ask.prune_events('<interval>'[, batch_size]) + SECURITY DEFINER
ask._outbox_prune(interval, int). Deletes only already-delivered rows
older than the interval, in batches (default 10000). Pending rows are
never removed. Operator-only (not granted to PUBLIC).
Consumer contract preserved (no senti listener changes needed):
- channel pg_ask_events, _outbox columns, _outbox_emit(text,jsonb,text)
and _outbox_mark_processed(uuid) signatures all unchanged.
Tests: +12 #[pg_test] covering validation, direct-call bypass attempts,
rate-limit, dedup (incl. key-order), batched prune, summary byte ceiling,
NOTIFY path, and the channel-name contract. Full suite 103 passed.
CI parity verified locally: cargo fmt --check, clippy -D warnings,
cargo pgrx test pg18 all green.
Two upgrade-path bugs found by actually running a 0.5.6 -> 0.5.8 upgrade in Docker (neither is reachable by `cargo pgrx test`, which only ever exercises a fresh install): 1. Dockerfile dropped new upgrade scripts. The staging step copied upgrade SQL via a hardcoded list that stopped at 0.5.5--0.5.6, so 0.5.6--0.5.7 and 0.5.7--0.5.8 never made it into the image — an operator on 0.5.6 had no path to 0.5.8 at all. Replaced with a glob (pg_ask--*--*.sql), matching the deb/apk/rpm packaging scripts, so new migrations are picked up automatically. 2. Migration never created ask.prune_events. pgrx only emits CREATE FUNCTION DDL for #[pg_extern]s into the base-install script, NOT into an ALTER EXTENSION UPDATE script. The 0.5.7--0.5.8 migration only REVOKEd prune_events (guarded by a DO-block that silently skipped because the function didn't exist), so after an upgrade the new ask.prune_events entry point was simply missing. Added the explicit CREATE FUNCTION (LANGUAGE c, MODULE_PATHNAME, prune_events_wrapper) mirroring the pgrx-generated base definition, then the REVOKE. Verified with a real ALTER EXTENSION pg_ask UPDATE TO '0.5.8' in an isolated container: upgrade completes, prune_events is created, bound to the extension, operator-only (PUBLIC revoked), and callable; the new indexes, single-authority _outbox_emit, and emit/dedup all work. Also applied live to the pg_ask_playground (shop_demo) DB, data preserved.
Add ask.ask_async() so a question can be submitted without blocking the
calling backend on the LLM round-trip. The job is enqueued to ask._jobs and
a background worker runs the agent loop in its own backend, writing the
answer back. Ships as v0.5.9.
Why a worker, not off-thread: a PostgreSQL backend is single-threaded and
SPI is not thread-safe, so the only correct async shape is to hand the work
to a separate process. The synchronous ask.ask() path is unchanged.
Architecture (clean-architecture seam):
- data layer: ask._jobs durable state machine + SECURITY DEFINER helpers
(_job_submit/_claim/_complete/_fail/_recover_orphans/_cancel/_jobs_prune).
Claims use FOR UPDATE SKIP LOCKED; owner stamped from session_user.
- use-case layer: src/jobs (claim_one / execute_claimed / drain /
recover_orphans) is the single home of the claim->run->complete logic.
- delivery: both ask.run_pending_jobs() (synchronous / pg_cron) and the
background worker reuse src/jobs unchanged; neither duplicates a rule.
Background workers (when loaded via shared_preload_libraries):
- a launcher discovers pg_ask-enabled databases and spawns one dynamic
per-database worker each, re-reconciling so a new CREATE EXTENSION gets a
worker without a restart. A bgworker binds to one DB for life, hence the
two-tier shape.
- workers are poll-driven: a bgworker cannot LISTEN (PostgreSQL restricts
LISTEN to client backends), so each wakes on jobs_poll_interval_ms. The
enqueue path still pg_notify's pg_ask_jobs for external listeners.
Durability: pending -> running -> done/failed with retry
(jobs_max_attempts) on transient failure and orphan recovery
(jobs_orphan_timeout_ms) so a crashed worker's in-flight job is re-queued,
never lost. Each job runs in its own worker transaction, committing the
running transition before the slow agent loop begins.
Surface: ask.ask_async, job_status/result/error, cancel_job,
run_pending_jobs, prune_jobs. GUCs: jobs_enabled (off), jobs_max_attempts
(3), jobs_orphan_timeout_ms (300000), jobs_batch (10),
jobs_poll_interval_ms (5000).
Bugs caught during live testing (only reachable by a real worker run):
- pg_database.datname is the `name` type (Oid 19), not text; cast to ::text
for the launcher's Rust String mapping.
- a bgworker cannot LISTEN ("cannot execute LISTEN within a background
process"); switched to poll-only.
- extension_present() must use SELECT EXISTS(...) (always one row) instead
of SELECT true FROM ... WHERE (zero rows), which Spi::get_one reports as
"positioned before the start".
- job_status/result/error must use a scalar subquery so an unknown/foreign
id returns NULL, not the same zero-row Spi::get_one error.
Tests: +9 #[pg_test] (submit, atomic FIFO claim, complete-only-on-running,
retry-then-terminal, orphan recovery, cancel-blocks-completion,
end-to-end drain via fixture, NULL-safe accessors, prune). 114 green.
Verified live on the pg_ask_playground (shop_demo): real async jobs ran in
the per-DB worker and returned LLM answers; data preserved across upgrade.
Also fixes the prior bgworker heartbeat log spam (the v0.5 stub logged every
5s); the worker now logs only meaningful lifecycle events.
…cher bugs
Findings from a six-model parallel code review of the async job queue, each
empirically verified against a live PostgreSQL before fixing.
Correctness / security (BLOCKER):
- worker_pid guard: _job_complete / _job_fail / _job_release now require
worker_pid = pg_backend_pid(). A slow-but-alive worker whose job was
orphan-recovered and re-claimed by another worker could otherwise clobber
the new attempt with a stale result (double-execution / wrong answer).
- privilege isolation: the worker runs each job's agent loop under the role
that ENQUEUED it (set_config('role', owner, true) = SET LOCAL ROLE), then
resets to the worker role for the trusted state-machine writes. _job_claim
now returns owner. Async work has exactly the caller's privileges, not the
worker's superuser rights. Verified: a limited_user job runs under
limited_user; RLS scopes its visibility to its own rows.
- dblink conninfo: the launcher's discovery probe used quote_ident, which
produces dbname="MyDb" (libpq reads a DB literally named with the quotes
and fails) — switched to quote_literal. Verified live: quote_ident('MyDb')
fails, quote_literal('MyDb') connects. Without this, DBs with uppercase /
special names were silently never given a worker.
- launcher restart leak: on restart the old launcher's dynamic workers keep
running; the launcher now checks pg_stat_activity for an existing
'pg_ask worker: {db}' before spawning (no duplicate per restart) and
terminates its workers on clean shutdown.
HIGH:
- run_pending_jobs honours jobs_enabled (returns 0, skips orphan recovery
when disabled) — matches the worker and its own docstring.
- jobs_orphan_timeout_ms default 5min -> 1h so a slow-but-legitimate job
(up to max_iterations * http_total_timeout_ms) isn't falsely reclaimed.
- indexes: _jobs_pending_idx / _jobs_running_idx now lead with db (multi-DB
claim/recovery scans); new partial _jobs_terminal_idx (finished_at) WHERE
status IN (done,failed,cancelled) serves prune_jobs.
MEDIUM:
- RLS on ask._jobs (_jobs_owner_select USING owner = session_user), matching
_traces; bgworker (superuser) bypasses. Verified: owner sees 1, superuser
sees all.
- poison-pill drain: a re-claimed retry is _job_release'd back to pending
instead of being left stuck in running until orphan recovery.
- FIFO tie-break: _job_claim orders by (ts, id).
- docker initdb installs dblink in the 'postgres' DB for the launcher.
Tests: +4 #[pg_test] (worker_pid guard, _job_release, run_pending_jobs
disabled no-op, claim returns owner). 118 green. fmt + clippy -D warnings
clean. Migration ↔ bootstrap: all 8 helper bodies character-identical.
Verified live on pg_ask_playground: dblink filter stops the postgres-worker
respawn churn, only shop_demo gets a worker, privilege SET ROLE works, RLS
isolates tenants, retry/fail state machine drives a job to terminal.
CI (a different test execution order than local) exposed two flaky job tests: - All job tests now `DELETE FROM ask._jobs` after enabling the queue, so a job committed by an earlier pg_test (pgrx commits each test's txn) can't leak in and break "oldest pending" / row-count assertions. - job_claim_is_atomic_and_fifo: both rows get ts = now() (the statement timestamp is fixed within a single pg_test transaction), so the (ts, id) tie-break fell back to the random uuid. The test now backdates `first` by 1s so the FIFO assertion is deterministic. - prune_jobs_removes_only_terminal: claim whatever _job_claim returns and complete THAT id (the worker_pid guard + tie-break make "claim returns 'a'" unreliable) instead of assuming claim order. 118 tests green across repeated runs; clippy -D warnings + fmt clean.
These three accessors were annotated `parallel_safe` but read via SPI
(Spi::get_one_with_args). A function the planner believes is parallel-safe
can be invoked inside a parallel worker, where SPI is forbidden
("cannot start commands during a parallel operation"). Switched to
parallel_unsafe in both the #[pg_extern] and the migration's CREATE FUNCTION
DDL. STABLE is kept so the leader can still inline them; only the
parallel-worker invocation path is barred.
Same fix is applied to the pre-existing status / list_tools / list_memories /
list_namespaces functions in a separate PR off main.
118 tests green; clippy -D warnings + fmt clean.
status / list_tools / list_memories / list_namespaces (and, in the previous
commit, job_status / job_result / job_error) were annotated PARALLEL SAFE but
all read via SPI. The planner may invoke a parallel-safe function inside a
parallel worker, where SPI is forbidden ("cannot start commands during a
parallel operation"), so a query like
SELECT ask.status() FROM big_table WHERE ...
could crash the parallel worker. Switched the #[pg_extern] annotations to
parallel_unsafe; STABLE is kept so the leader can still inline them.
version() and status_api_level() are left PARALLEL SAFE — they read no SPI
(env! / a constant).
Upgrade reach: pgrx does not change a function's proparallel flag on
ALTER EXTENSION UPDATE (only fresh-install base SQL reflects the new
annotation), so the 0.5.8->0.5.9 migration now issues
ALTER FUNCTION ask.status() PARALLEL UNSAFE; (and the other three)
so upgraded installs get the fix too. Verified live on pg_ask_playground:
ALTER FUNCTION flips proparallel s->u for all four; version /
status_api_level stay s; the functions still run.
118 tests green; clippy -D warnings + fmt clean.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Production-hardens the ADR-0017 event outbox (
ask.emit()/ask._outbox) into a robust primitive safe to call from triggers and scheduled jobs on the application write hot path. Ships as v0.5.8.The work was done in two passes — an initial hardening pass, then a critical self-review (independent reviewer agent + empirical verification against a live PostgreSQL) that found real bugs the first pass introduced. Everything below is the post-review, fully-validated result.
Why
ask.emit()is meant to be called from DB triggers and cron-style jobs. That imposes two hard constraints the previous implementation didn't fully meet:emitmust not be rolled back just because an alert was a duplicate or rate-limited.GRANTed toPUBLIC, so any rule enforced only in the Rust wrapper could be bypassed by calling the SQL helper directly.What changed
�� Security — single authority (closes a real bypass)
ask._outbox_emitis now the one place that decides whether and how an event is written. It re-checksevents_enabled, validates input, enforces flood control,INSERTs, and firespg_notify— all atomically in oneSECURITY DEFINERcall.Previously validation / the enabled-check / NOTIFY lived only in the Rust
ask.emitwrapper. A role withEXECUTEon the PUBLIC-granted helper could callask._outbox_emit(...)directly to:The Rust layer is now a thin defense-in-depth adapter (cheap
events_enabledshort-circuit + SPI forward). It does no size/charset validation of its own — duplicating those checks is what caused the drift bugs below.✅ Input validation (owned solely by SQL — no drift possible)
event<= 127chars,^[A-Za-z0-9][A-Za-z0-9._:-]*$summary<= 8192bytes (octet_length, exact for multi-byte text)payload<= pg_ask.events_max_payload_bytesserialized-JSON bytes (new GUC, default 64 KiB;0disables)Violations
RAISE invalid_parameter_valuebefore any row is written.�� Flood control (opt-in GUCs, both default
0= off)pg_ask.events_max_per_minute— per-(emitter, event)rate cap.pg_ask.events_dedup_window_ms— collapse identical(emitter, event, payload)repeats, compared viamd5(payload::text)(key-order insensitive because jsonb normalizes before the text cast; far cheaper thanjsonb =on large payloads).A suppressed emit is a silent
NULLno-op — it never raises (so the firing trigger's transaction is never rolled back) and logs aRAISE DEBUGline for observability underlog_min_messages = debug1.Checks run atomically under a transaction-scoped advisory lock to eliminate the check-then-insert race. The lock uses the two-arg
pg_advisory_xact_lock(domain, key)form (domain ='pg_ask._outbox'), keeping it in a separate lock space from the int8 session lock — no cross-domain collisions.⚡ Indexes for the new hot paths
_outbox_rate_idx (emitter, event, ts)— so rate-limit / dedup checks never seq-scan the outbox on emit._outbox_processed_idx (processed_at) WHERE processed_at IS NOT NULL— partial index so retention prunes hit an index instead of a seq scan.��️ Retention
ask.prune_events('<interval>'[, batch_size])+SECURITY DEFINER ask._outbox_prune(interval, int). Deletes only already-delivered rows (processed_at IS NOT NULL) older than the interval, in batches (default 10000;0= single DELETE). Pending rows are never removed. Operator-only (not granted toPUBLIC).Bugs caught during review (and fixed)
The review pass was deliberately skeptical and verified claims against a live PG instance rather than trusting them. It surfaced real issues:
summarybyte/char drift (D1). Rust counted bytes (str::len), SQL counted characters (length()) — a multi-byte summary near the limit was accepted by one layer and rejected by the other. Fix: removed Rust validation entirely (SQL is the single authority) and switched SQL tooctet_length. Regression test added.payloadbyte drift (D2). Rust measured compact JSON; SQL measuredjsonb::text(which inserts spaces and normalizes numbers) — the "same quantity" claim was false. Fix: same single-authority consolidation.::intoverflow.hashtextextended(...)::intraisedinteger out of rangefor hashes outside int4 — which would have crashed every rate-limited/dedup emit in production (and rolled back the firing trigger). Fix: mask to the low 31 bits (& 2147483647). Caught only by the live test run.UNIT_MSGUC vscurrent_setting()::int. Postgres normalizes aUNIT_MSvalue (60000→'1min'), which broke the SQL-side::intcast. Fix: plain-integer GUC. Also caught only by the live test run.Testing & CI
cargo pgrx test pg18→ 103 passed, 0 failed (+12 new#[pg_test]: validation, direct-call bypass attempts for disabled/invalid-name/oversized-payload, rate-limit, dedup incl. key-order, batched prune, summary byte ceiling, NOTIFY path, channel-name contract).cargo fmt --all -- --check→ clean (this check found and we fixed formatting in the new test blocks that would otherwise have failed CI).cargo clippy --no-default-features --features pg18 -- -D warnings→ 0 warnings.All three match
ci.ymlexactly.Migration / release notes
sql/pg_ask--0.5.7--0.5.8.sql(idempotent;CREATE OR REPLACEpreserves grants).events_max_payload_bytes(64 KiB),events_max_per_minute(0),events_dedup_window_ms(0).pg_ask.control/Cargo.toml/Cargo.lockand CHANGELOG. The release maintainer owns final versioning/tagging — adjust the version/tag here if 0.5.8 isn't the intended release number.Files