diff --git a/CLAUDE.md b/CLAUDE.md index fd475ab7..2fd07e52 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -171,6 +171,32 @@ test first, then the implementation that turns it green. the implementation. Applies to pgque-api, observability, client libraries, CLI. Does NOT apply to pgque-core repackaging (PgQ already has tests). See SPECx.md section 13.2. +7. **Separate-transactions rule (snapshot visibility).** PgQue is + snapshot-based. The following operations MUST run in distinct, committed + transactions to be observable downstream — combining them in one tx + silently produces empty batches and dropped messages: + - `pgque.send` / `pgque.insert_event` (producer commit) + → `pgque.ticker` / `pgque.force_tick` (snapshot is taken here) + → `pgque.receive` / `pgque.next_batch` (sees only what committed + before the snapshot). + - `pgque.maint_retry_events` (re-inserts retry rows into event tables + with `pg_current_xact_id()`) → `pgque.ticker` (must run in a later + tx so the new ev_txids are visible in its snapshot) → `pgque.receive`. + - `pgque.maint_rotate_tables_step1` → `pgque.maint_rotate_tables_step2` + (PgQ design requirement; already documented in `sql/pgque-api/maint.sql`). + + In SQL tests, this means one `do $$ ... $$` block per logical step + (see `tests/test_api_receive.sql` for the canonical pattern). In + client code, default-mode `pgxpool` / `pg.Pool` / `psycopg(autocommit)` + already runs each call in its own implicit tx — the rule is + transparently satisfied. The footgun is reaching for `Pool()` / + `client.conn` to wrap send + receive in one explicit tx; the consumer + side won't see what the producer just sent. + + When adding a new test that crosses these boundaries, mark each + transaction boundary in a `-- separate transaction (snapshot + visibility)` comment. When adding new client examples, do not place + send/tick/receive inside a shared `BEGIN`/`COMMIT`. ## Copyright diff --git a/clients/go/integration_test.go b/clients/go/integration_test.go index ac7b77f8..3b74a457 100644 --- a/clients/go/integration_test.go +++ b/clients/go/integration_test.go @@ -153,14 +153,33 @@ func TestNack_ToDLQAtRetryLimit(t *testing.T) { const maxCycles = 5 for i := 0; i < maxCycles; i++ { // Expire any pending retry delays so maint_retry_events picks them up - // immediately; in production these would expire naturally. - if _, err := client.Pool().Exec(ctx, - "update pgque.retry_queue set ev_retry_after = now() - interval '1 second'"); err != nil { + // immediately; in production these would expire naturally. Scope the + // update to this test's queue — pgque.ack() now re-queues unreturned + // batch rows globally (#134), so a global update would race with rows + // from sibling tests. + if _, err := client.Pool().Exec(ctx, ` + update pgque.retry_queue rq + set ev_retry_after = now() - interval '1 second' + from pgque.queue q + where q.queue_id = rq.ev_queue + and q.queue_name = $1`, queue); err != nil { t.Logf("retry_queue update unavailable: %v", err) } - // Re-queue retry_queue rows for redelivery. - if _, err := client.Pool().Exec(ctx, "select pgque.maint_retry_events()"); err != nil { - t.Logf("maint_retry_events unavailable, using ticker fallback: %v", err) + // Re-queue retry_queue rows for redelivery. maint_retry_events() + // processes at most 10 rows per call; loop until drained so that + // past-due rows left behind by sibling tests (which can pile up under + // the global retry_queue now that pgque.ack() re-queues unreturned + // batch events for #134) do not starve this test's row. + for { + var n int + if err := client.Pool().QueryRow(ctx, + "select pgque.maint_retry_events()").Scan(&n); err != nil { + t.Logf("maint_retry_events unavailable, using ticker fallback: %v", err) + break + } + if n == 0 { + break + } } tick(t, client, queue) diff --git a/docs/reference.md b/docs/reference.md index 368645e4..38eff7df 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -126,11 +126,11 @@ Grant: `pgque_reader`. Source: `sql/pgque-api/receive.sql`. select * from pgque.receive('orders', 'processor', 100); ``` -**Batch-ownership caveat.** `max_return` limits the number of rows returned to the caller, but `ack(batch_id)` advances the consumer cursor past the entire underlying batch. If `max_return < ticker_max_count`, calling `ack()` after a partial receive will drop the unreturned rows from the consumer's perspective. Either consume the full batch before acking, or use `max_return >= ticker_max_count` for safe pagination. +**Partial-receive safety (#134).** `max_return` caps the rows returned to the caller. The underlying PgQ batch may contain more rows (the dual-tick window is sized by `ticker_max_count` / `ticker_max_lag`). `pgque.receive()` records which msg_ids it actually yielded, and `pgque.ack()` re-queues the unreturned events to `pgque.retry_queue` (with `ev_retry` preserved — these events were never delivered to a handler) before closing the batch. The next `pgque.maint_retry_events()` cycle moves them back into the main event table for re-delivery. Callers using lower-level primitives (`next_batch` + `finish_batch` directly) are unaffected. #### `pgque.ack(batch_id bigint) → integer` -Closes the batch and advances the consumer position. Modern alias for `pgque.finish_batch`. Returns `1` on success, `0` if the batch was not found. +Closes the batch and advances the consumer position. Returns `1` on success, `0` if the batch was not found. Before calling `pgque.finish_batch`, re-queues any events the batch contained but `pgque.receive()` did not yield (see #134 above). Grant: `pgque_reader`. Source: `sql/pgque-api/receive.sql`. #### `pgque.nack(batch_id bigint, msg pgque.message, retry_after interval default '60 seconds', reason text default null) → integer` diff --git a/sql/pgque-api/receive.sql b/sql/pgque-api/receive.sql index 4484e7e4..7aecf5a9 100644 --- a/sql/pgque-api/receive.sql +++ b/sql/pgque-api/receive.sql @@ -25,12 +25,28 @@ begin end if; end $$; +-- Tracking table for #134: pgque.receive() records the msg_ids it actually +-- yielded so pgque.ack() can re-queue any unreturned events from the +-- underlying PgQ batch instead of silently dropping them. The row is keyed +-- by batch_id and cleared by ack(); finish_batch() callers that bypass +-- pgque.receive() are unaffected (no row → no re-queue, legacy behavior). +create table if not exists pgque.batch_returned ( + batch_id bigint primary key, + returned_msg_ids bigint[] not null default '{}'::bigint[] +); + -- pgque.receive() -- wraps next_batch + get_batch_events +-- +-- Fix #134: records returned msg_ids in pgque.batch_returned so ack() can +-- re-queue events the underlying PgQ batch contained but max_return clipped. +-- Without this, ack(batch_id) → finish_batch advances sub_last_tick past the +-- whole tick window and the unreturned events become unreachable. create or replace function pgque.receive( i_queue text, i_consumer text, i_max_return int default 100) returns setof pgque.message as $$ declare v_batch_id bigint; + v_returned bigint[] := '{}'::bigint[]; ev record; cnt int := 0; begin @@ -55,6 +71,7 @@ begin ev.ev_retry, ev.ev_time, ev.ev_extra1, ev.ev_extra2, ev.ev_extra3, ev.ev_extra4 )::pgque.message; + v_returned := v_returned || ev.ev_id; cnt := cnt + 1; exit when cnt >= i_max_return; end loop; @@ -62,16 +79,66 @@ begin -- Empty batch: finish immediately to advance the consumer cursor. if cnt = 0 then perform pgque.finish_batch(v_batch_id); + return; end if; + -- Record which msg_ids the caller actually saw so ack() can re-queue + -- the rest. Upsert guards against a re-open of the same batch within + -- the active subscription (next_batch returns the existing batch_id + -- if one is already active; the latest receive() wins). + insert into pgque.batch_returned (batch_id, returned_msg_ids) + values (v_batch_id, v_returned) + on conflict (batch_id) do update + set returned_msg_ids = excluded.returned_msg_ids; + return; end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- pgque.ack() -- finishes the batch, advances consumer position +-- +-- Fix #134: before finishing the batch, re-queue any events the underlying +-- PgQ batch contained but pgque.receive() did not yield (because of the +-- max_return cap). Re-queue uses the existing pgque.retry_queue path with +-- ev_retry_after = now() so the events are eligible for the next +-- maint_retry_events() cycle, and ev_retry is preserved (these events were +-- never delivered to a handler — they must not count as a retry attempt). +-- +-- Backward compatibility: callers that opened the batch via lower-level +-- primitives (next_batch + finish_batch) leave no row in pgque.batch_returned, +-- so ack() falls through to plain finish_batch as before. create or replace function pgque.ack(i_batch_id bigint) returns integer as $$ +declare + v_returned bigint[]; + v_sub_id int4; + v_sub_queue int4; begin + select returned_msg_ids into v_returned + from pgque.batch_returned + where batch_id = i_batch_id; + + if found then + select sub_id, sub_queue into v_sub_id, v_sub_queue + from pgque.subscription + where sub_batch = i_batch_id; + + if v_sub_id is not null then + insert into pgque.retry_queue ( + ev_retry_after, ev_queue, ev_id, ev_time, ev_txid, ev_owner, + ev_retry, ev_type, ev_data, + ev_extra1, ev_extra2, ev_extra3, ev_extra4) + select now(), v_sub_queue, b.ev_id, b.ev_time, NULL::xid8, v_sub_id, + coalesce(b.ev_retry, 0), b.ev_type, b.ev_data, + b.ev_extra1, b.ev_extra2, b.ev_extra3, b.ev_extra4 + from pgque.get_batch_events(i_batch_id) b + where not (b.ev_id = any(coalesce(v_returned, '{}'::bigint[]))) + on conflict (ev_owner, ev_id) do nothing; + end if; + + delete from pgque.batch_returned where batch_id = i_batch_id; + end if; + return pgque.finish_batch(i_batch_id); end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; diff --git a/sql/pgque-tle.sql b/sql/pgque-tle.sql index e40c8420..7395d803 100644 --- a/sql/pgque-tle.sql +++ b/sql/pgque-tle.sql @@ -5186,12 +5186,28 @@ begin end if; end $$; +-- Tracking table for #134: pgque.receive() records the msg_ids it actually +-- yielded so pgque.ack() can re-queue any unreturned events from the +-- underlying PgQ batch instead of silently dropping them. The row is keyed +-- by batch_id and cleared by ack(); finish_batch() callers that bypass +-- pgque.receive() are unaffected (no row → no re-queue, legacy behavior). +create table if not exists pgque.batch_returned ( + batch_id bigint primary key, + returned_msg_ids bigint[] not null default '{}'::bigint[] +); + -- pgque.receive() -- wraps next_batch + get_batch_events +-- +-- Fix #134: records returned msg_ids in pgque.batch_returned so ack() can +-- re-queue events the underlying PgQ batch contained but max_return clipped. +-- Without this, ack(batch_id) → finish_batch advances sub_last_tick past the +-- whole tick window and the unreturned events become unreachable. create or replace function pgque.receive( i_queue text, i_consumer text, i_max_return int default 100) returns setof pgque.message as $$ declare v_batch_id bigint; + v_returned bigint[] := '{}'::bigint[]; ev record; cnt int := 0; begin @@ -5216,6 +5232,7 @@ begin ev.ev_retry, ev.ev_time, ev.ev_extra1, ev.ev_extra2, ev.ev_extra3, ev.ev_extra4 )::pgque.message; + v_returned := v_returned || ev.ev_id; cnt := cnt + 1; exit when cnt >= i_max_return; end loop; @@ -5223,16 +5240,66 @@ begin -- Empty batch: finish immediately to advance the consumer cursor. if cnt = 0 then perform pgque.finish_batch(v_batch_id); + return; end if; + -- Record which msg_ids the caller actually saw so ack() can re-queue + -- the rest. Upsert guards against a re-open of the same batch within + -- the active subscription (next_batch returns the existing batch_id + -- if one is already active; the latest receive() wins). + insert into pgque.batch_returned (batch_id, returned_msg_ids) + values (v_batch_id, v_returned) + on conflict (batch_id) do update + set returned_msg_ids = excluded.returned_msg_ids; + return; end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- pgque.ack() -- finishes the batch, advances consumer position +-- +-- Fix #134: before finishing the batch, re-queue any events the underlying +-- PgQ batch contained but pgque.receive() did not yield (because of the +-- max_return cap). Re-queue uses the existing pgque.retry_queue path with +-- ev_retry_after = now() so the events are eligible for the next +-- maint_retry_events() cycle, and ev_retry is preserved (these events were +-- never delivered to a handler — they must not count as a retry attempt). +-- +-- Backward compatibility: callers that opened the batch via lower-level +-- primitives (next_batch + finish_batch) leave no row in pgque.batch_returned, +-- so ack() falls through to plain finish_batch as before. create or replace function pgque.ack(i_batch_id bigint) returns integer as $$ +declare + v_returned bigint[]; + v_sub_id int4; + v_sub_queue int4; begin + select returned_msg_ids into v_returned + from pgque.batch_returned + where batch_id = i_batch_id; + + if found then + select sub_id, sub_queue into v_sub_id, v_sub_queue + from pgque.subscription + where sub_batch = i_batch_id; + + if v_sub_id is not null then + insert into pgque.retry_queue ( + ev_retry_after, ev_queue, ev_id, ev_time, ev_txid, ev_owner, + ev_retry, ev_type, ev_data, + ev_extra1, ev_extra2, ev_extra3, ev_extra4) + select now(), v_sub_queue, b.ev_id, b.ev_time, NULL::xid8, v_sub_id, + coalesce(b.ev_retry, 0), b.ev_type, b.ev_data, + b.ev_extra1, b.ev_extra2, b.ev_extra3, b.ev_extra4 + from pgque.get_batch_events(i_batch_id) b + where not (b.ev_id = any(coalesce(v_returned, '{}'::bigint[]))) + on conflict (ev_owner, ev_id) do nothing; + end if; + + delete from pgque.batch_returned where batch_id = i_batch_id; + end if; + return pgque.finish_batch(i_batch_id); end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; diff --git a/sql/pgque.sql b/sql/pgque.sql index ce8ddb48..824790df 100644 --- a/sql/pgque.sql +++ b/sql/pgque.sql @@ -5098,12 +5098,28 @@ begin end if; end $$; +-- Tracking table for #134: pgque.receive() records the msg_ids it actually +-- yielded so pgque.ack() can re-queue any unreturned events from the +-- underlying PgQ batch instead of silently dropping them. The row is keyed +-- by batch_id and cleared by ack(); finish_batch() callers that bypass +-- pgque.receive() are unaffected (no row → no re-queue, legacy behavior). +create table if not exists pgque.batch_returned ( + batch_id bigint primary key, + returned_msg_ids bigint[] not null default '{}'::bigint[] +); + -- pgque.receive() -- wraps next_batch + get_batch_events +-- +-- Fix #134: records returned msg_ids in pgque.batch_returned so ack() can +-- re-queue events the underlying PgQ batch contained but max_return clipped. +-- Without this, ack(batch_id) → finish_batch advances sub_last_tick past the +-- whole tick window and the unreturned events become unreachable. create or replace function pgque.receive( i_queue text, i_consumer text, i_max_return int default 100) returns setof pgque.message as $$ declare v_batch_id bigint; + v_returned bigint[] := '{}'::bigint[]; ev record; cnt int := 0; begin @@ -5128,6 +5144,7 @@ begin ev.ev_retry, ev.ev_time, ev.ev_extra1, ev.ev_extra2, ev.ev_extra3, ev.ev_extra4 )::pgque.message; + v_returned := v_returned || ev.ev_id; cnt := cnt + 1; exit when cnt >= i_max_return; end loop; @@ -5135,16 +5152,66 @@ begin -- Empty batch: finish immediately to advance the consumer cursor. if cnt = 0 then perform pgque.finish_batch(v_batch_id); + return; end if; + -- Record which msg_ids the caller actually saw so ack() can re-queue + -- the rest. Upsert guards against a re-open of the same batch within + -- the active subscription (next_batch returns the existing batch_id + -- if one is already active; the latest receive() wins). + insert into pgque.batch_returned (batch_id, returned_msg_ids) + values (v_batch_id, v_returned) + on conflict (batch_id) do update + set returned_msg_ids = excluded.returned_msg_ids; + return; end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- pgque.ack() -- finishes the batch, advances consumer position +-- +-- Fix #134: before finishing the batch, re-queue any events the underlying +-- PgQ batch contained but pgque.receive() did not yield (because of the +-- max_return cap). Re-queue uses the existing pgque.retry_queue path with +-- ev_retry_after = now() so the events are eligible for the next +-- maint_retry_events() cycle, and ev_retry is preserved (these events were +-- never delivered to a handler — they must not count as a retry attempt). +-- +-- Backward compatibility: callers that opened the batch via lower-level +-- primitives (next_batch + finish_batch) leave no row in pgque.batch_returned, +-- so ack() falls through to plain finish_batch as before. create or replace function pgque.ack(i_batch_id bigint) returns integer as $$ +declare + v_returned bigint[]; + v_sub_id int4; + v_sub_queue int4; begin + select returned_msg_ids into v_returned + from pgque.batch_returned + where batch_id = i_batch_id; + + if found then + select sub_id, sub_queue into v_sub_id, v_sub_queue + from pgque.subscription + where sub_batch = i_batch_id; + + if v_sub_id is not null then + insert into pgque.retry_queue ( + ev_retry_after, ev_queue, ev_id, ev_time, ev_txid, ev_owner, + ev_retry, ev_type, ev_data, + ev_extra1, ev_extra2, ev_extra3, ev_extra4) + select now(), v_sub_queue, b.ev_id, b.ev_time, NULL::xid8, v_sub_id, + coalesce(b.ev_retry, 0), b.ev_type, b.ev_data, + b.ev_extra1, b.ev_extra2, b.ev_extra3, b.ev_extra4 + from pgque.get_batch_events(i_batch_id) b + where not (b.ev_id = any(coalesce(v_returned, '{}'::bigint[]))) + on conflict (ev_owner, ev_id) do nothing; + end if; + + delete from pgque.batch_returned where batch_id = i_batch_id; + end if; + return pgque.finish_batch(i_batch_id); end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; diff --git a/tests/run_all.sql b/tests/run_all.sql index 7d25a50a..f683e8bd 100644 --- a/tests/run_all.sql +++ b/tests/run_all.sql @@ -91,6 +91,9 @@ \echo 'Running: test_receive_empty_batch' \i tests/test_receive_empty_batch.sql +\echo 'Running: test_receive_partial_no_skip' +\i tests/test_receive_partial_no_skip.sql + \echo 'Running: test_force_next_tick_alias' \i tests/test_force_next_tick_alias.sql diff --git a/tests/test_api_receive.sql b/tests/test_api_receive.sql index 3beb0598..ce7043aa 100644 --- a/tests/test_api_receive.sql +++ b/tests/test_api_receive.sql @@ -58,7 +58,12 @@ begin assert v_count = 0, 'should have no more messages after ack'; end $$; --- Step 6: partial receive still acks the whole underlying batch +-- Step 6: partial receive must NOT silently drop unreturned batch rows (#134). +-- Pre-fix behavior: ack(batch_id) advanced sub_last_tick past the whole tick +-- window, so events the caller never saw became unreachable. The fix is in +-- pgque.ack(), which now re-queues unreturned events to pgque.retry_queue +-- before calling finish_batch. See tests/test_receive_partial_no_skip.sql for +-- the full red/green regression coverage. do $$ begin perform pgque.create_queue('test_recv_partial'); @@ -96,18 +101,30 @@ begin perform pgque.ack(v_batch_id); end $$; +-- Push the re-queued rows back into the main event table. +do $$ begin perform pgque.maint_retry_events(); end $$; + +do $$ begin + perform pgque.force_tick('test_recv_partial'); + perform pgque.ticker(); +end $$; + do $$ declare v_msg pgque.message; v_count int := 0; + v_batch_id bigint; begin for v_msg in select * from pgque.receive('test_recv_partial', 'c1', 10) loop v_count := v_count + 1; + v_batch_id := v_msg.batch_id; end loop; - assert v_count = 0, - 'ack(batch_id) should finish the whole batch, even if receive(..., 1) returned one row'; + assert v_count = 2, + format('the 2 unreturned events MUST be re-delivered after ack (#134), got %s', v_count); + + perform pgque.ack(v_batch_id); end $$; -- Step 7: send(text) fast path must store payload byte-for-byte diff --git a/tests/test_receive_partial_no_skip.sql b/tests/test_receive_partial_no_skip.sql new file mode 100644 index 00000000..273ee3fe --- /dev/null +++ b/tests/test_receive_partial_no_skip.sql @@ -0,0 +1,132 @@ +\set ON_ERROR_STOP on + +-- Regression test for #134: pgque.receive(..., max_return) followed by +-- pgque.ack(batch_id) must NOT silently drop the events that the underlying +-- PgQ batch contained but receive() did not yield. +-- +-- Root cause: pgque.ack() called pgque.finish_batch() unconditionally, which +-- advances sub_last_tick past the entire tick window. Any events in that +-- window that the caller never saw became unreachable. +-- +-- Fix contract: pgque.receive() records which msg_ids it actually returned, +-- and pgque.ack() re-queues the unreturned events to pgque.retry_queue with +-- ev_retry preserved (these events were never delivered to a handler) so +-- they are eligible for the next receive() call after maint_retry_events(). + +-- Step 1: setup +do $$ +begin + perform pgque.create_queue('t134_partial'); + perform pgque.register_consumer('t134_partial', 'c'); +end $$; + +-- Step 2: insert 105 events and tick (separate transactions) +do $$ +begin + perform pgque.send('t134_partial', 'tt', jsonb_build_object('i', g)) + from generate_series(1, 105) g; +end $$; + +do $$ +begin + perform pgque.force_tick('t134_partial'); + perform pgque.ticker('t134_partial'); +end $$; + +-- Step 3: receive 100 of the 105 and ack the batch. +-- Before the fix, this silently dropped the remaining 5. +do $$ +declare + v_msg pgque.message; + v_count int := 0; + v_batch_id bigint; + v_max_id bigint := 0; +begin + for v_msg in select * from pgque.receive('t134_partial', 'c', 100) + loop + v_count := v_count + 1; + v_batch_id := v_msg.batch_id; + if v_msg.msg_id > v_max_id then + v_max_id := v_msg.msg_id; + end if; + end loop; + + assert v_count = 100, + format('first receive should return 100 rows, got %s', v_count); + assert v_batch_id is not null, 'batch_id should be set'; + + perform pgque.ack(v_batch_id); +end $$; + +-- Step 4: pump the retry path so re-queued rows get re-inserted into the +-- main event table, then create a tick that covers them. Each DO block is +-- a separate transaction — required by PgQ snapshot-visibility semantics +-- (events inserted by maint_retry_events must commit before the ticker +-- snapshot is taken, otherwise the next batch can't see them). +do $$ begin + perform pgque.maint_retry_events(); +end $$; + +do $$ begin + perform pgque.force_tick('t134_partial'); + perform pgque.ticker('t134_partial'); +end $$; + +-- Step 5: the 5 unreturned events MUST be visible to the next receive() call. +do $$ +declare + v_msg pgque.message; + v_count int := 0; + v_batch_id bigint; + v_seen_ids bigint[] := '{}'::bigint[]; +begin + for v_msg in select * from pgque.receive('t134_partial', 'c', 100) + loop + v_count := v_count + 1; + v_batch_id := v_msg.batch_id; + v_seen_ids := v_seen_ids || v_msg.msg_id; + end loop; + + assert v_count = 5, + format('after ack, the 5 unreturned events MUST be re-delivered, got %s (issue #134)', v_count); + + -- The unreturned events were msg_ids 101..105 (last 5 of the 105 sent). + assert v_seen_ids @> array[101::bigint, 102, 103, 104, 105], + format('expected msg_ids 101..105 to be re-delivered, got %s', v_seen_ids::text); + + perform pgque.ack(v_batch_id); +end $$; + +-- Step 6: confirm the second ack closes the queue cleanly (no leftovers). +do $$ begin + perform pgque.maint_retry_events(); +end $$; + +do $$ begin + perform pgque.force_tick('t134_partial'); + perform pgque.ticker('t134_partial'); +end $$; + +do $$ +declare + v_msg pgque.message; + v_count int := 0; +begin + for v_msg in select * from pgque.receive('t134_partial', 'c', 100) + loop + v_count := v_count + 1; + perform pgque.ack(v_msg.batch_id); + exit; + end loop; + + assert v_count = 0, + format('after both batches acked, queue should be drained, got %s leftover', v_count); +end $$; + +-- Cleanup +do $$ +begin + perform pgque.unregister_consumer('t134_partial', 'c'); + perform pgque.drop_queue('t134_partial'); + raise notice 'PASS: receive/ack does not skip unreturned batch rows (#134)'; +end $$;