Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions integrationTests/cluster/replicationReconnect.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,43 @@ suite('Replication Reconnect', { timeout: 120000 }, (ctx) => {
);
});

test('remove_node while peer is down stops retries permanently', async () => {
	// Helper: ask node1 for its current cluster status.
	const fetchClusterStatus = () => sendOperation(ctx.nodes[1], { operation: 'cluster_status' });

	// Precondition: node1 must have a replication connection; add node0 if it has none.
	const initialStatus = await fetchClusterStatus();
	if (initialStatus.connections.length === 0) {
		const addNodeRequest = {
			operation: 'add_node',
			rejectUnauthorized: false,
			hostname: ctx.nodes[0].hostname,
			authorization: ctx.nodes[1].admin,
		};
		await sendOperation(ctx.nodes[1], addNodeRequest);
		await waitForConnected(ctx.nodes[1], 1);
	}

	// Take node0 down so that node1 ends up in its retry-connect state, then
	// pause briefly so node1 can notice the disconnect and schedule a retry.
	await killHarper({ harper: ctx.nodes[0] });
	await delay(800);

	// Issue remove_node while the peer is unreachable. All retry attempts must
	// stop: the intentionallyUnsubscribed guard in connect() lets the pending
	// retry timer fire but return immediately instead of opening a new socket.
	const removeNodeRequest = {
		operation: 'remove_node',
		hostname: ctx.nodes[0].hostname,
	};
	await sendOperation(ctx.nodes[1], removeNodeRequest);
	const statusAfterRemoval = await waitForConnected(ctx.nodes[1], 0, false);
	equal(statusAfterRemoval.connections.length, 0, 'expected 0 connections immediately after remove_node');

	// Bring node0 back. A retry loop that was not stopped would reconnect now
	// that node0 is reachable again, so confirm node1 still has no connections.
	const restarted = await startHarper({ harper: ctx.nodes[0] });
	ctx.nodes[0] = restarted.harper;
	await delay(2000);
	const statusAfterRestart = await fetchClusterStatus();
	equal(
		statusAfterRestart.connections.length,
		0,
		'node1 must not reconnect to a removed node even after that node restarts'
	);
});

test('kill + restart of peer recovers replication connectivity', async () => {
// Assume previous test left replication active. If not, re-establish.
let status = await sendOperation(ctx.nodes[1], { operation: 'cluster_status' });
Expand Down
7 changes: 5 additions & 2 deletions replication/replicationConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
}

async connect() {
if (this.intentionallyUnsubscribed) return;
if (!this.session) this.resetSession();
// TODO: Need to do this specifically for each node
this.socket = await createWebSocket(this.url, { serverName: this.nodeName, authorization: this.authorization });
Expand All @@ -241,7 +242,7 @@
let session;
logger.debug?.(`Connecting to ${this.url}, db: ${this.databaseName}, process ${process.pid}`);
this.socket.on('open', () => {
this.socket._socket.unref();

Check failure on line 245 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 245 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 245 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 245 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property '_socket' does not exist on type 'WebSocket'.
// in normal startup, just use info, but adjust log level to warn if we were previously disconnected, because there was a warn message on the disconnect and we want to keep symmetry
logger[this.isConnected ? 'info' : 'warn']?.(`Connected to ${this.url}, db: ${this.databaseName}`);
this.retries = 0;
Expand Down Expand Up @@ -287,13 +288,13 @@
}
});
this.socket.on('error', (error) => {
if (error.code === 'SELF_SIGNED_CERT_IN_CHAIN') {

Check failure on line 291 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property 'code' does not exist on type 'Error'.

Check failure on line 291 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property 'code' does not exist on type 'Error'.

Check failure on line 291 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property 'code' does not exist on type 'Error'.

Check failure on line 291 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property 'code' does not exist on type 'Error'.
logger.warn?.(
`Can not connect to ${this.url}, this server does not have a certificate authority for the certificate provided by ${this.url}`
);
error.isHandled = true;

Check failure on line 295 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property 'isHandled' does not exist on type 'Error'.

Check failure on line 295 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property 'isHandled' does not exist on type 'Error'.

Check failure on line 295 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property 'isHandled' does not exist on type 'Error'.

Check failure on line 295 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property 'isHandled' does not exist on type 'Error'.
} else if (error.code !== 'ECONNREFUSED') {

Check failure on line 296 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property 'code' does not exist on type 'Error'.

Check failure on line 296 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property 'code' does not exist on type 'Error'.

Check failure on line 296 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property 'code' does not exist on type 'Error'.

Check failure on line 296 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property 'code' does not exist on type 'Error'.
if (error.code === 'UNABLE_TO_VERIFY_LEAF_SIGNATURE')

Check failure on line 297 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property 'code' does not exist on type 'Error'.

Check failure on line 297 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property 'code' does not exist on type 'Error'.

Check failure on line 297 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property 'code' does not exist on type 'Error'.

Check failure on line 297 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property 'code' does not exist on type 'Error'.
logger.error?.(
`Can not connect to ${this.url}, the certificate provided by ${this.url} is not trusted, this node needs to be added to the cluster, or a certificate authority needs to be added`
);
Expand Down Expand Up @@ -359,7 +360,7 @@
}
/**
 * Marks this connection as intentionally unsubscribed and closes its socket
 * if one has been created.
 *
 * The flag is set before closing so that connect(), which returns early when
 * intentionallyUnsubscribed is set, will not open a new socket afterwards.
 */
unsubscribe() {
	this.intentionallyUnsubscribed = true;
	// this.socket is only assigned inside connect(), so it can still be
	// undefined here; close it at most once, and only when it exists, rather
	// than throwing a TypeError on a missing socket.
	this.socket?.close(1008, 'No longer subscribed');
}

getRecord(request) {
Expand Down Expand Up @@ -423,13 +424,13 @@
if (options.url) {
const sendPing = () => {
// if we have not received a message in the last ping interval, we should terminate the connection (but check to make sure we aren't just waiting for other data to flow)
if (lastPingTime && bytesRead === ws._socket?.bytesRead && bytesWritten === ws._socket?.bytesWritten)

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 427 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property '_socket' does not exist on type 'WebSocket'.
ws.terminate(); // timeout
else {
lastPingTime = performance.now();
ws.ping();
bytesRead = ws._socket?.bytesRead;

Check failure on line 432 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 432 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 432 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 432 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property '_socket' does not exist on type 'WebSocket'.
bytesWritten = ws._socket?.bytesWritten;

Check failure on line 433 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 433 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 433 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 433 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property '_socket' does not exist on type 'WebSocket'.
}
};
sendPingInterval = setInterval(sendPing, PING_INTERVAL).unref();
Expand All @@ -437,7 +438,7 @@
} else {
resetPingTimer();
}
ws._socket?.setMaxListeners(200); // we should allow a lot of drain listeners for concurrent blob streams

Check failure on line 441 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v22)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 441 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v20)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 441 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v25)

Property '_socket' does not exist on type 'WebSocket'.

Check failure on line 441 in replication/replicationConnection.ts

View workflow job for this annotation

GitHub Actions / Build Harper Pro (Node.js v24)

Property '_socket' does not exist on type 'WebSocket'.
function resetPingTimer() {
clearTimeout(receivePingTimer);
bytesRead = ws._socket?.bytesRead;
Expand Down Expand Up @@ -1156,7 +1157,9 @@
const matchesSubscription =
(excludedNodes && timeRange === undefined) ||
// if it is in the list, we check the timestamps to verify it matches
(timeRange && (timeRange as any).startTime < localTime && (!(timeRange as any).endTime || (timeRange as any).endTime > localTime));
(timeRange &&
(timeRange as any).startTime < localTime &&
(!(timeRange as any).endTime || (timeRange as any).endTime > localTime));
if (!matchesSubscription) {
if (DEBUG_MODE)
logger.trace?.(
Expand Down
4 changes: 3 additions & 1 deletion replication/replicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,9 @@ function getSubscriptionConnection(
(connection = new NodeReplicationConnection(connectingUrl, subscription, dbName, nodeName, authorization))
);
connection.connect();
connection.once('finished', () => dbConnections.delete(dbName));
connection.once('finished', () => {
if (dbConnections.get(dbName) === connection) dbConnections.delete(dbName);
});
return connection;
}
}
Expand Down
1 change: 1 addition & 0 deletions replication/subscriptionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export async function startOnMainThread(options) {
logger.info('Setting up node replication for', node);
if (!node) {
// deleted node
nodeMap.delete(hostname);
for (const [url, dbReplicationWorkers] of connectionReplicationMap) {
let foundNode;
for (const [_database, { nodes }] of dbReplicationWorkers) {
Expand Down