Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions integrationTests/cluster/replicationTopology.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,81 @@ suite('Replication Topology', { timeout: 120000 }, (ctx) => {
equal(response.length, 1);
equal(response[0].name, 'test while disconnected');
});
test('replicate per-record expiration so records evict on receivers', async () => {
// Create a TTL table — schema replication propagates the table-level
// expiration to every node so the cleanup scanner is armed everywhere.
// Records written on node 0 must arrive on every other node carrying
// their expiresAt metadata so they evict roughly when the sender said
// they should.
const TTL_SECONDS = 1;
await sendOperation(ctx.nodes[0], {
operation: 'create_table',
table: 'ttl_test',
primary_key: 'id',
expiration: TTL_SECONDS,
attributes: [
{ name: 'id', type: 'ID' },
{ name: 'name', type: 'String' },
],
});
// wait for the table to materialize on every node before we write
for (let i = 1; i < NODE_COUNT; i++) {
let retries = 0;
let described;
do {
await delay(100);
described = await sendOperation(ctx.nodes[i], { operation: 'describe_database' });
} while (!described?.data?.ttl_test && retries++ < 20);
}
// write several records in a single upsert so they share a txn batch on
// the wire. Past bugs only set the expiresAt metadata for the first
// record in such a batch, leaving the rest with the receiver's TTL.
const ttlIds = ['ttl-1', 'ttl-2', 'ttl-3', 'ttl-4'];
const writtenAt = Date.now();
await sendOperation(ctx.nodes[0], {
operation: 'upsert',
table: 'ttl_test',
records: ttlIds.map((id) => ({ id, name: 'expires soon' })),
replicatedConfirmation: NODE_COUNT - 1,
});
// the records should be present on every node before TTL elapses
for (let i = 0; i < NODE_COUNT; i++) {
let response;
let retries = 0;
do {
await delay(50);
response = await sendOperation(ctx.nodes[i], {
operation: 'search_by_id',
table: 'ttl_test',
get_attributes: ['id', 'name'],
ids: ttlIds,
});
} while (response.length < ttlIds.length && retries++ < 10);
equal(
response.length,
ttlIds.length,
`Node ${i} ${ctx.nodes[i].hostname} did not replicate TTL records (got ${response.length})`
);
}
// wait long enough for the sender's expiresAt to pass plus a scan tick.
// If the receiver's entry metadata is missing expiresAt, records will
// stick around even after this timeout and the assertion below will fail.
const elapsed = Date.now() - writtenAt;
await delay(Math.max(0, TTL_SECONDS * 1000 - elapsed) + 1500);
for (let i = 0; i < NODE_COUNT; i++) {
const response = await sendOperation(ctx.nodes[i], {
operation: 'search_by_id',
table: 'ttl_test',
get_attributes: ['id', 'name'],
ids: ttlIds,
});
equal(
response.length,
0,
`Node ${i} ${ctx.nodes[i].hostname} did not evict all replicated records past expiresAt (still has ${response.length})`
);
}
});
test('Replicate data from a legacy node', async () => {
const legacyPath = process.env.HARPER_LEGACY_VERSION_PATH;
if (!legacyPath) return;
Expand Down Expand Up @@ -307,6 +382,49 @@ suite('Replication Topology', { timeout: 120000 }, (ctx) => {
table: 'test',
records: [{ id: 'old-data-1', name: 'old data test' }],
});
// Also create a TTL table on the legacy node and write a record that
// expires shortly. This exercises the cross-version path where the
// legacy peer is the producer of the audit-log expiresAt metadata.
// We create the table on the v5 nodes first (matching attributes and
// expiration) so they each have a cleanup scanner armed; the v4 peer's
// schema sync will see no attribute changes and leave the v5 TTL config
// intact. Without this, the v5 receivers inherit the v4 table without
// the table-level expiration property (v4 does not transmit it) and
// nothing reaps the records past expiry.
const LEGACY_TTL_SECONDS = 2;
await sendOperation(ctx.nodes[0], {
operation: 'create_table',
table: 'legacy_ttl',
primary_key: 'id',
expiration: LEGACY_TTL_SECONDS,
attributes: [
{ name: 'id', type: 'ID' },
{ name: 'name', type: 'String' },
],
});
// give the v5 cluster a moment to propagate the new table (and its
// expiration setting) to the rest of the v5 nodes via schema sync.
await delay(200);
await sendOperation(legacyContext.harper, {
operation: 'create_table',
table: 'legacy_ttl',
primary_key: 'id',
expiration: LEGACY_TTL_SECONDS,
attributes: [
{ name: 'id', type: 'ID' },
{ name: 'name', type: 'String' },
],
});
// write several records in a single upsert so they share a txn batch,
// exercising the case where only the first record in a batch carried
// the expiresAt metadata on the receiver.
const legacyTtlIds = ['legacy-ttl-1', 'legacy-ttl-2', 'legacy-ttl-3', 'legacy-ttl-4'];
const legacyTtlWrittenAt = Date.now();
await sendOperation(legacyContext.harper, {
operation: 'upsert',
table: 'legacy_ttl',
records: legacyTtlIds.map((id) => ({ id, name: 'legacy expires soon' })),
});
await sendOperation(ctx.nodes[0], {
// connect the central node
operation: 'add_node',
Expand Down Expand Up @@ -343,5 +461,47 @@ suite('Replication Topology', { timeout: 120000 }, (ctx) => {
equal(response.length, 1, `Node ${i} ${ctx.nodes[i].hostname} did not replicate data from legacy node`);
equal(response[0].name, 'old data test');
}
// Schema replication carries the legacy_ttl table (including its
// expiration setting) onto the v5 nodes, which is what arms the cleanup
// scanner there. Wait for the records to land, then verify they evict
// at the expiresAt the legacy peer set.
for (let i = 0; i < NODE_COUNT; i++) {
retries = 0;
do {
response = await sendOperation(ctx.nodes[i], {
operation: 'search_by_id',
table: 'legacy_ttl',
get_attributes: ['id', 'name'],
ids: legacyTtlIds,
});
if (retries++ > 20) break;
await delay(100);
} while (response.length < legacyTtlIds.length && Date.now() - legacyTtlWrittenAt < LEGACY_TTL_SECONDS * 1000);
// it is acceptable for records to have already expired before being
// observed (test machines can be slow), so we only assert presence if
// we are still inside the TTL window.
if (Date.now() - legacyTtlWrittenAt < LEGACY_TTL_SECONDS * 1000) {
equal(
response.length,
legacyTtlIds.length,
`Node ${i} did not receive all legacy_ttl records before expiry (got ${response.length})`
);
}
}
const elapsed = Date.now() - legacyTtlWrittenAt;
await delay(Math.max(0, LEGACY_TTL_SECONDS * 1000 - elapsed) + 2000);
for (let i = 0; i < NODE_COUNT; i++) {
response = await sendOperation(ctx.nodes[i], {
operation: 'search_by_id',
table: 'legacy_ttl',
get_attributes: ['id', 'name'],
ids: legacyTtlIds,
});
equal(
response.length,
0,
`Node ${i} ${ctx.nodes[i].hostname} did not evict all legacy-ttl records past expiresAt (still has ${response.length})`
);
}
});
});
Loading