From 70110de43f6221f31bb39842416e14e647027d91 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 13:34:09 +0000 Subject: [PATCH 1/2] fix(ts): back off on nack/ack failure in consumer The Consumer.start() loop slept pollInterval only on receive error or empty batch. When a batch was received but finalization failed (a nack failed so ack was skipped, or ack threw), the loop re-polled instantly; since the batch was never finished, pgque.next_batch returned the same batch immediately. A persistent nack/ack failure (e.g. partial grants where receive works but nack/ack do not) therefore hot-looped at full speed, re-running every handler with duplicate side effects, and even a single transient ack failure re-executed the batch with zero delay. Sleep pollIntervalMs (abort-aware) before re-polling on both paths. Ack returning 0 without throwing stays warning-only with no sleep. Red/green: the two new mock-based tests hot-loop so hard unfixed that the vitest worker dies of OOM recording mock calls; green after fix. Addresses finding B1 (TypeScript) of #283. https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv --- clients/typescript/src/consumer.ts | 10 ++- clients/typescript/test/consumer.test.ts | 98 ++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/clients/typescript/src/consumer.ts b/clients/typescript/src/consumer.ts index fee27aaa..6b6a4605 100644 --- a/clients/typescript/src/consumer.ts +++ b/clients/typescript/src/consumer.ts @@ -131,11 +131,14 @@ export class Consumer { if (batchId !== null) { if (anyNackFailed) { // At least one required nack failed. Skip ack so PgQ redelivers - // the batch on the next poll instead of advancing the consumer - // past messages we couldn't route. + // the batch instead of advancing the consumer past messages we + // couldn't route. The batch is unfinished, so `next_batch` would + // return it again immediately — sleep one poll interval before + // re-polling to avoid a hot loop that re-runs every handler. this.logger.error( `pgque: skipping ack for batch ${batchId}; one or more nacks failed and the batch will be redelivered`, ); + await sleep(this.pollIntervalMs, signal); } else { try { const n = await this.client.ack(batchId); @@ -145,7 +148,10 @@ export class Consumer { ); } } catch (err) { + // Unfinished batch: same redelivery situation as the failed-nack + // path above, so back off one poll interval before re-polling. this.logger.error(`pgque: ack error: ${formatErr(err)}`); + await sleep(this.pollIntervalMs, signal); } } } diff --git a/clients/typescript/test/consumer.test.ts b/clients/typescript/test/consumer.test.ts index 294bb98f..0e687cc4 100644 --- a/clients/typescript/test/consumer.test.ts +++ b/clients/typescript/test/consumer.test.ts @@ -456,6 +456,104 @@ describe('Consumer (in-memory mocks)', () => { ).toThrow(/retryAfter/); }); + it('sleeps pollInterval before re-polling after a nack failure', async () => { + const msg: Message = { + msgId: 4n, + batchId: 102n, + type: 'will_fail', + payload: '{}', + retryCount: null, + createdAt: new Date(), + extra1: null, + extra2: null, + extra3: null, + extra4: null, + }; + + // receive always returns the same batch (PgQ redelivers an unfinished + // batch instantly) and nack always fails, so an unfixed loop re-polls + // at full speed. With a 60s pollInterval, a backoff-respecting loop + // must call receive exactly once within the observation window. + const fakeClient = { + receive: vi.fn(async () => [msg]), + ack: vi.fn(async () => undefined), + nack: vi.fn(async () => { + throw new Error('synthetic persistent nack failure'); + }), + }; + + const consumer = new Consumer(fakeClient as unknown as Client, 'q', 'c', { + pollInterval: 60_000, + logger: { warn: () => undefined, error: () => undefined }, + }); + consumer.handle('will_fail', async () => { + throw new Error('handler boom'); + }); + + const ac = new AbortController(); + const startPromise = consumer.start(ac.signal); + const deadline = Date.now() + 4000; + while (Date.now() < deadline && fakeClient.nack.mock.calls.length === 0) { + await sleep(10); + } + // Give an unfixed (hot-looping) consumer time to re-poll. + await sleep(300); + ac.abort(); + await startPromise; + + expect(fakeClient.receive).toHaveBeenCalledTimes(1); + expect(fakeClient.nack).toHaveBeenCalledTimes(1); + expect(fakeClient.ack).toHaveBeenCalledTimes(0); + }); + + it('sleeps pollInterval before re-polling after an ack error', async () => { + const msg: Message = { + msgId: 5n, + batchId: 103n, + type: 'fine', + payload: '{}', + retryCount: null, + createdAt: new Date(), + extra1: null, + extra2: null, + extra3: null, + extra4: null, + }; + + let handlerCalls = 0; + const fakeClient = { + receive: vi.fn(async () => [msg]), + ack: vi.fn(async () => { + throw new Error('synthetic ack failure'); + }), + nack: vi.fn(async () => undefined), + }; + + const consumer = new Consumer(fakeClient as unknown as Client, 'q', 'c', { + pollInterval: 60_000, + logger: { warn: () => undefined, error: () => undefined }, + }); + consumer.handle('fine', async () => { + handlerCalls += 1; + }); + + const ac = new AbortController(); + const startPromise = consumer.start(ac.signal); + const deadline = Date.now() + 4000; + while (Date.now() < deadline && fakeClient.ack.mock.calls.length === 0) { + await sleep(10); + } + // Give an unfixed (hot-looping) consumer time to re-poll and re-run + // the handler with duplicate side effects. + await sleep(300); + ac.abort(); + await startPromise; + + expect(fakeClient.receive).toHaveBeenCalledTimes(1); + expect(fakeClient.ack).toHaveBeenCalledTimes(1); + expect(handlerCalls).toBe(1); + }); + it('does not call ack when nack fails for an unknown event type', async () => { const msg: Message = { msgId: 2n, From 88600bb0875e5c2bc09bef0a2b08ecb1b3d5aa51 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 13:35:21 +0000 Subject: [PATCH 2/2] chore(ts): quote table identifier in bench query producer_bench.ts spliced the table name returned by pgque.current_event_table() into 'select count(*) from ...' via a raw template literal -- the only non-parameterized query in the repo. The value is self-generated and the script is dev-only, so this is not exploitable, but quote it properly anyway with pg's escapeIdentifier, part by part for the schema-qualified name. Also fix the result generic: the pgque pool parses int8 (OID 20) to BigInt, so count(*) arrives as bigint, not string; the old code only worked because Number.parseInt coerces its argument. Verified: bun run check, plus a live run against local PG 16 (PGQUE_BENCH_REPEATS=1 bun src/producer_bench.ts) where verifyCount passes for all batch sizes. Addresses the producer_bench informational note of #283. https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv --- clients/typescript/src/producer_bench.ts | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/clients/typescript/src/producer_bench.ts b/clients/typescript/src/producer_bench.ts index f97acbad..5913039a 100644 --- a/clients/typescript/src/producer_bench.ts +++ b/clients/typescript/src/producer_bench.ts @@ -6,8 +6,11 @@ import { randomBytes } from 'node:crypto'; import { performance } from 'node:perf_hooks'; +import pg from 'pg'; import { connect, type Client } from './client.js'; +const { escapeIdentifier } = pg; + const dsn = process.env.PGQUE_TEST_DSN; const batchSizes = [1, 100, 1000] as const; const repeats = Number.parseInt(process.env.PGQUE_BENCH_REPEATS ?? '3', 10); @@ -112,13 +115,25 @@ async function verifyCount(client: Client, queue: string, expected: number): Pro ); const table = tableResult.rows[0]?.current_event_table; if (!table) throw new Error(`current_event_table returned no row for ${queue}`); - const countResult = await client.rawPool.query<{ count: string }>(`select count(*) from ${table}`); - const got = Number.parseInt(countResult.rows[0]?.count ?? 'NaN', 10); - if (got !== expected) { - throw new Error(`${queue}: expected ${expected} events, got ${got}`); + // count(*) is int8, which the pgque pool parses to BigInt (OID 20). + const countResult = await client.rawPool.query<{ count: bigint }>( + `select count(*) from ${quoteQualifiedIdent(table)}`, + ); + const got = countResult.rows[0]?.count; + if (got !== BigInt(expected)) { + throw new Error(`${queue}: expected ${expected} events, got ${got ?? 'no row'}`); } } +// Quote a possibly schema-qualified relation name (e.g. "pgque.event_1_0") +// part by part so it is safe to splice into SQL as an identifier. +function quoteQualifiedIdent(name: string): string { + return name + .split('.') + .map((part) => escapeIdentifier(part)) + .join('.'); +} + function median(values: number[]): number { const sorted = [...values].sort((a, b) => a - b); const mid = Math.floor(sorted.length / 2);