From 7c870231e3e3d73b82275fd0acbdcc9248101313 Mon Sep 17 00:00:00 2001 From: Josh Bowling Date: Tue, 20 Aug 2024 19:21:29 +0900 Subject: [PATCH] Add on notification listener support Change-type: minor --- src/database-layer/db.ts | 59 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/src/database-layer/db.ts b/src/database-layer/db.ts index 51c82f301..a25c0f638 100644 --- a/src/database-layer/db.ts +++ b/src/database-layer/db.ts @@ -106,6 +106,13 @@ export interface Database extends BaseDatabase { ) => Promise; transaction: TransactionFn; readTransaction: TransactionFn; + on?: ( + name: 'notification', + fn: (...args: any[]) => Promise, + options?: { + channel?: string; + }, + ) => void; } interface EngineParams { @@ -706,9 +713,61 @@ if (maybePg != null) { `); } } + + // Connect and listen for notifications + async function listen( + channel: string, + fn: (...args: any[]) => Promise, + ) { + let listenerClient: Pg.PoolClient | null = null; + + // Clean up and reconnect listener + const reconnect = () => { + try { + listenerClient?.release(); + } catch (err) { + // Ignore listener client release errors + } + setTimeout(() => { + void listen(channel, fn); + }, 1000); + }; + + try { + listenerClient = await pool.connect(); + listenerClient.on('end', reconnect); + listenerClient.on('notification', (msg) => { + if (msg.channel === channel) { + void fn(msg).catch((err) => { + console.error(`Error handling message for '${channel}':`, err); + }); + } + }); + await listenerClient.query(`LISTEN "${channel}"`); + } catch (err) { + console.error( + `Error setting up listener client for '${channel}':`, + err, + ); + reconnect(); + } + } + return { engine: Engines.postgres, executeSql: atomicExecuteSql, + on: async (name, fn, options) => { + if (name !== 'notification') { + throw new Error(`Unsupported listener type: ${name}`); + } else if (options?.channel == null) { + throw new Error('Missing channel option for notification listener'); + } else if (options.channel.includes('"')) { + throw new Error( + `Invalid channel name for task LISTEN: ${options.channel}`, + ); + } + await listen(options.channel, fn); + }, transaction: createTransaction(async (stackTraceErr, timeoutMS) => { const client = await pool.connect(); const tx = new PostgresTx(client, false, stackTraceErr, timeoutMS);