From 84d02c1c1e9a4566789df2c840f16ebf9a86b576 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 13:32:03 +0000 Subject: [PATCH] fix: lock dead_letter rows in dlq_replay Two concurrent pgque.dlq_replay() calls for the same dl_id could both pass the unlocked existence select, both call insert_event(), and re-enqueue the dead-lettered event twice; the second delete then silently removed 0 rows. pgque.dlq_replay_all() had the same unlocked-select shape. dlq_replay() now locks the dead_letter row with 'for update of dl': the second caller blocks, re-evaluates after the first commits, finds no row, and raises the existing 'dead letter entry not found' error. dlq_replay_all() uses 'for update of dl skip locked' so a bulk replay skips rows already being replayed by a concurrent session instead of blocking or double-replaying them. Adds tests/two_session_dlq_replay_race.sh, a deterministic two-session harness that fails on the unfixed code (event enqueued twice) and passes with the row lock (one event, clean error for the loser). Verification: bash build/transform.sh psql -d -v ON_ERROR_STOP=1 -f sql/pgque.sql PGQUE_TEST_DSN=postgresql:/// tests/two_session_dlq_replay_race.sh psql -d -v ON_ERROR_STOP=1 -f tests/run_all.sql Addresses finding A3 of #283. https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv --- sql/pgque-additions/dlq.sql | 17 ++- sql/pgque-tle.sql | 17 ++- sql/pgque.sql | 17 ++- tests/two_session_dlq_replay_race.sh | 177 +++++++++++++++++++++++++++ 4 files changed, 225 insertions(+), 3 deletions(-) create mode 100755 tests/two_session_dlq_replay_race.sh diff --git a/sql/pgque-additions/dlq.sql b/sql/pgque-additions/dlq.sql index e16eeb53..69909dfd 100644 --- a/sql/pgque-additions/dlq.sql +++ b/sql/pgque-additions/dlq.sql @@ -119,6 +119,12 @@ end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- pgque.dlq_replay() -- replay a single dead letter event back into the queue +-- +-- The initial select locks the dead_letter row (`for update of dl`) so that +-- two concurrent dlq_replay() calls for the same dl_id cannot both pass the +-- existence check and re-enqueue the event twice. The second caller blocks on +-- the row lock; after the first commits its delete, the second's select +-- re-evaluates, finds no row, and raises 'dead letter entry not found'. create or replace function pgque.dlq_replay(i_dead_letter_id bigint) returns bigint as $$ declare @@ -129,7 +135,8 @@ begin select dl.*, q.queue_name into v_dl from pgque.dead_letter dl join pgque.queue q on q.queue_id = dl.dl_queue_id - where dl.dl_id = i_dead_letter_id; + where dl.dl_id = i_dead_letter_id + for update of dl; if not found then raise exception 'dead letter entry not found: %', i_dead_letter_id; @@ -154,6 +161,13 @@ $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- which is hidden under many production configs). Callers can check -- failed > 0 to detect partial success programmatically. -- +-- The loop's select locks each dead_letter row (`for update of dl skip +-- locked`) before replaying it. `skip locked` (rather than blocking) fits the +-- replay-everything semantics: a row locked by a concurrent dlq_replay() or +-- dlq_replay_all() is already being handled by that session, so this call +-- skips it instead of waiting only to replay it twice (the pre-lock race) or +-- to count a guaranteed failure. +-- -- Return-type change from v0.1's bare integer count to a record is a breaking -- API change accepted at the v0.2 cut. Callers previously doing -- select pgque.dlq_replay_all('q') -- returned int @@ -180,6 +194,7 @@ begin from pgque.dead_letter dl join pgque.queue q on q.queue_id = dl.dl_queue_id where q.queue_name = i_queue_name + for update of dl skip locked loop begin perform pgque.insert_event(v_dl.queue_name, v_dl.ev_type, v_dl.ev_data, diff --git a/sql/pgque-tle.sql b/sql/pgque-tle.sql index 2601a24f..16f5423e 100644 --- a/sql/pgque-tle.sql +++ b/sql/pgque-tle.sql @@ -5067,6 +5067,12 @@ end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- pgque.dlq_replay() -- replay a single dead letter event back into the queue +-- +-- The initial select locks the dead_letter row (`for update of dl`) so that +-- two concurrent dlq_replay() calls for the same dl_id cannot both pass the +-- existence check and re-enqueue the event twice. The second caller blocks on +-- the row lock; after the first commits its delete, the second's select +-- re-evaluates, finds no row, and raises 'dead letter entry not found'. create or replace function pgque.dlq_replay(i_dead_letter_id bigint) returns bigint as $$ declare @@ -5077,7 +5083,8 @@ begin select dl.*, q.queue_name into v_dl from pgque.dead_letter dl join pgque.queue q on q.queue_id = dl.dl_queue_id - where dl.dl_id = i_dead_letter_id; + where dl.dl_id = i_dead_letter_id + for update of dl; if not found then raise exception 'dead letter entry not found: %', i_dead_letter_id; @@ -5102,6 +5109,13 @@ $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- which is hidden under many production configs). Callers can check -- failed > 0 to detect partial success programmatically. -- +-- The loop's select locks each dead_letter row (`for update of dl skip +-- locked`) before replaying it. `skip locked` (rather than blocking) fits the +-- replay-everything semantics: a row locked by a concurrent dlq_replay() or +-- dlq_replay_all() is already being handled by that session, so this call +-- skips it instead of waiting only to replay it twice (the pre-lock race) or +-- to count a guaranteed failure. +-- -- Return-type change from v0.1's bare integer count to a record is a breaking -- API change accepted at the v0.2 cut. Callers previously doing -- select pgque.dlq_replay_all('q') -- returned int @@ -5128,6 +5142,7 @@ begin from pgque.dead_letter dl join pgque.queue q on q.queue_id = dl.dl_queue_id where q.queue_name = i_queue_name + for update of dl skip locked loop begin perform pgque.insert_event(v_dl.queue_name, v_dl.ev_type, v_dl.ev_data, diff --git a/sql/pgque.sql b/sql/pgque.sql index e3ac01a7..3b3d26d3 100644 --- a/sql/pgque.sql +++ b/sql/pgque.sql @@ -4979,6 +4979,12 @@ end; $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- pgque.dlq_replay() -- replay a single dead letter event back into the queue +-- +-- The initial select locks the dead_letter row (`for update of dl`) so that +-- two concurrent dlq_replay() calls for the same dl_id cannot both pass the +-- existence check and re-enqueue the event twice. The second caller blocks on +-- the row lock; after the first commits its delete, the second's select +-- re-evaluates, finds no row, and raises 'dead letter entry not found'. create or replace function pgque.dlq_replay(i_dead_letter_id bigint) returns bigint as $$ declare @@ -4989,7 +4995,8 @@ begin select dl.*, q.queue_name into v_dl from pgque.dead_letter dl join pgque.queue q on q.queue_id = dl.dl_queue_id - where dl.dl_id = i_dead_letter_id; + where dl.dl_id = i_dead_letter_id + for update of dl; if not found then raise exception 'dead letter entry not found: %', i_dead_letter_id; @@ -5014,6 +5021,13 @@ $$ language plpgsql security definer set search_path = pgque, pg_catalog; -- which is hidden under many production configs). Callers can check -- failed > 0 to detect partial success programmatically. -- +-- The loop's select locks each dead_letter row (`for update of dl skip +-- locked`) before replaying it. `skip locked` (rather than blocking) fits the +-- replay-everything semantics: a row locked by a concurrent dlq_replay() or +-- dlq_replay_all() is already being handled by that session, so this call +-- skips it instead of waiting only to replay it twice (the pre-lock race) or +-- to count a guaranteed failure. +-- -- Return-type change from v0.1's bare integer count to a record is a breaking -- API change accepted at the v0.2 cut. Callers previously doing -- select pgque.dlq_replay_all('q') -- returned int @@ -5040,6 +5054,7 @@ begin from pgque.dead_letter dl join pgque.queue q on q.queue_id = dl.dl_queue_id where q.queue_name = i_queue_name + for update of dl skip locked loop begin perform pgque.insert_event(v_dl.queue_name, v_dl.ev_type, v_dl.ev_data, diff --git a/tests/two_session_dlq_replay_race.sh b/tests/two_session_dlq_replay_race.sh new file mode 100755 index 00000000..eed4005c --- /dev/null +++ b/tests/two_session_dlq_replay_race.sh @@ -0,0 +1,177 @@ +#!/usr/bin/env bash +# Validate dlq_replay() serialization with two real sessions. +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. +# Includes code derived from PgQ (ISC license, Marko Kreen / Skype Technologies OU). +set -Eeuo pipefail + +# Usage: +# PGQUE_TEST_DSN=postgresql://postgres:***@localhost/pgque_test \ +# tests/two_session_dlq_replay_race.sh +# +# The target database must already have sql/pgque.sql installed. The harness +# dead-letters one event, then has session 1 call pgque.dlq_replay(dl_id) +# inside an open transaction (commit delayed by pg_sleep) while session 2 +# calls pgque.dlq_replay(dl_id) for the same id concurrently. With the +# row lock in dlq_replay, session 2 must block behind session 1's locked +# dead_letter row, re-evaluate after commit, find the row gone, and raise +# 'dead letter entry not found'. Pre-fix code fails this harness: session 2's +# unlocked select still sees the row, so both sessions call insert_event and +# the event is re-enqueued twice. + +if [[ -z "${PGQUE_TEST_DSN:-}" ]]; then + echo "PGQUE_TEST_DSN is required" >&2 + exit 2 +fi + +psql_base=(psql --no-psqlrc -v ON_ERROR_STOP=1 "${PGQUE_TEST_DSN}") +queue_name="two_session_dlq_replay_${$}_$(date +%s)" +session1_app="pgque_dlq_replay_s1_${$}_$(date +%s)" +hold_seconds=4 +workdir="$(mktemp -d)" +cleanup() { + "${psql_base[@]}" -qAtc " + select pgque.unregister_consumer('${queue_name}', 'c1'); + select pgque.drop_queue('${queue_name}', true); + " >/dev/null 2>&1 || true + rm -rf "${workdir}" +} +trap cleanup EXIT + +cat >"${workdir}/setup.sql" <"${workdir}/dead_letter.sql" </dev/null +"${psql_base[@]}" -f "${workdir}/dead_letter.sql" >/dev/null + +dl_id=$("${psql_base[@]}" -qAtc " + select dl.dl_id + from pgque.dead_letter dl + join pgque.queue q on q.queue_id = dl.dl_queue_id + where q.queue_name = '${queue_name}' +") +if [[ -z "${dl_id}" ]]; then + echo "FAIL: no dead_letter row created during setup" >&2 + exit 1 +fi + +cat >"${workdir}/session1.sql" <&2 + print_debug + exit 1 +fi + +set +e +"${psql_base[@]}" -f "${workdir}/session2.sql" >"${workdir}/session2.out" 2>"${workdir}/session2.err" +session2_status=$? +wait "${session1_pid}" +session1_status=$? +set -e + +if (( session1_status != 0 )); then + echo "FAIL: session1 replay failed unexpectedly" >&2 + print_debug + exit 1 +fi + +if (( session2_status == 0 )); then + echo "FAIL: session2 replay succeeded; expected 'dead letter entry not found' after waiting on session1" >&2 + print_debug + exit 1 +fi +if ! grep -q "dead letter entry not found: ${dl_id}" "${workdir}/session2.err"; then + echo "FAIL: session2 failed with an unexpected error" >&2 + print_debug + exit 1 +fi + +# The event must be re-enqueued exactly once (pre-fix code enqueues it twice). +"${psql_base[@]}" -qAtc " + select pgque.force_tick('${queue_name}'); + select pgque.ticker(); +" >/dev/null +replayed_count=$("${psql_base[@]}" -qAtc " + select count(*) + from pgque.receive('${queue_name}', 'c1', 100) + where type = 'dlq.race' +") +if [[ "${replayed_count}" != "1" ]]; then + echo "FAIL: expected exactly 1 replayed event, got ${replayed_count}" >&2 + print_debug + exit 1 +fi + +dlq_count=$("${psql_base[@]}" -qAtc " + select count(*) + from pgque.dead_letter dl + join pgque.queue q on q.queue_id = dl.dl_queue_id + where q.queue_name = '${queue_name}' +") +if [[ "${dlq_count}" != "0" ]]; then + echo "FAIL: expected empty DLQ after replay, got ${dlq_count} rows" >&2 + print_debug + exit 1 +fi + +echo "PASS: concurrent dlq_replay serialized; second caller got 'dead letter entry not found' and the event was re-enqueued exactly once"