diff --git a/clients/typescript/src/consumer.ts b/clients/typescript/src/consumer.ts index fee27aaa..6b6a4605 100644 --- a/clients/typescript/src/consumer.ts +++ b/clients/typescript/src/consumer.ts @@ -131,11 +131,14 @@ export class Consumer { if (batchId !== null) { if (anyNackFailed) { // At least one required nack failed. Skip ack so PgQ redelivers - // the batch on the next poll instead of advancing the consumer - // past messages we couldn't route. + // the batch instead of advancing the consumer past messages we + // couldn't route. The batch is unfinished, so `next_batch` would + // return it again immediately — sleep one poll interval before + // re-polling to avoid a hot loop that re-runs every handler. this.logger.error( `pgque: skipping ack for batch ${batchId}; one or more nacks failed and the batch will be redelivered`, ); + await sleep(this.pollIntervalMs, signal); } else { try { const n = await this.client.ack(batchId); @@ -145,7 +148,10 @@ export class Consumer { ); } } catch (err) { + // Unfinished batch: same redelivery situation as the failed-nack + // path above, so back off one poll interval before re-polling. this.logger.error(`pgque: ack error: ${formatErr(err)}`); + await sleep(this.pollIntervalMs, signal); } } } diff --git a/clients/typescript/src/producer_bench.ts b/clients/typescript/src/producer_bench.ts index f97acbad..5913039a 100644 --- a/clients/typescript/src/producer_bench.ts +++ b/clients/typescript/src/producer_bench.ts @@ -6,8 +6,11 @@ import { randomBytes } from 'node:crypto'; import { performance } from 'node:perf_hooks'; +import pg from 'pg'; import { connect, type Client } from './client.js'; +const { escapeIdentifier } = pg; + const dsn = process.env.PGQUE_TEST_DSN; const batchSizes = [1, 100, 1000] as const; const repeats = Number.parseInt(process.env.PGQUE_BENCH_REPEATS ?? '3', 10); @@ -112,13 +115,25 @@ async function verifyCount(client: Client, queue: string, expected: number): Pro ); const table = tableResult.rows[0]?.current_event_table; if (!table) throw new Error(`current_event_table returned no row for ${queue}`); - const countResult = await client.rawPool.query<{ count: string }>(`select count(*) from ${table}`); - const got = Number.parseInt(countResult.rows[0]?.count ?? 'NaN', 10); - if (got !== expected) { - throw new Error(`${queue}: expected ${expected} events, got ${got}`); + // count(*) is int8, which the pgque pool parses to BigInt (OID 20). + const countResult = await client.rawPool.query<{ count: bigint }>( + `select count(*) from ${quoteQualifiedIdent(table)}`, + ); + const got = countResult.rows[0]?.count; + if (got !== BigInt(expected)) { + throw new Error(`${queue}: expected ${expected} events, got ${got ?? 'no row'}`); } } +// Quote a possibly schema-qualified relation name (e.g. "pgque.event_1_0") +// part by part so it is safe to splice into SQL as an identifier. +function quoteQualifiedIdent(name: string): string { + return name + .split('.') + .map((part) => escapeIdentifier(part)) + .join('.'); +} + function median(values: number[]): number { const sorted = [...values].sort((a, b) => a - b); const mid = Math.floor(sorted.length / 2); diff --git a/clients/typescript/test/consumer.test.ts b/clients/typescript/test/consumer.test.ts index 294bb98f..0e687cc4 100644 --- a/clients/typescript/test/consumer.test.ts +++ b/clients/typescript/test/consumer.test.ts @@ -456,6 +456,104 @@ describe('Consumer (in-memory mocks)', () => { ).toThrow(/retryAfter/); }); + it('sleeps pollInterval before re-polling after a nack failure', async () => { + const msg: Message = { + msgId: 4n, + batchId: 102n, + type: 'will_fail', + payload: '{}', + retryCount: null, + createdAt: new Date(), + extra1: null, + extra2: null, + extra3: null, + extra4: null, + }; + + // receive always returns the same batch (PgQ redelivers an unfinished + // batch instantly) and nack always fails, so an unfixed loop re-polls + // at full speed. With a 60s pollInterval, a backoff-respecting loop + // must call receive exactly once within the observation window. + const fakeClient = { + receive: vi.fn(async () => [msg]), + ack: vi.fn(async () => undefined), + nack: vi.fn(async () => { + throw new Error('synthetic persistent nack failure'); + }), + }; + + const consumer = new Consumer(fakeClient as unknown as Client, 'q', 'c', { + pollInterval: 60_000, + logger: { warn: () => undefined, error: () => undefined }, + }); + consumer.handle('will_fail', async () => { + throw new Error('handler boom'); + }); + + const ac = new AbortController(); + const startPromise = consumer.start(ac.signal); + const deadline = Date.now() + 4000; + while (Date.now() < deadline && fakeClient.nack.mock.calls.length === 0) { + await sleep(10); + } + // Give an unfixed (hot-looping) consumer time to re-poll. + await sleep(300); + ac.abort(); + await startPromise; + + expect(fakeClient.receive).toHaveBeenCalledTimes(1); + expect(fakeClient.nack).toHaveBeenCalledTimes(1); + expect(fakeClient.ack).toHaveBeenCalledTimes(0); + }); + + it('sleeps pollInterval before re-polling after an ack error', async () => { + const msg: Message = { + msgId: 5n, + batchId: 103n, + type: 'fine', + payload: '{}', + retryCount: null, + createdAt: new Date(), + extra1: null, + extra2: null, + extra3: null, + extra4: null, + }; + + let handlerCalls = 0; + const fakeClient = { + receive: vi.fn(async () => [msg]), + ack: vi.fn(async () => { + throw new Error('synthetic ack failure'); + }), + nack: vi.fn(async () => undefined), + }; + + const consumer = new Consumer(fakeClient as unknown as Client, 'q', 'c', { + pollInterval: 60_000, + logger: { warn: () => undefined, error: () => undefined }, + }); + consumer.handle('fine', async () => { + handlerCalls += 1; + }); + + const ac = new AbortController(); + const startPromise = consumer.start(ac.signal); + const deadline = Date.now() + 4000; + while (Date.now() < deadline && fakeClient.ack.mock.calls.length === 0) { + await sleep(10); + } + // Give an unfixed (hot-looping) consumer time to re-poll and re-run + // the handler with duplicate side effects. + await sleep(300); + ac.abort(); + await startPromise; + + expect(fakeClient.receive).toHaveBeenCalledTimes(1); + expect(fakeClient.ack).toHaveBeenCalledTimes(1); + expect(handlerCalls).toBe(1); + }); + it('does not call ack when nack fails for an unknown event type', async () => { const msg: Message = { msgId: 2n,