feat(emit): production-harden the ADR-0017 event outbox (v0.5.8) #1

Merged
ersintarhan merged 7 commits into main from harden/emit-event-outbox
Jun 8, 2026

@alpertarhan

Contributor

Summary

Production-hardens the ADR-0017 event outbox (ask.emit() / ask._outbox) into a robust primitive safe to call from triggers and scheduled jobs on the application write hot path. Ships as v0.5.8.

The work was done in two passes — an initial hardening pass, then a critical self-review (independent reviewer agent + empirical verification against a live PostgreSQL) that found real bugs the first pass introduced. Everything below is the post-review, fully-validated result.

Consumer contract is preserved. The pg_ask_events channel, the ask._outbox columns, the pending-row query, and the ask._outbox_emit(text,jsonb,text) / ask._outbox_mark_processed(uuid) / ask.emit signatures are all unchanged. An existing LISTEN pg_ask_events consumer needs zero changes across the upgrade.


Why

ask.emit() is meant to be called from DB triggers and cron-style jobs. That imposes two hard constraints the previous implementation didn't fully meet:

  1. It must never abort the caller's transaction. A trigger that fires emit must not be rolled back just because an alert was a duplicate or rate-limited.
  2. It can't trust the caller. The writer helper is GRANTed to PUBLIC, so any rule enforced only in the Rust wrapper could be bypassed by calling the SQL helper directly.

What changed

Security: single authority (closes a real bypass)

ask._outbox_emit is now the one place that decides whether and how an event is written. It re-checks events_enabled, validates input, enforces flood control, INSERTs, and fires pg_notify — all atomically in one SECURITY DEFINER call.

Previously validation / the enabled-check / NOTIFY lived only in the Rust ask.emit wrapper. A role with EXECUTE on the PUBLIC-granted helper could call ask._outbox_emit(...) directly to:

  • write a newline-laced event name (corrupting the durable log / a listener's routing),
  • write a multi-megabyte payload,
  • write a row while events were globally disabled,
  • …all without firing a NOTIFY.

The Rust layer is now a thin defense-in-depth adapter (cheap events_enabled short-circuit + SPI forward). It does no size/charset validation of its own — duplicating those checks is what caused the drift bugs below.

✅ Input validation (owned solely by SQL — no drift possible)

| Field | Rule |
|---|---|
| event | non-empty, <= 127 chars, ^[A-Za-z0-9][A-Za-z0-9._:-]*$ |
| summary | <= 8192 bytes (octet_length, exact for multi-byte text) |
| payload | <= pg_ask.events_max_payload_bytes serialized-JSON bytes (new GUC, default 64 KiB; 0 disables) |

Violations RAISE invalid_parameter_value before any row is written.
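
To make the three rules concrete, here is a minimal Rust sketch of the same checks. It is illustrative only: in v0.5.8 the real validation lives in the SQL function ask._outbox_emit, not in Rust, and the names `validate`, `valid_event_name`, and `EmitError` are hypothetical. The payload is assumed to be passed in already serialized, so its byte length stands in for the serialized-JSON size.

```rust
// Limits taken from the validation table in this PR.
const MAX_EVENT_CHARS: usize = 127;
const MAX_SUMMARY_BYTES: usize = 8192;

#[derive(Debug, PartialEq)]
enum EmitError {
    BadEventName,
    SummaryTooLong,
    PayloadTooLarge,
}

/// Mirrors ^[A-Za-z0-9][A-Za-z0-9._:-]*$ plus the 127-character cap.
fn valid_event_name(event: &str) -> bool {
    let mut chars = event.chars();
    // An empty name has no first character and is rejected here.
    let first_ok = matches!(chars.next(), Some(c) if c.is_ascii_alphanumeric());
    first_ok
        && event.chars().count() <= MAX_EVENT_CHARS
        && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | ':' | '-'))
}

/// Hypothetical helper. `max_payload_bytes == 0` disables the payload check,
/// matching the documented meaning of the GUC.
fn validate(
    event: &str,
    summary: &str,
    payload_json: &str,
    max_payload_bytes: usize,
) -> Result<(), EmitError> {
    if !valid_event_name(event) {
        return Err(EmitError::BadEventName);
    }
    // str::len counts bytes, like octet_length in SQL.
    if summary.len() > MAX_SUMMARY_BYTES {
        return Err(EmitError::SummaryTooLong);
    }
    if max_payload_bytes != 0 && payload_json.len() > max_payload_bytes {
        return Err(EmitError::PayloadTooLarge);
    }
    Ok(())
}
```

A newline-laced name such as "bad\nname" fails the character-class check, which is the bypass case described in the security section.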

Flood control (opt-in GUCs, both default 0 = off)

  • pg_ask.events_max_per_minute — per-(emitter, event) rate cap.
  • pg_ask.events_dedup_window_ms — collapse identical (emitter, event, payload) repeats, compared via md5(payload::text) (key-order insensitive because jsonb normalizes before the text cast; far cheaper than jsonb = on large payloads).

A suppressed emit is a silent NULL no-op — it never raises (so the firing trigger's transaction is never rolled back) and logs a RAISE DEBUG line for observability under log_min_messages = debug1.
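
The suppression behaviour can be pictured with a small in-memory model. This is a sketch under stated assumptions, not the SQL implementation: the `Outbox` and `Row` types are hypothetical, payloads are compared as plain strings instead of via md5(payload::text), the window boundaries are treated as inclusive, and the dedup check is assumed to run before the rate check. `None` plays the role of the silent NULL no-op.

```rust
struct Row {
    emitter: String,
    event: String,
    payload: String,
    ts_ms: u64,
}

/// Hypothetical in-memory stand-in for ask._outbox plus the two GUCs.
struct Outbox {
    rows: Vec<Row>,
    max_per_minute: usize, // 0 = rate cap off
    dedup_window_ms: u64,  // 0 = dedup off
}

impl Outbox {
    fn new(max_per_minute: usize, dedup_window_ms: u64) -> Self {
        Outbox { rows: Vec::new(), max_per_minute, dedup_window_ms }
    }

    /// Returns Some(row index) when the event is written and None when it is
    /// suppressed. Suppression never produces an error.
    fn emit(&mut self, emitter: &str, event: &str, payload: &str, now_ms: u64) -> Option<usize> {
        if self.dedup_window_ms > 0 {
            let cutoff = now_ms.saturating_sub(self.dedup_window_ms);
            let duplicate = self.rows.iter().any(|r| {
                r.emitter == emitter && r.event == event && r.payload == payload && r.ts_ms >= cutoff
            });
            if duplicate {
                return None;
            }
        }
        if self.max_per_minute > 0 {
            let cutoff = now_ms.saturating_sub(60_000);
            let recent = self
                .rows
                .iter()
                .filter(|r| r.emitter == emitter && r.event == event && r.ts_ms >= cutoff)
                .count();
            if recent >= self.max_per_minute {
                return None;
            }
        }
        self.rows.push(Row {
            emitter: emitter.to_string(),
            event: event.to_string(),
            payload: payload.to_string(),
            ts_ms: now_ms,
        });
        Some(self.rows.len() - 1)
    }
}
```

Because the caller only ever sees `Some` or `None`, a trigger that emits through this path cannot be aborted by flood control.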

Checks run atomically under a transaction-scoped advisory lock to eliminate the check-then-insert race. The lock uses the two-arg pg_advisory_xact_lock(domain, key) form (domain = 'pg_ask._outbox'), keeping it in a separate lock space from the int8 session lock — no cross-domain collisions.
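
The two-arg advisory lock form takes two int4 keys, so a 64-bit hash has to be reduced to int4 range first. The sketch below shows the masking arithmetic in Rust; `lock_key` and `checked_cast` are hypothetical names, and the input is any 64-bit hash value (the SQL side uses hashtextextended, which is not reimplemented here).

```rust
/// A range-checked narrowing: returns None for any value outside int4 range,
/// analogous to a cast that raises "integer out of range".
fn checked_cast(hash: i64) -> Option<i32> {
    if hash >= i32::MIN as i64 && hash <= i32::MAX as i64 {
        Some(hash as i32)
    } else {
        None
    }
}

/// Keeps only the low 31 bits, the same arithmetic as `& 2147483647`.
/// The result is always in 0..=i32::MAX, so narrowing cannot overflow.
fn lock_key(hash: i64) -> i32 {
    (hash & 0x7FFF_FFFF) as i32
}
```

Masking loses hash bits, so two different (emitter, event) pairs can map to the same key; the effect is only that they serialize on the same lock.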

⚡ Indexes for the new hot paths

  • _outbox_rate_idx (emitter, event, ts) — so rate-limit / dedup checks never seq-scan the outbox on emit.
  • _outbox_processed_idx (processed_at) WHERE processed_at IS NOT NULL — partial index so retention prunes hit an index instead of a seq scan.

Retention

ask.prune_events('<interval>'[, batch_size]) + SECURITY DEFINER ask._outbox_prune(interval, int). Deletes only already-delivered rows (processed_at IS NOT NULL) older than the interval, in batches (default 10000; 0 = single DELETE). Pending rows are never removed. Operator-only (not granted to PUBLIC).

Honest limitation documented in code/CHANGELOG: as a plpgsql function the whole loop runs in the caller's single transaction, so batching bounds per-statement size (memory, lock acquisition, dead-tuple churn) but does not split WAL across commits or shorten lock lifetime. For that, call it repeatedly from separate transactions.
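
The batching rule can be modelled over an in-memory list. This is a hedged sketch, not the plpgsql body: `Row` and `prune` are hypothetical, and the age comparison is assumed to be against processed_at (the PR does not say which timestamp is compared). As in the real function, the whole loop runs in one call, so batching only bounds the size of each pass.

```rust
struct Row {
    id: u64,
    processed_at_ms: Option<u64>, // None = still pending
}

/// Removes delivered rows whose processed_at is older than `cutoff_ms`,
/// at most `batch_size` per pass. `batch_size == 0` means one unbounded pass.
/// Pending rows are never removed. Returns the number of rows deleted.
fn prune(rows: &mut Vec<Row>, cutoff_ms: u64, batch_size: usize) -> usize {
    let eligible = |r: &Row| matches!(r.processed_at_ms, Some(t) if t < cutoff_ms);
    let limit = if batch_size == 0 { usize::MAX } else { batch_size };
    let mut deleted = 0;
    loop {
        let mut taken = 0;
        rows.retain(|r| {
            if taken < limit && eligible(r) {
                taken += 1;
                false // drop this row as part of the current batch
            } else {
                true
            }
        });
        deleted += taken;
        // A short batch means nothing eligible is left.
        if taken < limit {
            break;
        }
    }
    deleted
}
```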


Bugs caught during review (and fixed)

The review pass was deliberately skeptical and verified claims against a live PG instance rather than trusting them. It surfaced real issues:

  1. summary byte/char drift (D1). Rust counted bytes (str::len), SQL counted characters (length()) — a multi-byte summary near the limit was accepted by one layer and rejected by the other. Fix: removed Rust validation entirely (SQL is the single authority) and switched SQL to octet_length. Regression test added.
  2. payload byte drift (D2). Rust measured compact JSON; SQL measured jsonb::text (which inserts spaces and normalizes numbers) — the "same quantity" claim was false. Fix: same single-authority consolidation.
  3. Advisory lock ::int overflow. hashtextextended(...)::int raised integer out of range for hashes outside int4 — which would have crashed every rate-limited/dedup emit in production (and rolled back the firing trigger). Fix: mask to the low 31 bits (& 2147483647). Caught only by the live test run.
  4. UNIT_MS GUC vs current_setting()::int. Postgres normalizes a UNIT_MS value when it is read back (60000 → '1min'), which broke the SQL-side ::int cast. Fix: plain-integer GUC. Also caught only by the live test run.
  5. Missing prune index (H1) and advisory-lock namespace collision (H2) — addressed as above.
  6. Batch-prune over-claim (D3) — corrected the docs to state the real transaction semantics.
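
The two drift bugs (D1, D2) come down to measuring the same value in different units. A minimal Rust illustration, with hypothetical helper names: a multi-byte summary passes a character count but fails a byte count, and the same JSON document has a different length in compact form than in the form jsonb::text prints (which puts a space after each colon).

```rust
/// Byte-based check, equivalent in spirit to octet_length(s) <= limit.
fn within_limit_bytes(s: &str, limit: usize) -> bool {
    s.len() <= limit
}

/// Character-based check, equivalent in spirit to length(s) <= limit.
fn within_limit_chars(s: &str, limit: usize) -> bool {
    s.chars().count() <= limit
}

/// The same document as a compact serializer writes it...
const COMPACT: &str = r#"{"a":1}"#;
/// ...and as jsonb::text renders it.
const JSONB_TEXT: &str = r#"{"a": 1}"#;
```

With a summary of 5000 copies of "é" (two bytes each in UTF-8) and a limit of 8192, the character check accepts and the byte check rejects, which is the disagreement the review found.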

Testing & CI

  • cargo pgrx test pg18 → 103 passed, 0 failed (+12 new #[pg_test]: validation, direct-call bypass attempts for disabled/invalid-name/oversized-payload, rate-limit, dedup incl. key-order, batched prune, summary byte ceiling, NOTIFY path, channel-name contract).
  • cargo fmt --all -- --check → clean (this check flagged formatting problems in the new test blocks, which we fixed; they would otherwise have failed CI).
  • cargo clippy --no-default-features --features pg18 -- -D warnings → 0 warnings.
  • Migration ↔ bootstrap function bodies verified byte-identical via diff.

All three match ci.yml exactly.


Migration / release notes

  • New upgrade script sql/pg_ask--0.5.7--0.5.8.sql (idempotent; CREATE OR REPLACE preserves grants).
  • New GUCs: events_max_payload_bytes (64 KiB), events_max_per_minute (0), events_dedup_window_ms (0).
  • Version bumped to 0.5.8 across pg_ask.control / Cargo.toml / Cargo.lock and CHANGELOG. The release maintainer owns final versioning/tagging — adjust the version/tag here if 0.5.8 isn't the intended release number.

Files

sql/pg_ask--0.5.7--0.5.8.sql   (new migration)
sql/bootstrap.sql              (_outbox_emit authority, _outbox_prune batch, indexes)
sql/finalize.sql               (prune_events operator-only REVOKE)
src/infra/events.rs            (thin adapter; drift source removed)
src/infra/config.rs            (3 new GUCs)
src/api/emit.rs                (emit + prune_events #[pg_extern])
src/lib.rs                     (GUC registration + 12 new tests)
pg_ask.control, Cargo.toml, Cargo.lock, CHANGELOG.md, .gitignore

Harden `ask.emit()` / the `ask._outbox` event outbox into a
production-grade primitive, without breaking the consumer (LISTEN
pg_ask_events) contract. Ships as v0.5.8.

Single authority (security):
- `ask._outbox_emit` is now the ONE place that decides whether/how an
  event is written: it re-checks events_enabled, validates input,
  enforces flood control, INSERTs, AND fires pg_notify — all atomically
  in one SECURITY DEFINER call. Previously validation/enabled/NOTIFY
  lived only in the Rust `ask.emit` wrapper, so a role with EXECUTE on
  the PUBLIC-granted helper could call it directly to bypass every check
  (newline-laced event name, multi-MB payload, write while disabled, no
  NOTIFY). The Rust layer is now a thin defense-in-depth adapter.

Input validation (owned solely by SQL, so no Rust/SQL drift):
- event name: non-empty, <= 127 chars, ^[A-Za-z0-9][A-Za-z0-9._:-]*$
- summary: <= 8192 BYTES (octet_length — exact for multi-byte text)
- payload: <= pg_ask.events_max_payload_bytes serialized-JSON bytes
  (new GUC, default 64 KiB, 0 disables)
- violations RAISE invalid_parameter_value before any row is written

Flood control (opt-in GUCs, both default 0 = off):
- pg_ask.events_max_per_minute   — per-(emitter,event) rate cap
- pg_ask.events_dedup_window_ms  — collapse identical
  (emitter,event,payload) repeats, compared via md5(payload::text)
  (key-order insensitive, cheap vs jsonb equality on large payloads)
- suppressed emits are a silent NULL no-op (never raise — emit runs in
  the caller's trigger txn; raising would roll back the firing DML) and
  log a RAISE DEBUG line for observability
- checks run atomically under a transaction-scoped advisory lock to kill
  the check-then-insert race; the two-arg (domain,key) form keeps it in
  a separate lock space from the int8 session lock (no cross-domain
  collisions), masked to int4 range to avoid "integer out of range"

Indexes for the new hot paths:
- _outbox_rate_idx (emitter,event,ts)             — rate/dedup checks
- _outbox_processed_idx (processed_at) WHERE NOT NULL — retention prune

Retention:
- ask.prune_events('<interval>'[, batch_size]) + SECURITY DEFINER
  ask._outbox_prune(interval, int). Deletes only already-delivered rows
  older than the interval, in batches (default 10000). Pending rows are
  never removed. Operator-only (not granted to PUBLIC).

Consumer contract preserved (no senti listener changes needed):
- channel pg_ask_events, _outbox columns, _outbox_emit(text,jsonb,text)
  and _outbox_mark_processed(uuid) signatures all unchanged.

Tests: +12 #[pg_test] covering validation, direct-call bypass attempts,
rate-limit, dedup (incl. key-order), batched prune, summary byte ceiling,
NOTIFY path, and the channel-name contract. Full suite 103 passed.
CI parity verified locally: cargo fmt --check, clippy -D warnings,
cargo pgrx test pg18 all green.

Two upgrade-path bugs found by actually running a 0.5.6 -> 0.5.8 upgrade
in Docker (neither is reachable by `cargo pgrx test`, which only ever
exercises a fresh install):

1. Dockerfile dropped new upgrade scripts. The staging step copied
   upgrade SQL via a hardcoded list that stopped at 0.5.5--0.5.6, so
   0.5.6--0.5.7 and 0.5.7--0.5.8 never made it into the image — an
   operator on 0.5.6 had no path to 0.5.8 at all. Replaced with a glob
   (pg_ask--*--*.sql), matching the deb/apk/rpm packaging scripts, so new
   migrations are picked up automatically.

2. Migration never created ask.prune_events. pgrx only emits CREATE
   FUNCTION DDL for #[pg_extern]s into the base-install script, NOT into
   an ALTER EXTENSION UPDATE script. The 0.5.7--0.5.8 migration only
   REVOKEd prune_events (guarded by a DO-block that silently skipped
   because the function didn't exist), so after an upgrade the new
   ask.prune_events entry point was simply missing. Added the explicit
   CREATE FUNCTION (LANGUAGE c, MODULE_PATHNAME, prune_events_wrapper)
   mirroring the pgrx-generated base definition, then the REVOKE.

Verified with a real ALTER EXTENSION pg_ask UPDATE TO '0.5.8' in an
isolated container: upgrade completes, prune_events is created, bound to
the extension, operator-only (PUBLIC revoked), and callable; the new
indexes, single-authority _outbox_emit, and emit/dedup all work. Also
applied live to the pg_ask_playground (shop_demo) DB, data preserved.

Add ask.ask_async() so a question can be submitted without blocking the
calling backend on the LLM round-trip. The job is enqueued to ask._jobs and
a background worker runs the agent loop in its own backend, writing the
answer back. Ships as v0.5.9.

Why a worker, not off-thread: a PostgreSQL backend is single-threaded and
SPI is not thread-safe, so the only correct async shape is to hand the work
to a separate process. The synchronous ask.ask() path is unchanged.

Architecture (clean-architecture seam):
- data layer: ask._jobs durable state machine + SECURITY DEFINER helpers
  (_job_submit/_claim/_complete/_fail/_recover_orphans/_cancel/_jobs_prune).
  Claims use FOR UPDATE SKIP LOCKED; owner stamped from session_user.
- use-case layer: src/jobs (claim_one / execute_claimed / drain /
  recover_orphans) is the single home of the claim->run->complete logic.
- delivery: both ask.run_pending_jobs() (synchronous / pg_cron) and the
  background worker reuse src/jobs unchanged; neither duplicates a rule.

Background workers (when loaded via shared_preload_libraries):
- a launcher discovers pg_ask-enabled databases and spawns one dynamic
  per-database worker each, re-reconciling so a new CREATE EXTENSION gets a
  worker without a restart. A bgworker binds to one DB for life, hence the
  two-tier shape.
- workers are poll-driven: a bgworker cannot LISTEN (PostgreSQL restricts
  LISTEN to client backends), so each wakes on jobs_poll_interval_ms. The
  enqueue path still pg_notify's pg_ask_jobs for external listeners.

Durability: pending -> running -> done/failed with retry
(jobs_max_attempts) on transient failure and orphan recovery
(jobs_orphan_timeout_ms) so a crashed worker's in-flight job is re-queued,
never lost. Each job runs in its own worker transaction, committing the
running transition before the slow agent loop begins.
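
The lifecycle above can be sketched as a small state machine. This is a hedged model, not the SQL helpers: the `Job` type and its methods are hypothetical, and it assumes an attempt is counted at claim time and that orphan recovery re-queues without consuming an extra attempt.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Running,
    Done,
    Failed,
}

#[derive(Debug)]
struct Job {
    status: Status,
    attempts: u32,
}

impl Job {
    fn new() -> Self {
        Job { status: Status::Pending, attempts: 0 }
    }

    /// pending -> running; counts one attempt (assumption).
    fn claim(&mut self) -> bool {
        if self.status != Status::Pending {
            return false;
        }
        self.status = Status::Running;
        self.attempts += 1;
        true
    }

    /// running -> done; a job in any other state cannot be completed.
    fn complete(&mut self) -> bool {
        if self.status != Status::Running {
            return false;
        }
        self.status = Status::Done;
        true
    }

    /// running -> pending while attempts remain, otherwise running -> failed.
    fn fail(&mut self, max_attempts: u32) -> Status {
        if self.status == Status::Running {
            self.status = if self.attempts < max_attempts {
                Status::Pending
            } else {
                Status::Failed
            };
        }
        self.status
    }

    /// running -> pending for a job whose worker disappeared.
    fn recover_orphan(&mut self) -> bool {
        if self.status != Status::Running {
            return false;
        }
        self.status = Status::Pending;
        true
    }
}
```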

Surface: ask.ask_async, job_status/result/error, cancel_job,
run_pending_jobs, prune_jobs. GUCs: jobs_enabled (off), jobs_max_attempts
(3), jobs_orphan_timeout_ms (300000), jobs_batch (10),
jobs_poll_interval_ms (5000).

Bugs caught during live testing (only reachable by a real worker run):
- pg_database.datname is the `name` type (Oid 19), not text; cast to ::text
  for the launcher's Rust String mapping.
- a bgworker cannot LISTEN ("cannot execute LISTEN within a background
  process"); switched to poll-only.
- extension_present() must use SELECT EXISTS(...) (always one row) instead
  of SELECT true FROM ... WHERE (zero rows), which Spi::get_one reports as
  "positioned before the start".
- job_status/result/error must use a scalar subquery so an unknown/foreign
  id returns NULL, not the same zero-row Spi::get_one error.

Tests: +9 #[pg_test] (submit, atomic FIFO claim, complete-only-on-running,
retry-then-terminal, orphan recovery, cancel-blocks-completion,
end-to-end drain via fixture, NULL-safe accessors, prune). 114 green.
Verified live on the pg_ask_playground (shop_demo): real async jobs ran in
the per-DB worker and returned LLM answers; data preserved across upgrade.

Also fixes the prior bgworker heartbeat log spam (the v0.5 stub logged every
5s); the worker now logs only meaningful lifecycle events.

…cher bugs

Findings from a six-model parallel code review of the async job queue, each
empirically verified against a live PostgreSQL before fixing.

Correctness / security (BLOCKER):
- worker_pid guard: _job_complete / _job_fail / _job_release now require
  worker_pid = pg_backend_pid(). A slow-but-alive worker whose job was
  orphan-recovered and re-claimed by another worker could otherwise clobber
  the new attempt with a stale result (double-execution / wrong answer).
- privilege isolation: the worker runs each job's agent loop under the role
  that ENQUEUED it (set_config('role', owner, true) = SET LOCAL ROLE), then
  resets to the worker role for the trusted state-machine writes. _job_claim
  now returns owner. Async work has exactly the caller's privileges, not the
  worker's superuser rights. Verified: a limited_user job runs under
  limited_user; RLS scopes its visibility to its own rows.
- dblink conninfo: the launcher's discovery probe used quote_ident, which
  produces dbname="MyDb" (libpq reads a DB literally named with the quotes
  and fails) — switched to quote_literal. Verified live: quote_ident('MyDb')
  fails, quote_literal('MyDb') connects. Without this, DBs with uppercase /
  special names were silently never given a worker.
- launcher restart leak: on restart the old launcher's dynamic workers keep
  running; the launcher now checks pg_stat_activity for an existing
  'pg_ask worker: {db}' before spawning (no duplicate per restart) and
  terminates its workers on clean shutdown.
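
The worker_pid guard is the easiest of these fixes to model in isolation. The sketch below is illustrative: `Job`, `claim`, `recover_orphan`, and `complete` are hypothetical in-memory stand-ins for the SQL helpers, with an integer pid in place of pg_backend_pid().

```rust
#[derive(Debug, Default)]
struct Job {
    running: bool,
    worker_pid: Option<i32>,
    result: Option<String>,
}

/// Marks the job as running under the claiming worker's pid.
fn claim(job: &mut Job, pid: i32) {
    job.running = true;
    job.worker_pid = Some(pid);
}

/// Re-queues a job whose worker is presumed dead.
fn recover_orphan(job: &mut Job) {
    job.running = false;
    job.worker_pid = None;
}

/// Writes the result only if the caller is the worker that currently owns
/// the job. A stale worker gets `false` and changes nothing.
fn complete(job: &mut Job, caller_pid: i32, answer: &str) -> bool {
    if job.running && job.worker_pid == Some(caller_pid) {
        job.running = false;
        job.result = Some(answer.to_string());
        true
    } else {
        false
    }
}
```

If worker 100 is slow, the job is recovered and re-claimed by worker 200, and worker 100 then tries to complete it, the guard rejects the stale write and worker 200's answer is the one stored.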

HIGH:
- run_pending_jobs honours jobs_enabled (returns 0, skips orphan recovery
  when disabled) — matches the worker and its own docstring.
- jobs_orphan_timeout_ms default 5min -> 1h so a slow-but-legitimate job
  (up to max_iterations * http_total_timeout_ms) isn't falsely reclaimed.
- indexes: _jobs_pending_idx / _jobs_running_idx now lead with db (multi-DB
  claim/recovery scans); new partial _jobs_terminal_idx (finished_at) WHERE
  status IN (done,failed,cancelled) serves prune_jobs.

MEDIUM:
- RLS on ask._jobs (_jobs_owner_select USING owner = session_user), matching
  _traces; bgworker (superuser) bypasses. Verified: owner sees 1, superuser
  sees all.
- poison-pill drain: a re-claimed retry is _job_release'd back to pending
  instead of being left stuck in running until orphan recovery.
- FIFO tie-break: _job_claim orders by (ts, id).
- docker initdb installs dblink in the 'postgres' DB for the launcher.

Tests: +4 #[pg_test] (worker_pid guard, _job_release, run_pending_jobs
disabled no-op, claim returns owner). 118 green. fmt + clippy -D warnings
clean. Migration ↔ bootstrap: all 8 helper bodies character-identical.
Verified live on pg_ask_playground: dblink filter stops the postgres-worker
respawn churn, only shop_demo gets a worker, privilege SET ROLE works, RLS
isolates tenants, retry/fail state machine drives a job to terminal.

CI (a different test execution order than local) exposed two flaky job
tests:

- All job tests now `DELETE FROM ask._jobs` after enabling the queue, so a
  job committed by an earlier pg_test (pgrx commits each test's txn) can't
  leak in and break "oldest pending" / row-count assertions.
- job_claim_is_atomic_and_fifo: both rows get ts = now() (now() returns the
  transaction start time, so it is constant within a single pg_test
  transaction), so the (ts, id) tie-break fell back to the random uuid. The
  test now backdates `first` by 1s so the FIFO assertion is deterministic.
- prune_jobs_removes_only_terminal: claim whatever _job_claim returns and
  complete THAT id (the worker_pid guard + tie-break make "claim returns 'a'"
  unreliable) instead of assuming claim order.
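
The ordering problem behind the FIFO test can be reproduced without a database. In this sketch `Pending` and `claim_next` are hypothetical, and a plain integer stands in for the random uuid: when two jobs share a timestamp the (ts, id) order is decided by the id alone, and backdating one job makes the timestamp decide instead.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Pending {
    id: u128,   // stand-in for a random uuid
    ts_ms: u64, // enqueue timestamp
}

/// Removes and returns the job that sorts first by (ts, id).
fn claim_next(queue: &mut Vec<Pending>) -> Option<Pending> {
    let idx = queue
        .iter()
        .enumerate()
        .min_by_key(|(_, job)| (job.ts_ms, job.id))
        .map(|(i, _)| i)?;
    Some(queue.remove(idx))
}
```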

118 tests green across repeated runs; clippy -D warnings + fmt clean.

These three accessors (job_status / job_result / job_error) were annotated
`parallel_safe` but read via SPI (Spi::get_one_with_args). A function the
planner believes is parallel-safe
can be invoked inside a parallel worker, where SPI is forbidden
("cannot start commands during a parallel operation"). Switched to
parallel_unsafe in both the #[pg_extern] and the migration's CREATE FUNCTION
DDL. STABLE is kept so the leader can still inline them; only the
parallel-worker invocation path is barred.

Same fix is applied to the pre-existing status / list_tools / list_memories /
list_namespaces functions in a separate PR off main.

118 tests green; clippy -D warnings + fmt clean.

status / list_tools / list_memories / list_namespaces (and, in the previous
commit, job_status / job_result / job_error) were annotated PARALLEL SAFE but
all read via SPI. The planner may invoke a parallel-safe function inside a
parallel worker, where SPI is forbidden ("cannot start commands during a
parallel operation"), so a query like
  SELECT ask.status() FROM big_table WHERE ...
could crash the parallel worker. Switched the #[pg_extern] annotations to
parallel_unsafe; STABLE is kept so the leader can still inline them.

version() and status_api_level() are left PARALLEL SAFE — they read no SPI
(env! / a constant).

Upgrade reach: pgrx does not change a function's proparallel flag on
ALTER EXTENSION UPDATE (only fresh-install base SQL reflects the new
annotation), so the 0.5.8->0.5.9 migration now issues
  ALTER FUNCTION ask.status() PARALLEL UNSAFE; (and the other three)
so upgraded installs get the fix too. Verified live on pg_ask_playground:
ALTER FUNCTION flips proparallel s->u for all four; version /
status_api_level stay s; the functions still run.

118 tests green; clippy -D warnings + fmt clean.

@alpertarhan requested a review from ersintarhan June 8, 2026 09:39
@ersintarhan merged commit e9103ee into main Jun 8, 2026
1 check passed
@ersintarhan deleted the harden/emit-event-outbox branch June 8, 2026 11:33