From 7755ec0b2895ba18752a50a1a4935e01163aabb8 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 22 May 2026 09:41:48 -0600 Subject: [PATCH] fix(replication): stop connecting to removed nodes after remove_node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three related bugs caused replication to keep trying to connect to a removed node's hostname indefinitely after remove_node: 1. NodeReplicationConnection.connect() did not check intentionallyUnsubscribed at entry. If a retry timer fired after unsubscribe() was called, connect() would open a new socket to the removed node, reset the close handler's intentional flag, and loop again. Now connect() returns immediately when intentionallyUnsubscribed is true. 2. The once('finished') cleanup in getSubscriptionConnection deleted the dbConnections entry for dbName unconditionally. When a node is removed and re-added (same URL), the old connection's deferred 'finished' event would delete the new connection's entry, making future unsubscribeFromNode() calls unable to find and stop it. Fixed with an identity check before deleting. 3. onNodeUpdate's delete path never called nodeMap.delete(hostname), so removed nodes accumulated in nodeMap indefinitely. Also added socket null-safety in unsubscribe() (?.close instead of .close) for the edge case where unsubscribe() is called before connect() runs. Also re-wraps the matchesSubscription expression in replicateOverWS (formatting only, no behavior change). Adds integration test: kill peer → remove_node → restart peer → verify node1 does NOT reconnect, directly exercising the retry-loop halt. 
Co-Authored-By: Claude Sonnet 4.6 --- .../cluster/replicationReconnect.test.mjs | 37 +++++++++++++++++++ replication/replicationConnection.ts | 7 +++- replication/replicator.ts | 4 +- replication/subscriptionManager.ts | 1 + 4 files changed, 46 insertions(+), 3 deletions(-) diff --git a/integrationTests/cluster/replicationReconnect.test.mjs b/integrationTests/cluster/replicationReconnect.test.mjs index 062eeecaa..247691698 100644 --- a/integrationTests/cluster/replicationReconnect.test.mjs +++ b/integrationTests/cluster/replicationReconnect.test.mjs @@ -152,6 +152,43 @@ suite('Replication Reconnect', { timeout: 120000 }, (ctx) => { ); }); + test('remove_node while peer is down stops retries permanently', async () => { + // Ensure replication is active. + let status = await sendOperation(ctx.nodes[1], { operation: 'cluster_status' }); + if (status.connections.length === 0) { + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: ctx.nodes[1].admin, + }); + await waitForConnected(ctx.nodes[1], 1); + } + + // Kill node0 to put node1 into the retry-connect state. + await killHarper({ harper: ctx.nodes[0] }); + // Brief pause to let node1 detect the disconnect and schedule a retry. + await delay(800); + + // remove_node while the peer is unreachable. This must stop all retry + // attempts — the intentionallyUnsubscribed guard in connect() ensures the + // pending retry timer fires but returns immediately rather than opening a + // new socket. + await sendOperation(ctx.nodes[1], { + operation: 'remove_node', + hostname: ctx.nodes[0].hostname, + }); + const afterRemove = await waitForConnected(ctx.nodes[1], 0, false); + equal(afterRemove.connections.length, 0, 'expected 0 connections immediately after remove_node'); + + // Restart node0. If the retry loop was not properly stopped, node1 would + // reconnect once node0 is reachable again. Verify it does not. 
+ ctx.nodes[0] = (await startHarper({ harper: ctx.nodes[0] })).harper; + await delay(2000); + status = await sendOperation(ctx.nodes[1], { operation: 'cluster_status' }); + equal(status.connections.length, 0, 'node1 must not reconnect to a removed node even after that node restarts'); + }); + test('kill + restart of peer recovers replication connectivity', async () => { // Assume previous test left replication active. If not, re-establish. let status = await sendOperation(ctx.nodes[1], { operation: 'cluster_status' }); diff --git a/replication/replicationConnection.ts b/replication/replicationConnection.ts index ab7a3cfbf..9fbb42865 100644 --- a/replication/replicationConnection.ts +++ b/replication/replicationConnection.ts @@ -234,6 +234,7 @@ export class NodeReplicationConnection extends EventEmitter { } async connect() { + if (this.intentionallyUnsubscribed) return; if (!this.session) this.resetSession(); // TODO: Need to do this specifically for each node this.socket = await createWebSocket(this.url, { serverName: this.nodeName, authorization: this.authorization }); @@ -359,7 +360,7 @@ export class NodeReplicationConnection extends EventEmitter { } unsubscribe() { this.intentionallyUnsubscribed = true; - this.socket.close(1008, 'No longer subscribed'); + this.socket?.close(1008, 'No longer subscribed'); } getRecord(request) { @@ -1156,7 +1157,9 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: any) const matchesSubscription = (excludedNodes && timeRange === undefined) || // if it is in the list, we check the timestamps to verify it matches - (timeRange && (timeRange as any).startTime < localTime && (!(timeRange as any).endTime || (timeRange as any).endTime > localTime)); + (timeRange && + (timeRange as any).startTime < localTime && + (!(timeRange as any).endTime || (timeRange as any).endTime > localTime)); if (!matchesSubscription) { if (DEBUG_MODE) logger.trace?.( diff --git a/replication/replicator.ts b/replication/replicator.ts 
index 370a00438..f43ba60e8 100644 --- a/replication/replicator.ts +++ b/replication/replicator.ts @@ -476,7 +476,9 @@ function getSubscriptionConnection( (connection = new NodeReplicationConnection(connectingUrl, subscription, dbName, nodeName, authorization)) ); connection.connect(); - connection.once('finished', () => dbConnections.delete(dbName)); + connection.once('finished', () => { + if (dbConnections.get(dbName) === connection) dbConnections.delete(dbName); + }); return connection; } } diff --git a/replication/subscriptionManager.ts b/replication/subscriptionManager.ts index 81e29e06c..548818cf4 100644 --- a/replication/subscriptionManager.ts +++ b/replication/subscriptionManager.ts @@ -146,6 +146,7 @@ export async function startOnMainThread(options) { logger.info('Setting up node replication for', node); if (!node) { // deleted node + nodeMap.delete(hostname); for (const [url, dbReplicationWorkers] of connectionReplicationMap) { let foundNode; for (const [_database, { nodes }] of dbReplicationWorkers) {