diff --git a/integrationTests/cluster/blobSaveRejectionContainment.test.mjs b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs new file mode 100644 index 000000000..8327aada4 --- /dev/null +++ b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs @@ -0,0 +1,209 @@ +/** + * Regression guard for the receive-side blob save bug surfaced on the prod node that + * crashed earlier in May. When `saveBlob` rejects (e.g. `createWriteStream` errors + * with ENOENT), the rejection was already caught for logging but the *raw* promise + * was still pushed into `outstandingBlobsToFinish`. A later + * `await Promise.all(outstandingBlobsToFinish)` inside `end_txn`'s `onCommit` then + * observed the rejection independently and propagated it out as `uncaughtException`. + * + * The fix (harper-pro PR #149) stores the catch-handled promise in + * `outstandingBlobsToFinish` instead, so `Promise.all` only ever sees a fulfilled + * promise; the log line stays exactly as before. + * + * This test installs a fault-injection component on receiver B that makes every Nth + * blob save fail with ENOENT, then drives blob-bearing replication from A and + * asserts: + * 1. The injection actually fired (otherwise the test is testing nothing). + * 2. `Blob save failed for from ` is logged per failure (the .catch). + * 3. `uncaughtException` for those failures NEVER appears in B's log. + * 4. B is still connected to A and B is still committing new records after the + * failures. + * + * If you're verifying this test catches the regression, revert + * harper-pro/replication/replicationConnection.ts `receiveBlobs` to push the raw + * `finished` promise — assertion 3 should fail with several uncaughtException lines. + */ + +import { suite, test, before, after } from 'node:test'; +import { equal, ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { + startHarper, + teardownHarper, + setupHarperWithFixture, + getNextAvailableLoopbackAddress, + targz, +} from '@harperfast/integration-testing'; +import { join } from 'node:path'; +import { sendOperation, fetchWithRetry, concurrent, readLog } from './clusterShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +const FAIL_INTERVAL = 7; // every 7th /blobs/ createWriteStream fails on B +const BLOB_REQUESTS = 400; // /Location/{n} hits — each creates a blob on A → replicates to B + +suite('Receive-side blob save rejection containment', { timeout: 180000 }, (ctx) => { + before(async () => { + // Bring up A (clean) and B (with the fault-injection component preloaded). + const nodeA = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + const nodeB = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + const sharedConfig = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + }); + await startHarper(nodeA, { + config: sharedConfig(nodeA.harper.hostname), + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + // setupHarperWithFixture copies the fixture into dataRootDir/components/ + // before starting Harper, so the injector is in place by the time replication + // receivers come up. The env var arms it. + await setupHarperWithFixture(nodeB, join(import.meta.dirname, 'fixture-blob-fail-injector'), { + config: sharedConfig(nodeB.harper.hostname), + env: { + HARPER_NO_FLUSH_ON_EXIT: true, + HARPER_TEST_BLOB_FAIL_INTERVAL: String(FAIL_INTERVAL), + }, + }); + ctx.nodes = [nodeA.harper, nodeB.harper]; + + // Connect A↔B. + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + for (let retries = 0; retries < 15; retries++) { + const status = await Promise.all(ctx.nodes.map((n) => sendOperation(n, { operation: 'cluster_status' }))); + if (status.every((r) => (r.connections ?? []).every((c) => (c.database_sockets ?? []).every((s) => s.connected)))) + break; + await delay(200 * (retries + 1)); + } + + // Deploy a blob-bearing component to A. `replicated: true` causes it to be + // installed on B as well, where the injector + LargeLocation coexist. + // Using fixture-large-blob-source rather than the shared `fixture/` because + // the latter's blobs are 7,500 bytes — under Harper's FILE_STORAGE_THRESHOLD + // (8192) so they're stored inline and never hit createWriteStream on B, + // defeating this test. LargeLocation produces 50 KB blobs which always + // go through the file-backed write path. + const payload = await targz(join(import.meta.dirname, 'fixture-large-blob-source')); + const deployResp = await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'large-blob-source', + payload, + replicated: true, + restart: true, + }); + equal(deployResp.message, 'Successfully deployed: large-blob-source, restarting Harper'); + // Give both nodes time to come back up after the restart. + await delay(35000); + + // Sanity: confirm B logged the injector banner. If this isn't there, the test + // would silently pass without exercising the failure path. + const bootLog = await readLog(ctx.nodes[1]); + ok( + bootLog.includes('[blob-fail-injector] installed'), + 'fault injector did not load on B — test would not exercise the failure path' + ); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }))); + }); + + test('blob save ENOENT is logged exactly once and never escapes as uncaughtException', async () => { + const [A, B] = ctx.nodes; + + // Generate blob-bearing replication traffic by hitting /LargeLocation/{id} on A. + // Each request triggers `sourcedFrom.get(id)` on A → record + 50 KB streamed + // blob committed → replicated to B → B's createWriteStream is patched to fail + // every FAIL_INTERVALth call. + const { execute, finish } = concurrent( + () => fetchWithRetry(A.httpURL + '/LargeLocation/' + Math.floor(Math.random() * BLOB_REQUESTS)), + 20 + ); + for (let i = 0; i < BLOB_REQUESTS; i++) await execute(); + await finish(); + + // Let replication drain. The fixture's `sourcedFrom.get` deliberately delays + // inside its async generator so the stream takes some real wall-clock time. + await delay(8000); + + const log = await readLog(B); + + // Assertion 1: the injector actually injected something (otherwise the rest is moot). + const injectionMatches = log.match(/\[blob-fail-injector\] /g) ?? []; + ok( + injectionMatches.length >= 1, + `expected the injector to have logged at least once, found ${injectionMatches.length}` + ); + + // Assertion 2: the .catch in receiveBlobs logged per-failure. + const saveFailedMatches = log.match(/\[error\] \[replication\]: Blob save failed for /g) ?? []; + ok(saveFailedMatches.length > 0, 'expected at least one `Blob save failed` line — injection did not fire'); + + // Assertion 3 — the regression itself: no `uncaughtException` with a blob payload. + // Match both the bare "uncaughtException" and the typical accompanying "Blob error". + const uncaughtBlobLines = log + .split('\n') + .filter((line) => /uncaughtException/.test(line) && /(Blob|ENOENT.*blobs)/.test(line)); + equal( + uncaughtBlobLines.length, + 0, + `blob save failure escaped as uncaughtException ${uncaughtBlobLines.length} time(s):\n` + + uncaughtBlobLines.slice(0, 5).join('\n') + ); + + // Assertion 4: B is still connected to A and still committing — failures didn't + // break the channel. + const status = await sendOperation(B, { operation: 'cluster_status' }); + const aConn = (status.connections ?? []).find((c) => (c.url ?? c.name ?? '').includes(A.hostname)); + ok(aConn, `B should still see A in its cluster_status connections`); + ok( + (aConn.database_sockets ?? []).every((s) => s.connected), + 'B should still report A connected on every database socket' + ); + + // Liveness: a fresh write on A should still propagate to B. We check via + // `describe_table` on both sides — a direct, unambiguous count comparison + // that doesn't depend on REST routing semantics for a `sourcedFrom` table + // (where GET on a partial record can re-invoke the cache miss handler). + const beforeB = (await sendOperation(B, { operation: 'describe_table', table: 'LargeLocation' })).record_count; + await sendOperation(A, { + operation: 'upsert', + database: 'data', + table: 'LargeLocation', + records: [{ id: 999_999, name: 'liveness probe' }], + }); + let liveness = false; + for (let r = 0; r < 20; r++) { + const afterB = (await sendOperation(B, { operation: 'describe_table', table: 'LargeLocation' })).record_count; + if (afterB > beforeB) { + liveness = true; + break; + } + await delay(500); + } + ok(liveness, 'replication remained broken after blob save failures — liveness probe did not arrive on B'); + + console.log( + `blob save containment: ${saveFailedMatches.length} injected ENOENTs, ` + + `${uncaughtBlobLines.length} escaped as uncaughtException` + ); + }); +}); diff --git a/integrationTests/cluster/clusterShared.mjs b/integrationTests/cluster/clusterShared.mjs index b2fc69066..244f78f35 100644 --- a/integrationTests/cluster/clusterShared.mjs +++ b/integrationTests/cluster/clusterShared.mjs @@ -58,3 +58,138 @@ export function concurrent(task, concurrency = 20) { }, }; } + +/** + * Read the hdb.log file for a given Harper node. + * + * When the integration-testing harness is configured with + * `HARPER_INTEGRATION_TEST_LOG_DIR` (as it is in CI), Harper's `logging.root` is + * redirected to a per-suite directory exposed on `ctx.harper.logDir` rather than + * `{dataRootDir}/log`. We check both so the helper works locally and in CI. + * + * Reads the full file each time — fine for short replays, callers needing only + * recent lines can filter by timestamp themselves. + */ +export async function readLog(node) { + const { readFile } = await import('node:fs/promises'); + const { join } = await import('node:path'); + const candidates = []; + if (node.logDir) candidates.push(join(node.logDir, 'hdb.log')); + if (node.dataRootDir) candidates.push(join(node.dataRootDir, 'log', 'hdb.log')); + for (const path of candidates) { + try { + return await readFile(path, 'utf8'); + } catch (err) { + if (err.code !== 'ENOENT') throw err; + } + } + return ''; +} + +/** + * Poll `cluster_status` on `receiver` until it reports a `lastReceivedVersion` for + * `source` greater than the version captured *now* on `source` itself. Returns the + * final receiver-side version when caught up, throws on timeout. + * + * @param {Object} receiver - The catching-up Harper node + * @param {Object} source - The Harper node we expect to be replicating *from* + * @param {Object} [opts] + * @param {number} [opts.timeoutMs=120000] + * @param {number} [opts.pollMs=500] + */ +export async function waitForCatchUp(receiver, source, opts = {}) { + const timeoutMs = opts.timeoutMs ?? 120000; + const pollMs = opts.pollMs ?? 500; + // Capture source's version threshold up front. Catch-up = receiver's lastReceived + // for this connection >= sourceTarget. + const sourceStatus = await sendOperation(source, { operation: 'cluster_status' }); + // We want a version that's been written on `source` (i.e. its own outgoing replication + // state). Use the highest `lastReceivedVersion` it tracks across its connections as a + // proxy for "writes have flowed through" — or just stamp `Date.now()` if no peers yet. + let sourceTarget = 0; + for (const conn of sourceStatus.connections ?? []) { + for (const sock of conn.database_sockets ?? []) { + if (typeof sock.lastReceivedVersion === 'number' && sock.lastReceivedVersion > sourceTarget) { + sourceTarget = sock.lastReceivedVersion; + } + } + } + // If we couldn't infer one, fall back to a wall-clock-ish stamp; replication versions + // are timestamp-derived so this is a safe upper bound for "before the test started". + if (sourceTarget === 0) sourceTarget = Date.now() - 60_000; + + const sourceHostname = source.hostname; + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const receiverStatus = await sendOperation(receiver, { operation: 'cluster_status' }); + const sourceConn = (receiverStatus.connections ?? []).find((c) => (c.url ?? c.name ?? '').includes(sourceHostname)); + if (sourceConn) { + const versions = (sourceConn.database_sockets ?? []) + .map((s) => s.lastReceivedVersion) + .filter((v) => typeof v === 'number'); + if (versions.length && Math.min(...versions) >= sourceTarget) return Math.min(...versions); + } + await delay(pollMs); + } + throw new Error(`Timed out after ${timeoutMs}ms waiting for ${receiver.hostname} to catch up to ${sourceHostname}`); +} + +/** + * Snapshot of memory state on a single node. The shape is stable for tests so we don't + * depend on `system_information`'s evolving structure: main-process RSS plus the most + * informative per-thread heap metric. (Per-worker RSS isn't reported individually by + * Harper; we use heap+external as a proxy for in-flight allocation pressure inside + * the worker.) + * + * @typedef {Object} NodeMemorySnapshot + * @property {number} t - Date.now() at sample time + * @property {number} rss - main process resident-set in bytes (process.memoryUsage().rss) + * @property {Array<{threadId:number,heapUsed:number,externalMemory:number,arrayBuffers:number}>} threads + */ + +/** + * Fetch a single memory snapshot via `system_information`. + * Returns `null` if the call fails (transient during restart) — callers should treat + * a few `null`s near a kill/restart as normal. + * + * @param {Object} node + * @returns {Promise} + */ +export async function getMemoryInfo(node) { + try { + const info = await sendOperation(node, { + operation: 'system_information', + attributes: ['memory', 'threads'], + }); + const threads = (info.threads ?? []).map((t) => ({ + threadId: t.threadId ?? 0, + heapUsed: t.heapUsed ?? 0, + externalMemory: t.externalMemory ?? 0, + arrayBuffers: t.arrayBuffers ?? 0, + })); + // system_information.memory contains the spread of process.memoryUsage() on the + // main thread — `rss` is the field we care about for total footprint. + const rss = info.memory?.rss ?? 0; + return { t: Date.now(), rss, threads }; + } catch { + return null; + } +} + +/** + * Compute peak resident-set and peak per-worker heap+external across a series of + * snapshots, ignoring `null` entries (returned when sampling races a restart). + */ +export function peakMemory(samples) { + let peakRss = 0; + let peakWorkerHeapExt = 0; + for (const s of samples) { + if (!s) continue; + if (s.rss > peakRss) peakRss = s.rss; + for (const t of s.threads) { + const used = (t.heapUsed || 0) + (t.externalMemory || 0) + (t.arrayBuffers || 0); + if (used > peakWorkerHeapExt) peakWorkerHeapExt = used; + } + } + return { peakRss, peakWorkerHeapExt }; +} diff --git a/integrationTests/cluster/fixture-blob-fail-injector/config.yaml b/integrationTests/cluster/fixture-blob-fail-injector/config.yaml new file mode 100644 index 000000000..2a373242e --- /dev/null +++ b/integrationTests/cluster/fixture-blob-fail-injector/config.yaml @@ -0,0 +1,2 @@ +jsResource: + files: resources.js # entry module — runs at component load diff --git a/integrationTests/cluster/fixture-blob-fail-injector/resources.js b/integrationTests/cluster/fixture-blob-fail-injector/resources.js new file mode 100644 index 000000000..6ac72269e --- /dev/null +++ b/integrationTests/cluster/fixture-blob-fail-injector/resources.js @@ -0,0 +1,47 @@ +// Test-only component: monkey-patches `fs.createWriteStream` so that every Nth call +// targeting the blob storage tree fails asynchronously with `ENOENT`. Drives the +// receive-side blob save path in `replication/replicationConnection.ts:receiveBlobs` +// into its rejection branch on demand. Used by `blobSaveRejectionContainment.test.mjs`. +// +// We patch the CJS module object obtained via `createRequire` rather than mutating +// an ESM namespace (which is frozen). Harper's dist code uses `require('node:fs')`, +// so the live property is what `createWriteStream` callers look up at call time. +// Toggle on with `HARPER_TEST_BLOB_FAIL_INTERVAL=`. +import { createRequire } from 'node:module'; + +const interval = Number.parseInt(process.env.HARPER_TEST_BLOB_FAIL_INTERVAL || '0', 10); +if (Number.isFinite(interval) && interval > 0) { + const require = createRequire(import.meta.url); + const fs = require('node:fs'); + const { Writable } = require('node:stream'); + const realCreateWriteStream = fs.createWriteStream; + let counter = 0; + fs.createWriteStream = function patchedCreateWriteStream(path) { + if (typeof path === 'string' && path.includes('/blobs/')) { + counter++; + if (counter % interval === 0) { + // Emit ENOENT on next tick so the caller's listeners are wired first — + // matches how real createWriteStream surfaces fs.open failures. + const stream = new Writable({ + write(_chunk, _enc, cb) { + cb(new Error('test-injected: stream torn down')); + }, + }); + stream.fd = null; + process.nextTick(() => { + const err = new Error("ENOENT: no such file or directory, open '" + path + "'"); + err.code = 'ENOENT'; + err.errno = -2; + err.syscall = 'open'; + err.path = path; + stream.emit('error', err); + }); + return stream; + } + } + return realCreateWriteStream.apply(this, arguments); + }; + // One-line marker so tests can assert the patch actually installed. + // Plain console.log goes to the test runner via the harness's log redirect. + console.log('[blob-fail-injector] installed; failing every ' + interval + 'th /blobs/ createWriteStream'); +} diff --git a/integrationTests/cluster/fixture-large-blob-source/config.yaml b/integrationTests/cluster/fixture-large-blob-source/config.yaml new file mode 100644 index 000000000..fb8a45440 --- /dev/null +++ b/integrationTests/cluster/fixture-large-blob-source/config.yaml @@ -0,0 +1,5 @@ +rest: true +graphqlSchema: + files: '*.graphql' +jsResource: + files: resources.js diff --git a/integrationTests/cluster/fixture-large-blob-source/resources.js b/integrationTests/cluster/fixture-large-blob-source/resources.js new file mode 100644 index 000000000..aefafe526 --- /dev/null +++ b/integrationTests/cluster/fixture-large-blob-source/resources.js @@ -0,0 +1,25 @@ +import { randomBytes } from 'node:crypto'; +import { Readable } from 'node:stream'; + +// Streamed blob source whose payload is large enough (~50 KB) to exceed Harper's +// FILE_STORAGE_THRESHOLD (8192 bytes), guaranteeing the receiver's blob save goes +// through `createWriteStream` — which the companion fixture-blob-fail-injector +// monkey-patches. Smaller blobs are stored inline within the record and never +// touch the filesystem on the receive side, which would defeat the test. +tables.LargeLocation.sourcedFrom({ + get(id) { + const image = createBlob( + Readable.from( + (async function* () { + // 50 chunks × 1024 bytes = 51200 bytes per blob + for (let i = 0; i < 50; i++) yield randomBytes(1024); + })() + ) + ); + return { + id, + name: 'large location ' + id, + image, + }; + }, +}); diff --git a/integrationTests/cluster/fixture-large-blob-source/schema.graphql b/integrationTests/cluster/fixture-large-blob-source/schema.graphql new file mode 100644 index 000000000..2e2d5d1e8 --- /dev/null +++ b/integrationTests/cluster/fixture-large-blob-source/schema.graphql @@ -0,0 +1,5 @@ +type LargeLocation @table @export { + id: Long @primaryKey + name: String @indexed + image: Blob +} diff --git a/integrationTests/cluster/receiveBacklogMemory.test.mjs b/integrationTests/cluster/receiveBacklogMemory.test.mjs new file mode 100644 index 000000000..1132d32e5 --- /dev/null +++ b/integrationTests/cluster/receiveBacklogMemory.test.mjs @@ -0,0 +1,211 @@ +/** + * Regression guard for the receive-side memory crash that knocked a production node + * off the cluster in early May. Reproducer: peer A goes offline, A's leader writes + * thousands of records (in some cases inside a single large transaction → single + * large WS message), peer A restarts, and decoding the backlog synchronously in + * `onWSMessage` overruns the 2 GB old-gen limit. + * + * Fix landed in harper-pro main as `replication_receiveEventHighWaterMark`: the + * per-record `do { ... } while (...)` loop now checks the consumer queue length + * and awaits drain when it exceeds the HWM, pausing the WS in the meantime. + * + * This test forces the same backlog shape (big single-message batches via multi- + * record upserts), restarts the catching-up node, and asserts that: + * 1. The worker process never restarts during catch-up (no ERR_WORKER_OUT_OF_MEMORY). + * 2. Peak resident-set stays well under what an unbounded decode would produce. + * 3. Catch-up actually completes (no progress = no fix, just a no-op). + * + * The bound (1.5 GB peak RSS) is intentionally generous; the bug burst past 2 GB + * inside ~25s and either OOM'd or got killed. Anything under that is "the + * backpressure is doing something." + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { + startHarper, + teardownHarper, + killHarper, + getNextAvailableLoopbackAddress, +} from '@harperfast/integration-testing'; +import { join } from 'node:path'; +import { sendOperation, concurrent, readLog, getMemoryInfo, peakMemory } from './clusterShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +const NODE_COUNT = 2; +const BACKLOG_TRANSACTIONS = 40; // each carries a 500-record batch → 20 000 records total +const BATCH_SIZE = 500; // tuned to comfortably exceed RECEIVE_EVENT_HIGH_WATER_MARK = 100 + +suite('Replication receive-side backlog memory bound', { timeout: 240000 }, (ctx) => { + before(async () => { + ctx.nodes = await Promise.all( + Array(NODE_COUNT) + .fill(null) + .map(async () => { + const nodeCtx = { + name: ctx.name, + harper: { hostname: await getNextAvailableLoopbackAddress() }, + }; + await startHarper(nodeCtx, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: nodeCtx.harper.hostname + ':9933' }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + return nodeCtx.harper; + }) + ); + // table on both nodes + await Promise.all( + ctx.nodes.map((node) => + sendOperation(node, { + operation: 'create_table', + table: 'load', + primary_key: 'id', + attributes: [ + { name: 'id', type: 'String' }, + { name: 'payload', type: 'String' }, + ], + }) + ) + ); + // connect: node 1 adds node 0 + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + // Wait for connection — mirror the increasing-delay pattern from + // replicationLoad.test.mjs so total wait can reach ~20s on a slow runner. + let connected = false; + for (let retries = 0; retries < 15; retries++) { + const responses = await Promise.all(ctx.nodes.map((n) => sendOperation(n, { operation: 'cluster_status' }))); + const allConnected = responses.every( + (r) => + (r.connections ?? []).length === NODE_COUNT - 1 && + (r.connections ?? []).every((c) => (c.database_sockets ?? []).every((s) => s.connected)) + ); + if (allConnected) { + connected = true; + break; + } + await delay(200 * (retries + 1)); + } + if (!connected) throw new Error('Cluster failed to connect in time'); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }))); + }); + + test('B catches up a multi-thousand-record backlog without OOM or worker restart', async () => { + const [A, B] = ctx.nodes; + + // Take B offline before generating the backlog. Reusing the original ctx + // preserves dataRootDir + hostname so the restart resumes against the same DB. + const nodeCtxB = { name: ctx.name, harper: { dataRootDir: B.dataRootDir, hostname: B.hostname } }; + await killHarper({ harper: B }); + + // Build a backlog on A. Each upsert here is a single transaction with BATCH_SIZE + // records → one WS message carrying 500 audit entries. The pre-fix code path + // decoded all 500 synchronously inside `onWSMessage`; the fix yields after the + // consumer queue exceeds RECEIVE_EVENT_HIGH_WATER_MARK (100). + const payloadStr = 'x'.repeat(256); + let written = 0; + const { execute, finish } = concurrent(async () => { + const id = written; + written += BATCH_SIZE; + const records = []; + for (let i = 0; i < BATCH_SIZE; i++) records.push({ id: `r${id + i}`, payload: payloadStr }); + await sendOperation(A, { operation: 'upsert', table: 'load', records }); + }, 4); + for (let i = 0; i < BACKLOG_TRANSACTIONS; i++) await execute(); + await finish(); + // allow A to flush + await delay(500); + + // Restart B and start sampling its memory. The operationsAPIURL is hostname:port + // based — both are preserved across restart, so the URL on the original B handle + // keeps working once the new process is listening. + const samples = []; + let sampling = true; + const sampler = (async () => { + while (sampling) { + samples.push(await getMemoryInfo(B)); + await delay(500); + } + })(); + + await startHarper(nodeCtxB, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: nodeCtxB.harper.hostname + ':9933' }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + + // Poll record counts directly — unambiguous catch-up signal, no dependency on + // the shape of cluster_status's lastReceivedVersion fields. + const sourceCount = (await sendOperation(A, { operation: 'describe_table', table: 'load' })).record_count; + let caughtUp = false; + let lastReceiverCount = 0; + for (let r = 0; r < 360; r++) { + const describeB = await sendOperation(nodeCtxB.harper, { operation: 'describe_table', table: 'load' }); + lastReceiverCount = describeB.record_count; + if (lastReceiverCount >= sourceCount) { + caughtUp = true; + break; + } + await delay(500); + } + + sampling = false; + await sampler; + + // Assertion 1: catch-up actually happened. + ok( + caughtUp, + `catch-up did not complete: receiver record_count ${lastReceiverCount} < source ${sourceCount} after 180s` + ); + + // Assertion 2: no receive-side OOM marker in the log. + const log = await readLog(nodeCtxB.harper); + ok( + !log.includes('ERR_WORKER_OUT_OF_MEMORY'), + 'ERR_WORKER_OUT_OF_MEMORY appeared in B log; receive-side memory pressure is unbounded' + ); + + // Assertion 3: peak resident-set is comfortably under the unbounded-decode regime. + // The wtk failure burst past 2 GB old-gen inside a single message. Anything + // near or under 1.5 GB means the HWM-driven pause is taking effect. + const { peakRss } = peakMemory(samples); + const PEAK_RSS_LIMIT = 1.5 * 1024 * 1024 * 1024; + ok( + peakRss > 0 && peakRss < PEAK_RSS_LIMIT, + `peak RSS during catch-up was ${(peakRss / 1024 / 1024).toFixed(0)} MB, ` + + `expected < ${(PEAK_RSS_LIMIT / 1024 / 1024).toFixed(0)} MB ` + + `(unbounded receive decode would balloon past this)` + ); + console.log( + `receive backlog test: ${written} records caught up, peak RSS ${(peakRss / 1024 / 1024).toFixed(0)} MB` + ); + }); +});