diff --git a/integrationTests/cluster/replicationReconnect.test.mjs b/integrationTests/cluster/replicationReconnect.test.mjs index 062eeecaa..247691698 100644 --- a/integrationTests/cluster/replicationReconnect.test.mjs +++ b/integrationTests/cluster/replicationReconnect.test.mjs @@ -152,6 +152,43 @@ suite('Replication Reconnect', { timeout: 120000 }, (ctx) => { ); }); + test('remove_node while peer is down stops retries permanently', async () => { + // Ensure replication is active. + let status = await sendOperation(ctx.nodes[1], { operation: 'cluster_status' }); + if (status.connections.length === 0) { + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: ctx.nodes[1].admin, + }); + await waitForConnected(ctx.nodes[1], 1); + } + + // Kill node0 to put node1 into the retry-connect state. + await killHarper({ harper: ctx.nodes[0] }); + // Brief pause to let node1 detect the disconnect and schedule a retry. + await delay(800); + + // remove_node while the peer is unreachable. This must stop all retry + // attempts — the intentionallyUnsubscribed guard in connect() ensures that + // when the pending retry timer fires, connect() returns immediately rather + // than opening a new socket. + await sendOperation(ctx.nodes[1], { + operation: 'remove_node', + hostname: ctx.nodes[0].hostname, + }); + const afterRemove = await waitForConnected(ctx.nodes[1], 0, false); + equal(afterRemove.connections.length, 0, 'expected 0 connections immediately after remove_node'); + + // Restart node0. If the retry loop was not properly stopped, node1 would + // reconnect once node0 is reachable again. Verify it does not. 
+ ctx.nodes[0] = (await startHarper({ harper: ctx.nodes[0] })).harper; + await delay(2000); + status = await sendOperation(ctx.nodes[1], { operation: 'cluster_status' }); + equal(status.connections.length, 0, 'node1 must not reconnect to a removed node even after that node restarts'); + }); + test('kill + restart of peer recovers replication connectivity', async () => { // Assume previous test left replication active. If not, re-establish. let status = await sendOperation(ctx.nodes[1], { operation: 'cluster_status' }); diff --git a/replication/replicationConnection.ts b/replication/replicationConnection.ts index ab7a3cfbf..9fbb42865 100644 --- a/replication/replicationConnection.ts +++ b/replication/replicationConnection.ts @@ -234,6 +234,7 @@ export class NodeReplicationConnection extends EventEmitter { } async connect() { + if (this.intentionallyUnsubscribed) return; if (!this.session) this.resetSession(); // TODO: Need to do this specifically for each node this.socket = await createWebSocket(this.url, { serverName: this.nodeName, authorization: this.authorization }); @@ -359,7 +360,7 @@ export class NodeReplicationConnection extends EventEmitter { } unsubscribe() { this.intentionallyUnsubscribed = true; - this.socket.close(1008, 'No longer subscribed'); + this.socket?.close(1008, 'No longer subscribed'); } getRecord(request) { @@ -1156,7 +1157,9 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: any) const matchesSubscription = (excludedNodes && timeRange === undefined) || // if it is in the list, we check the timestamps to verify it matches - (timeRange && (timeRange as any).startTime < localTime && (!(timeRange as any).endTime || (timeRange as any).endTime > localTime)); + (timeRange && + (timeRange as any).startTime < localTime && + (!(timeRange as any).endTime || (timeRange as any).endTime > localTime)); if (!matchesSubscription) { if (DEBUG_MODE) logger.trace?.( diff --git a/replication/replicator.ts b/replication/replicator.ts 
index 370a00438..f43ba60e8 100644 --- a/replication/replicator.ts +++ b/replication/replicator.ts @@ -476,7 +476,9 @@ function getSubscriptionConnection( (connection = new NodeReplicationConnection(connectingUrl, subscription, dbName, nodeName, authorization)) ); connection.connect(); - connection.once('finished', () => dbConnections.delete(dbName)); + connection.once('finished', () => { + if (dbConnections.get(dbName) === connection) dbConnections.delete(dbName); + }); return connection; } } diff --git a/replication/subscriptionManager.ts b/replication/subscriptionManager.ts index 81e29e06c..548818cf4 100644 --- a/replication/subscriptionManager.ts +++ b/replication/subscriptionManager.ts @@ -146,6 +146,7 @@ export async function startOnMainThread(options) { logger.info('Setting up node replication for', node); if (!node) { // deleted node + nodeMap.delete(hostname); for (const [url, dbReplicationWorkers] of connectionReplicationMap) { let foundNode; for (const [_database, { nodes }] of dbReplicationWorkers) {