Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,32 @@ test first, then the implementation that turns it green.
the implementation. Applies to pgque-api, observability, client libraries,
CLI. Does NOT apply to pgque-core repackaging (PgQ already has tests).
See SPECx.md section 13.2.
7. **Separate-transactions rule (snapshot visibility).** PgQue is
snapshot-based. The following operations MUST run in distinct, committed
transactions to be observable downstream — combining them in one tx
silently produces empty batches and dropped messages:
- `pgque.send` / `pgque.insert_event` (producer commit)
→ `pgque.ticker` / `pgque.force_tick` (snapshot is taken here)
→ `pgque.receive` / `pgque.next_batch` (sees only what committed
before the snapshot).
- `pgque.maint_retry_events` (re-inserts retry rows into event tables
with `pg_current_xact_id()`) → `pgque.ticker` (must run in a later
tx so the new ev_txids are visible in its snapshot) → `pgque.receive`.
- `pgque.maint_rotate_tables_step1` → `pgque.maint_rotate_tables_step2`
(PgQ design requirement; already documented in `sql/pgque-api/maint.sql`).

In SQL tests, this means one `do $$ ... $$` block per logical step
(see `tests/test_api_receive.sql` for the canonical pattern). In
client code, default-mode `pgxpool` / `pg.Pool` / `psycopg(autocommit)`
already runs each call in its own implicit tx — the rule is
transparently satisfied. The footgun is reaching for `Pool()` /
`client.conn` to wrap send + receive in one explicit tx; the consumer
side won't see what the producer just sent.

When adding a new test that crosses these boundaries, mark each
transaction boundary in a `-- separate transaction (snapshot
visibility)` comment. When adding new client examples, do not place
send/tick/receive inside a shared `BEGIN`/`COMMIT`.

## Copyright

Expand Down
31 changes: 25 additions & 6 deletions clients/go/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,33 @@ func TestNack_ToDLQAtRetryLimit(t *testing.T) {
const maxCycles = 5
for i := 0; i < maxCycles; i++ {
// Expire any pending retry delays so maint_retry_events picks them up
// immediately; in production these would expire naturally.
if _, err := client.Pool().Exec(ctx,
"update pgque.retry_queue set ev_retry_after = now() - interval '1 second'"); err != nil {
// immediately; in production these would expire naturally. Scope the
// update to this test's queue — pgque.ack() now re-queues unreturned
// batch rows globally (#134), so a global update would race with rows
// from sibling tests.
if _, err := client.Pool().Exec(ctx, `
update pgque.retry_queue rq
set ev_retry_after = now() - interval '1 second'
from pgque.queue q
where q.queue_id = rq.ev_queue
and q.queue_name = $1`, queue); err != nil {
t.Logf("retry_queue update unavailable: %v", err)
}
// Re-queue retry_queue rows for redelivery.
if _, err := client.Pool().Exec(ctx, "select pgque.maint_retry_events()"); err != nil {
t.Logf("maint_retry_events unavailable, using ticker fallback: %v", err)
// Re-queue retry_queue rows for redelivery. maint_retry_events()
// processes at most 10 rows per call; loop until drained so that
// past-due rows left behind by sibling tests (which can pile up under
// the global retry_queue now that pgque.ack() re-queues unreturned
// batch events for #134) do not starve this test's row.
for {
var n int
if err := client.Pool().QueryRow(ctx,
"select pgque.maint_retry_events()").Scan(&n); err != nil {
t.Logf("maint_retry_events unavailable, using ticker fallback: %v", err)
break
}
if n == 0 {
break
}
}
tick(t, client, queue)

Expand Down
4 changes: 2 additions & 2 deletions docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ Grant: `pgque_reader`. Source: `sql/pgque-api/receive.sql`.
select * from pgque.receive('orders', 'processor', 100);
```

**Batch-ownership caveat.** `max_return` limits the number of rows returned to the caller, but `ack(batch_id)` advances the consumer cursor past the entire underlying batch. If `max_return < ticker_max_count`, calling `ack()` after a partial receive will drop the unreturned rows from the consumer's perspective. Either consume the full batch before acking, or use `max_return >= ticker_max_count` for safe pagination.
**Partial-receive safety (#134).** `max_return` caps the rows returned to the caller. The underlying PgQ batch may contain more rows (the dual-tick window is sized by `ticker_max_count` / `ticker_max_lag`). `pgque.receive()` records which msg_ids it actually yielded, and `pgque.ack()` re-queues the unreturned events to `pgque.retry_queue` (with `ev_retry` preserved — these events were never delivered to a handler) before closing the batch. The next `pgque.maint_retry_events()` cycle moves them back into the main event table for re-delivery. Callers using lower-level primitives (`next_batch` + `finish_batch` directly) are unaffected.

#### `pgque.ack(batch_id bigint) → integer`

Closes the batch and advances the consumer position. Modern alias for `pgque.finish_batch`. Returns `1` on success, `0` if the batch was not found.
Closes the batch and advances the consumer position. Returns `1` on success, `0` if the batch was not found. Before calling `pgque.finish_batch`, re-queues any events the batch contained but `pgque.receive()` did not yield (see #134 above).
Grant: `pgque_reader`. Source: `sql/pgque-api/receive.sql`.

#### `pgque.nack(batch_id bigint, msg pgque.message, retry_after interval default '60 seconds', reason text default null) → integer`
Expand Down
67 changes: 67 additions & 0 deletions sql/pgque-api/receive.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,28 @@ begin
end if;
end $$;

-- Tracking table for #134: pgque.receive() records the msg_ids it actually
-- yielded so pgque.ack() can re-queue any unreturned events from the
-- underlying PgQ batch instead of silently dropping them. The row is keyed
-- by batch_id and cleared by ack(); finish_batch() callers that bypass
-- pgque.receive() are unaffected (no row → no re-queue, legacy behavior).
create table if not exists pgque.batch_returned (
batch_id bigint primary key,
returned_msg_ids bigint[] not null default '{}'::bigint[]
);

-- pgque.receive() -- wraps next_batch + get_batch_events
--
-- Fix #134: records returned msg_ids in pgque.batch_returned so ack() can
-- re-queue events the underlying PgQ batch contained but max_return clipped.
-- Without this, ack(batch_id) → finish_batch advances sub_last_tick past the
-- whole tick window and the unreturned events become unreachable.
create or replace function pgque.receive(
i_queue text, i_consumer text, i_max_return int default 100)
returns setof pgque.message as $$
declare
v_batch_id bigint;
v_returned bigint[] := '{}'::bigint[];
ev record;
cnt int := 0;
begin
Expand All @@ -55,23 +71,74 @@ begin
ev.ev_retry, ev.ev_time,
ev.ev_extra1, ev.ev_extra2, ev.ev_extra3, ev.ev_extra4
)::pgque.message;
v_returned := v_returned || ev.ev_id;
cnt := cnt + 1;
exit when cnt >= i_max_return;
end loop;

-- Empty batch: finish immediately to advance the consumer cursor.
if cnt = 0 then
perform pgque.finish_batch(v_batch_id);
return;
end if;

-- Record which msg_ids the caller actually saw so ack() can re-queue
-- the rest. Upsert guards against a re-open of the same batch within
-- the active subscription (next_batch returns the existing batch_id
-- if one is already active; the latest receive() wins).
insert into pgque.batch_returned (batch_id, returned_msg_ids)
values (v_batch_id, v_returned)
on conflict (batch_id) do update
set returned_msg_ids = excluded.returned_msg_ids;

return;
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

-- pgque.ack() -- finishes the batch, advances consumer position
--
-- Fix #134: before finishing the batch, re-queue any events the underlying
-- PgQ batch contained but pgque.receive() did not yield (because of the
-- max_return cap). Re-queue uses the existing pgque.retry_queue path with
-- ev_retry_after = now() so the events are eligible for the next
-- maint_retry_events() cycle, and ev_retry is preserved (these events were
-- never delivered to a handler — they must not count as a retry attempt).
--
-- Backward compatibility: callers that opened the batch via lower-level
-- primitives (next_batch + finish_batch) leave no row in pgque.batch_returned,
-- so ack() falls through to plain finish_batch as before.
create or replace function pgque.ack(i_batch_id bigint)
returns integer as $$
declare
v_returned bigint[];
v_sub_id int4;
v_sub_queue int4;
begin
select returned_msg_ids into v_returned
from pgque.batch_returned
where batch_id = i_batch_id;

if found then
select sub_id, sub_queue into v_sub_id, v_sub_queue
from pgque.subscription
where sub_batch = i_batch_id;

if v_sub_id is not null then
insert into pgque.retry_queue (
ev_retry_after, ev_queue, ev_id, ev_time, ev_txid, ev_owner,
ev_retry, ev_type, ev_data,
ev_extra1, ev_extra2, ev_extra3, ev_extra4)
select now(), v_sub_queue, b.ev_id, b.ev_time, NULL::xid8, v_sub_id,
coalesce(b.ev_retry, 0), b.ev_type, b.ev_data,
b.ev_extra1, b.ev_extra2, b.ev_extra3, b.ev_extra4
from pgque.get_batch_events(i_batch_id) b
where not (b.ev_id = any(coalesce(v_returned, '{}'::bigint[])))
on conflict (ev_owner, ev_id) do nothing;
end if;

delete from pgque.batch_returned where batch_id = i_batch_id;
end if;

return pgque.finish_batch(i_batch_id);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;
Expand Down
67 changes: 67 additions & 0 deletions sql/pgque-tle.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5186,12 +5186,28 @@ begin
end if;
end $$;

-- Tracking table for #134: pgque.receive() records the msg_ids it actually
-- yielded so pgque.ack() can re-queue any unreturned events from the
-- underlying PgQ batch instead of silently dropping them. The row is keyed
-- by batch_id and cleared by ack(); finish_batch() callers that bypass
-- pgque.receive() are unaffected (no row → no re-queue, legacy behavior).
create table if not exists pgque.batch_returned (
batch_id bigint primary key,
returned_msg_ids bigint[] not null default '{}'::bigint[]
);

-- pgque.receive() -- wraps next_batch + get_batch_events
--
-- Fix #134: records returned msg_ids in pgque.batch_returned so ack() can
-- re-queue events the underlying PgQ batch contained but max_return clipped.
-- Without this, ack(batch_id) → finish_batch advances sub_last_tick past the
-- whole tick window and the unreturned events become unreachable.
create or replace function pgque.receive(
i_queue text, i_consumer text, i_max_return int default 100)
returns setof pgque.message as $$
declare
v_batch_id bigint;
v_returned bigint[] := '{}'::bigint[];
ev record;
cnt int := 0;
begin
Expand All @@ -5216,23 +5232,74 @@ begin
ev.ev_retry, ev.ev_time,
ev.ev_extra1, ev.ev_extra2, ev.ev_extra3, ev.ev_extra4
)::pgque.message;
v_returned := v_returned || ev.ev_id;
cnt := cnt + 1;
exit when cnt >= i_max_return;
end loop;

-- Empty batch: finish immediately to advance the consumer cursor.
if cnt = 0 then
perform pgque.finish_batch(v_batch_id);
return;
end if;

-- Record which msg_ids the caller actually saw so ack() can re-queue
-- the rest. Upsert guards against a re-open of the same batch within
-- the active subscription (next_batch returns the existing batch_id
-- if one is already active; the latest receive() wins).
insert into pgque.batch_returned (batch_id, returned_msg_ids)
values (v_batch_id, v_returned)
on conflict (batch_id) do update
set returned_msg_ids = excluded.returned_msg_ids;

return;
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

-- pgque.ack() -- finishes the batch, advances consumer position
--
-- Fix #134: before finishing the batch, re-queue any events the underlying
-- PgQ batch contained but pgque.receive() did not yield (because of the
-- max_return cap). Re-queue uses the existing pgque.retry_queue path with
-- ev_retry_after = now() so the events are eligible for the next
-- maint_retry_events() cycle, and ev_retry is preserved (these events were
-- never delivered to a handler — they must not count as a retry attempt).
--
-- Backward compatibility: callers that opened the batch via lower-level
-- primitives (next_batch + finish_batch) leave no row in pgque.batch_returned,
-- so ack() falls through to plain finish_batch as before.
create or replace function pgque.ack(i_batch_id bigint)
returns integer as $$
declare
v_returned bigint[];
v_sub_id int4;
v_sub_queue int4;
begin
select returned_msg_ids into v_returned
from pgque.batch_returned
where batch_id = i_batch_id;

if found then
select sub_id, sub_queue into v_sub_id, v_sub_queue
from pgque.subscription
where sub_batch = i_batch_id;

if v_sub_id is not null then
insert into pgque.retry_queue (
ev_retry_after, ev_queue, ev_id, ev_time, ev_txid, ev_owner,
ev_retry, ev_type, ev_data,
ev_extra1, ev_extra2, ev_extra3, ev_extra4)
select now(), v_sub_queue, b.ev_id, b.ev_time, NULL::xid8, v_sub_id,
coalesce(b.ev_retry, 0), b.ev_type, b.ev_data,
b.ev_extra1, b.ev_extra2, b.ev_extra3, b.ev_extra4
from pgque.get_batch_events(i_batch_id) b
where not (b.ev_id = any(coalesce(v_returned, '{}'::bigint[])))
on conflict (ev_owner, ev_id) do nothing;
end if;

delete from pgque.batch_returned where batch_id = i_batch_id;
end if;

return pgque.finish_batch(i_batch_id);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;
Expand Down
67 changes: 67 additions & 0 deletions sql/pgque.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5098,12 +5098,28 @@ begin
end if;
end $$;

-- Tracking table for #134: pgque.receive() records the msg_ids it actually
-- yielded so pgque.ack() can re-queue any unreturned events from the
-- underlying PgQ batch instead of silently dropping them. The row is keyed
-- by batch_id and cleared by ack(); finish_batch() callers that bypass
-- pgque.receive() are unaffected (no row → no re-queue, legacy behavior).
create table if not exists pgque.batch_returned (
batch_id bigint primary key,
returned_msg_ids bigint[] not null default '{}'::bigint[]
);

-- pgque.receive() -- wraps next_batch + get_batch_events
--
-- Fix #134: records returned msg_ids in pgque.batch_returned so ack() can
-- re-queue events the underlying PgQ batch contained but max_return clipped.
-- Without this, ack(batch_id) → finish_batch advances sub_last_tick past the
-- whole tick window and the unreturned events become unreachable.
create or replace function pgque.receive(
i_queue text, i_consumer text, i_max_return int default 100)
returns setof pgque.message as $$
declare
v_batch_id bigint;
v_returned bigint[] := '{}'::bigint[];
ev record;
cnt int := 0;
begin
Expand All @@ -5128,23 +5144,74 @@ begin
ev.ev_retry, ev.ev_time,
ev.ev_extra1, ev.ev_extra2, ev.ev_extra3, ev.ev_extra4
)::pgque.message;
v_returned := v_returned || ev.ev_id;
cnt := cnt + 1;
exit when cnt >= i_max_return;
end loop;

-- Empty batch: finish immediately to advance the consumer cursor.
if cnt = 0 then
perform pgque.finish_batch(v_batch_id);
return;
end if;

-- Record which msg_ids the caller actually saw so ack() can re-queue
-- the rest. Upsert guards against a re-open of the same batch within
-- the active subscription (next_batch returns the existing batch_id
-- if one is already active; the latest receive() wins).
insert into pgque.batch_returned (batch_id, returned_msg_ids)
values (v_batch_id, v_returned)
on conflict (batch_id) do update
set returned_msg_ids = excluded.returned_msg_ids;

return;
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;

-- pgque.ack() -- finishes the batch, advances consumer position
--
-- Fix #134: before finishing the batch, re-queue any events the underlying
-- PgQ batch contained but pgque.receive() did not yield (because of the
-- max_return cap). Re-queue uses the existing pgque.retry_queue path with
-- ev_retry_after = now() so the events are eligible for the next
-- maint_retry_events() cycle, and ev_retry is preserved (these events were
-- never delivered to a handler — they must not count as a retry attempt).
--
-- Backward compatibility: callers that opened the batch via lower-level
-- primitives (next_batch + finish_batch) leave no row in pgque.batch_returned,
-- so ack() falls through to plain finish_batch as before.
create or replace function pgque.ack(i_batch_id bigint)
returns integer as $$
declare
v_returned bigint[];
v_sub_id int4;
v_sub_queue int4;
begin
select returned_msg_ids into v_returned
from pgque.batch_returned
where batch_id = i_batch_id;

if found then
select sub_id, sub_queue into v_sub_id, v_sub_queue
from pgque.subscription
where sub_batch = i_batch_id;

if v_sub_id is not null then
insert into pgque.retry_queue (
ev_retry_after, ev_queue, ev_id, ev_time, ev_txid, ev_owner,
ev_retry, ev_type, ev_data,
ev_extra1, ev_extra2, ev_extra3, ev_extra4)
select now(), v_sub_queue, b.ev_id, b.ev_time, NULL::xid8, v_sub_id,
coalesce(b.ev_retry, 0), b.ev_type, b.ev_data,
b.ev_extra1, b.ev_extra2, b.ev_extra3, b.ev_extra4
from pgque.get_batch_events(i_batch_id) b
where not (b.ev_id = any(coalesce(v_returned, '{}'::bigint[])))
on conflict (ev_owner, ev_id) do nothing;
end if;

delete from pgque.batch_returned where batch_id = i_batch_id;
end if;

return pgque.finish_batch(i_batch_id);
end;
$$ language plpgsql security definer set search_path = pgque, pg_catalog;
Expand Down
Loading
Loading