Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions clients/typescript/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ export class Consumer {
if (batchId !== null) {
if (anyNackFailed) {
// At least one required nack failed. Skip ack so PgQ redelivers
// the batch on the next poll instead of advancing the consumer
// past messages we couldn't route.
// the batch instead of advancing the consumer past messages we
// couldn't route. The batch is unfinished, so `next_batch` would
// return it again immediately — sleep one poll interval before
// re-polling to avoid a hot loop that re-runs every handler.
this.logger.error(
`pgque: skipping ack for batch ${batchId}; one or more nacks failed and the batch will be redelivered`,
);
await sleep(this.pollIntervalMs, signal);
} else {
try {
const n = await this.client.ack(batchId);
Expand All @@ -145,7 +148,10 @@ export class Consumer {
);
}
} catch (err) {
// Unfinished batch: same redelivery situation as the failed-nack
// path above, so back off one poll interval before re-polling.
this.logger.error(`pgque: ack error: ${formatErr(err)}`);
await sleep(this.pollIntervalMs, signal);
}
}
}
Expand Down
23 changes: 19 additions & 4 deletions clients/typescript/src/producer_bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

import { randomBytes } from 'node:crypto';
import { performance } from 'node:perf_hooks';
import pg from 'pg';
import { connect, type Client } from './client.js';

const { escapeIdentifier } = pg;

const dsn = process.env.PGQUE_TEST_DSN;
const batchSizes = [1, 100, 1000] as const;
const repeats = Number.parseInt(process.env.PGQUE_BENCH_REPEATS ?? '3', 10);
Expand Down Expand Up @@ -112,13 +115,25 @@ async function verifyCount(client: Client, queue: string, expected: number): Pro
);
const table = tableResult.rows[0]?.current_event_table;
if (!table) throw new Error(`current_event_table returned no row for ${queue}`);
const countResult = await client.rawPool.query<{ count: string }>(`select count(*) from ${table}`);
const got = Number.parseInt(countResult.rows[0]?.count ?? 'NaN', 10);
if (got !== expected) {
throw new Error(`${queue}: expected ${expected} events, got ${got}`);
// count(*) is int8, which the pgque pool parses to BigInt (OID 20).
const countResult = await client.rawPool.query<{ count: bigint }>(
`select count(*) from ${quoteQualifiedIdent(table)}`,
);
const got = countResult.rows[0]?.count;
if (got !== BigInt(expected)) {
throw new Error(`${queue}: expected ${expected} events, got ${got ?? 'no row'}`);
}
}

// Quote a possibly schema-qualified relation name (e.g. "pgque.event_1_0")
// part by part so it is safe to splice into SQL as an identifier.
function quoteQualifiedIdent(name: string): string {
return name
.split('.')
.map((part) => escapeIdentifier(part))
.join('.');
}

function median(values: number[]): number {
const sorted = [...values].sort((a, b) => a - b);
const mid = Math.floor(sorted.length / 2);
Expand Down
98 changes: 98 additions & 0 deletions clients/typescript/test/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,104 @@ describe('Consumer (in-memory mocks)', () => {
).toThrow(/retryAfter/);
});

it('sleeps pollInterval before re-polling after a nack failure', async () => {
const msg: Message = {
msgId: 4n,
batchId: 102n,
type: 'will_fail',
payload: '{}',
retryCount: null,
createdAt: new Date(),
extra1: null,
extra2: null,
extra3: null,
extra4: null,
};

// receive always returns the same batch (PgQ redelivers an unfinished
// batch instantly) and nack always fails, so an unfixed loop re-polls
// at full speed. With a 60s pollInterval, a backoff-respecting loop
// must call receive exactly once within the observation window.
const fakeClient = {
receive: vi.fn(async () => [msg]),
ack: vi.fn(async () => undefined),
nack: vi.fn(async () => {
throw new Error('synthetic persistent nack failure');
}),
};

const consumer = new Consumer(fakeClient as unknown as Client, 'q', 'c', {
pollInterval: 60_000,
logger: { warn: () => undefined, error: () => undefined },
});
consumer.handle('will_fail', async () => {
throw new Error('handler boom');
});

const ac = new AbortController();
const startPromise = consumer.start(ac.signal);
const deadline = Date.now() + 4000;
while (Date.now() < deadline && fakeClient.nack.mock.calls.length === 0) {
await sleep(10);
}
// Give an unfixed (hot-looping) consumer time to re-poll.
await sleep(300);
ac.abort();
await startPromise;

expect(fakeClient.receive).toHaveBeenCalledTimes(1);
expect(fakeClient.nack).toHaveBeenCalledTimes(1);
expect(fakeClient.ack).toHaveBeenCalledTimes(0);
});

it('sleeps pollInterval before re-polling after an ack error', async () => {
const msg: Message = {
msgId: 5n,
batchId: 103n,
type: 'fine',
payload: '{}',
retryCount: null,
createdAt: new Date(),
extra1: null,
extra2: null,
extra3: null,
extra4: null,
};

let handlerCalls = 0;
const fakeClient = {
receive: vi.fn(async () => [msg]),
ack: vi.fn(async () => {
throw new Error('synthetic ack failure');
}),
nack: vi.fn(async () => undefined),
};

const consumer = new Consumer(fakeClient as unknown as Client, 'q', 'c', {
pollInterval: 60_000,
logger: { warn: () => undefined, error: () => undefined },
});
consumer.handle('fine', async () => {
handlerCalls += 1;
});

const ac = new AbortController();
const startPromise = consumer.start(ac.signal);
const deadline = Date.now() + 4000;
while (Date.now() < deadline && fakeClient.ack.mock.calls.length === 0) {
await sleep(10);
}
// Give an unfixed (hot-looping) consumer time to re-poll and re-run
// the handler with duplicate side effects.
await sleep(300);
ac.abort();
await startPromise;

expect(fakeClient.receive).toHaveBeenCalledTimes(1);
expect(fakeClient.ack).toHaveBeenCalledTimes(1);
expect(handlerCalls).toBe(1);
});

it('does not call ack when nack fails for an unknown event type', async () => {
const msg: Message = {
msgId: 2n,
Expand Down
Loading