diff --git a/integrationTests/cluster/replicationTopology.test.mjs b/integrationTests/cluster/replicationTopology.test.mjs index 5bd01e775..3ece55421 100644 --- a/integrationTests/cluster/replicationTopology.test.mjs +++ b/integrationTests/cluster/replicationTopology.test.mjs @@ -264,6 +264,81 @@ suite('Replication Topology', { timeout: 120000 }, (ctx) => { equal(response.length, 1); equal(response[0].name, 'test while disconnected'); }); + test('replicate per-record expiration so records evict on receivers', async () => { + // Create a TTL table — schema replication propagates the table-level + // expiration to every node so the cleanup scanner is armed everywhere. + // Records written on node 0 must arrive on every other node carrying + // their expiresAt metadata so they evict roughly when the sender said + // they should. + const TTL_SECONDS = 1; + await sendOperation(ctx.nodes[0], { + operation: 'create_table', + table: 'ttl_test', + primary_key: 'id', + expiration: TTL_SECONDS, + attributes: [ + { name: 'id', type: 'ID' }, + { name: 'name', type: 'String' }, + ], + }); + // wait for the table to materialize on every other node (1..NODE_COUNT-1) before we write; best-effort: polling stops after ~2s per node without asserting + for (let i = 1; i < NODE_COUNT; i++) { + let retries = 0; + let described; + do { + await delay(100); + described = await sendOperation(ctx.nodes[i], { operation: 'describe_database' }); + } while (!described?.data?.ttl_test && retries++ < 20); + } + // write several records in a single upsert so they share a txn batch on + // the wire. Past bugs only set the expiresAt metadata for the first + // record in such a batch, leaving the rest with the receiver's TTL.
+ const ttlIds = ['ttl-1', 'ttl-2', 'ttl-3', 'ttl-4']; + const writtenAt = Date.now(); + await sendOperation(ctx.nodes[0], { + operation: 'upsert', + table: 'ttl_test', + records: ttlIds.map((id) => ({ id, name: 'expires soon' })), + replicatedConfirmation: NODE_COUNT - 1, + }); + // the records should be present on every node before TTL elapses. NOTE(review): presence is asserted unconditionally here (the legacy test tolerates early expiry), so a run slower than the 1s TTL would fail at this check; confirm that is acceptable + for (let i = 0; i < NODE_COUNT; i++) { + let response; + let retries = 0; + do { + await delay(50); + response = await sendOperation(ctx.nodes[i], { + operation: 'search_by_id', + table: 'ttl_test', + get_attributes: ['id', 'name'], + ids: ttlIds, + }); + } while (response.length < ttlIds.length && retries++ < 10); + equal( + response.length, + ttlIds.length, + `Node ${i} ${ctx.nodes[i].hostname} did not replicate TTL records (got ${response.length})` + ); + } + // wait long enough for the sender's expiresAt to pass plus a scan tick. + // If the receiver's entry metadata is missing expiresAt, records will + // stick around even after this timeout and the assertion below will fail. + const elapsed = Date.now() - writtenAt; + await delay(Math.max(0, TTL_SECONDS * 1000 - elapsed) + 1500); + for (let i = 0; i < NODE_COUNT; i++) { + const response = await sendOperation(ctx.nodes[i], { + operation: 'search_by_id', + table: 'ttl_test', + get_attributes: ['id', 'name'], + ids: ttlIds, + }); + equal( + response.length, + 0, + `Node ${i} ${ctx.nodes[i].hostname} did not evict all replicated records past expiresAt (still has ${response.length})` + ); + } + }); test('Replicate data from a legacy node', async () => { const legacyPath = process.env.HARPER_LEGACY_VERSION_PATH; if (!legacyPath) return; @@ -307,6 +382,49 @@ suite('Replication Topology', { timeout: 120000 }, (ctx) => { table: 'test', records: [{ id: 'old-data-1', name: 'old data test' }], }); + // Also create a TTL table on the legacy node and write records that + // expire shortly.
This exercises the cross-version path where the + // legacy peer is the producer of the audit-log expiresAt metadata. + // We create the table on the v5 nodes first (matching attributes and + // expiration) so they each have a cleanup scanner armed; the v4 peer's + // schema sync will see no attribute changes and leave the v5 TTL config + // intact. Without this, the v5 receivers inherit the v4 table without + // the table-level expiration property (v4 does not transmit it) and + // nothing reaps the records past expiry. + const LEGACY_TTL_SECONDS = 2; + await sendOperation(ctx.nodes[0], { + operation: 'create_table', + table: 'legacy_ttl', + primary_key: 'id', + expiration: LEGACY_TTL_SECONDS, + attributes: [ + { name: 'id', type: 'ID' }, + { name: 'name', type: 'String' }, + ], + }); + // give the v5 cluster a moment to propagate the new table (and its + // expiration setting) to the rest of the v5 nodes via schema sync. NOTE(review): this is a fixed 200ms sleep rather than a describe_database poll, so it may be too short on slow machines; confirm + await delay(200); + await sendOperation(legacyContext.harper, { + operation: 'create_table', + table: 'legacy_ttl', + primary_key: 'id', + expiration: LEGACY_TTL_SECONDS, + attributes: [ + { name: 'id', type: 'ID' }, + { name: 'name', type: 'String' }, + ], + }); + // write several records in a single upsert so they share a txn batch, + // exercising the case where only the first record in a batch carried + // the expiresAt metadata on the receiver.
+ const legacyTtlIds = ['legacy-ttl-1', 'legacy-ttl-2', 'legacy-ttl-3', 'legacy-ttl-4']; + const legacyTtlWrittenAt = Date.now(); + await sendOperation(legacyContext.harper, { + operation: 'upsert', + table: 'legacy_ttl', + records: legacyTtlIds.map((id) => ({ id, name: 'legacy expires soon' })), + }); await sendOperation(ctx.nodes[0], { // connect the central node operation: 'add_node', @@ -343,5 +461,47 @@ suite('Replication Topology', { timeout: 120000 }, (ctx) => { equal(response.length, 1, `Node ${i} ${ctx.nodes[i].hostname} did not replicate data from legacy node`); equal(response[0].name, 'old data test'); } + // The legacy_ttl table was created on node 0 with its expiration setting + // and propagated among the v5 nodes (the legacy peer does not transmit it), which is what arms the cleanup + // scanner there. Wait for the records to land, then verify they evict + // at the expiresAt the legacy peer set. + for (let i = 0; i < NODE_COUNT; i++) { + retries = 0; + do { + response = await sendOperation(ctx.nodes[i], { + operation: 'search_by_id', + table: 'legacy_ttl', + get_attributes: ['id', 'name'], + ids: legacyTtlIds, + }); + if (retries++ > 20) break; + await delay(100); + } while (response.length < legacyTtlIds.length && Date.now() - legacyTtlWrittenAt < LEGACY_TTL_SECONDS * 1000); + // it is acceptable for records to have already expired before being + // observed (test machines can be slow), so we only assert presence if + // we are still inside the TTL window.
+ if (Date.now() - legacyTtlWrittenAt < LEGACY_TTL_SECONDS * 1000) { + equal( + response.length, + legacyTtlIds.length, + `Node ${i} did not receive all legacy_ttl records before expiry (got ${response.length})` + ); + } + } + const elapsed = Date.now() - legacyTtlWrittenAt; + await delay(Math.max(0, LEGACY_TTL_SECONDS * 1000 - elapsed) + 2000); + for (let i = 0; i < NODE_COUNT; i++) { + response = await sendOperation(ctx.nodes[i], { + operation: 'search_by_id', + table: 'legacy_ttl', + get_attributes: ['id', 'name'], + ids: legacyTtlIds, + }); + equal( + response.length, + 0, + `Node ${i} ${ctx.nodes[i].hostname} did not evict all legacy-ttl records past expiresAt (still has ${response.length})` + ); + } }); });