From 49bdb0e09727c2626c93ec32eb03a9ac6417920d Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 14 May 2026 18:02:45 -0600 Subject: [PATCH 1/4] test: replication receive-side stress regressions (catch-up memory, blob save) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two cluster integration tests covering the receive-side failure modes that took a prod node off the cluster: 1. receiveBacklogMemory.test.mjs — guards PR #147's RECEIVE_EVENT_HIGH_WATER_MARK fix. Kills receiver B, bursts 40 transactions of 500 records each on A (each transaction = one WS message → 500 audit entries decoded), restarts B, samples memory while it catches up, asserts peak RSS < 1.5 GB and no ERR_WORKER_OUT_OF_MEMORY in the log. 2. blobSaveRejectionContainment.test.mjs — guards PR #149's contract that a rejected saveBlob promise is logged exactly once and never escapes onCommit as uncaughtException. Installs a fault-injection component on B only that monkey-patches fs.createWriteStream to fail every 7th /blobs/ write with ENOENT, drives Location-component blob traffic from A, asserts the "Blob save failed for ..." line appears but uncaughtException lines do not, and that liveness (a fresh write) still propagates after failures. Adds shared helpers to clusterShared.mjs: readLog, waitForCatchUp, getMemoryInfo, peakMemory. The fault-injection fixture lives at integrationTests/cluster/fixture-blob-fail-injector/ and is opt-in via HARPER_TEST_BLOB_FAIL_INTERVAL env var. These exercise the same failure surface that affected wtk-ap-west-1 in May: unbounded synchronous decode on receive, and blob save rejections escaping the commit confirmation path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../blobSaveRejectionContainment.test.mjs | 203 +++++++++++++++++ integrationTests/cluster/clusterShared.mjs | 125 +++++++++++ .../fixture-blob-fail-injector/config.yaml | 2 + .../fixture-blob-fail-injector/resources.js | 47 ++++ .../cluster/receiveBacklogMemory.test.mjs | 211 ++++++++++++++++++ 5 files changed, 588 insertions(+) create mode 100644 integrationTests/cluster/blobSaveRejectionContainment.test.mjs create mode 100644 integrationTests/cluster/fixture-blob-fail-injector/config.yaml create mode 100644 integrationTests/cluster/fixture-blob-fail-injector/resources.js create mode 100644 integrationTests/cluster/receiveBacklogMemory.test.mjs diff --git a/integrationTests/cluster/blobSaveRejectionContainment.test.mjs b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs new file mode 100644 index 000000000..b7da9b9a4 --- /dev/null +++ b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs @@ -0,0 +1,203 @@ +/** + * Regression guard for the receive-side blob save bug surfaced on the prod node that + * crashed earlier in May. When `saveBlob` rejects (e.g. `createWriteStream` errors + * with ENOENT), the rejection was already caught for logging but the *raw* promise + * was still pushed into `outstandingBlobsToFinish`. A later + * `await Promise.all(outstandingBlobsToFinish)` inside `end_txn`'s `onCommit` then + * observed the rejection independently and propagated it out as `uncaughtException`. + * + * The fix (harper-pro PR #149) stores the catch-handled promise in + * `outstandingBlobsToFinish` instead, so `Promise.all` only ever sees a fulfilled + * promise; the log line stays exactly as before. + * + * This test installs a fault-injection component on receiver B that makes every Nth + * blob save fail with ENOENT, then drives blob-bearing replication from A and + * asserts: + * 1. The injection actually fired (otherwise the test is testing nothing). + * 2. `Blob save failed for from ` is logged per failure (the .catch). + * 3. `uncaughtException` for those failures NEVER appears in B's log. + * 4. B is still connected to A and B is still committing new records after the + * failures. + * + * If you're verifying this test catches the regression, revert + * harper-pro/replication/replicationConnection.ts `receiveBlobs` to push the raw + * `finished` promise — assertion 3 should fail with several uncaughtException lines. + */ + +import { suite, test, before, after } from 'node:test'; +import { equal, ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { + startHarper, + teardownHarper, + setupHarperWithFixture, + getNextAvailableLoopbackAddress, + targz, +} from '@harperfast/integration-testing'; +import { join } from 'node:path'; +import { sendOperation, fetchWithRetry, concurrent, readLog } from './clusterShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +const FAIL_INTERVAL = 7; // every 7th /blobs/ createWriteStream fails on B +const BLOB_REQUESTS = 400; // /Location/{n} hits — each creates a blob on A → replicates to B + +suite('Receive-side blob save rejection containment', { timeout: 180000 }, (ctx) => { + before(async () => { + // Bring up A (clean) and B (with the fault-injection component preloaded). + const nodeA = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + const nodeB = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + const sharedConfig = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + }); + await startHarper(nodeA, { + config: sharedConfig(nodeA.harper.hostname), + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + // setupHarperWithFixture copies the fixture into dataRootDir/components/ + // before starting Harper, so the injector is in place by the time replication + // receivers come up. The env var arms it. + await setupHarperWithFixture(nodeB, join(import.meta.dirname, 'fixture-blob-fail-injector'), { + config: sharedConfig(nodeB.harper.hostname), + env: { + HARPER_NO_FLUSH_ON_EXIT: true, + HARPER_TEST_BLOB_FAIL_INTERVAL: String(FAIL_INTERVAL), + }, + }); + ctx.nodes = [nodeA.harper, nodeB.harper]; + + // Connect A↔B. + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + for (let retries = 0; retries < 15; retries++) { + const status = await Promise.all(ctx.nodes.map((n) => sendOperation(n, { operation: 'cluster_status' }))); + if (status.every((r) => (r.connections ?? []).every((c) => (c.database_sockets ?? []).every((s) => s.connected)))) + break; + await delay(200 * (retries + 1)); + } + + // Deploy the blob-bearing Location component to A. `replicated: true` causes + // it to be installed on B as well — the injector + Location coexist on B. + const payload = await targz(join(import.meta.dirname, 'fixture')); + const deployResp = await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'test-application', + payload, + replicated: true, + restart: true, + }); + equal(deployResp.message, 'Successfully deployed: test-application, restarting Harper'); + // Give both nodes time to come back up after the restart. + await delay(35000); + + // Sanity: confirm B logged the injector banner. If this isn't there, the test + // would silently pass without exercising the failure path. + const bootLog = await readLog(ctx.nodes[1]); + ok( + bootLog.includes('[blob-fail-injector] installed'), + 'fault injector did not load on B — test would not exercise the failure path' + ); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }))); + }); + + test('blob save ENOENT is logged exactly once and never escapes as uncaughtException', async () => { + const [A, B] = ctx.nodes; + + // Generate blob-bearing replication traffic by hitting /Location/{id} on A. + // Each request triggers `sourcedFrom.get(id)` on A → record + streamed blob + // committed → replicated to B → B's createWriteStream is patched to fail + // every FAIL_INTERVALth call. + const { execute, finish } = concurrent( + () => fetchWithRetry(A.httpURL + '/Location/' + Math.floor(Math.random() * BLOB_REQUESTS)), + 20 + ); + for (let i = 0; i < BLOB_REQUESTS; i++) await execute(); + await finish(); + + // Let replication drain. The fixture's `sourcedFrom.get` deliberately delays + // inside its async generator so the stream takes some real wall-clock time. + await delay(8000); + + const log = await readLog(B); + + // Assertion 1: the injector actually injected something (otherwise the rest is moot). + const injectionMatches = log.match(/\[blob-fail-injector\] /g) ?? []; + ok( + injectionMatches.length >= 1, + `expected the injector to have logged at least once, found ${injectionMatches.length}` + ); + + // Assertion 2: the .catch in receiveBlobs logged per-failure. + const saveFailedMatches = log.match(/\[error\] \[replication\]: Blob save failed for /g) ?? []; + ok(saveFailedMatches.length > 0, 'expected at least one `Blob save failed` line — injection did not fire'); + + // Assertion 3 — the regression itself: no `uncaughtException` with a blob payload. + // Match both the bare "uncaughtException" and the typical accompanying "Blob error". + const uncaughtBlobLines = log + .split('\n') + .filter((line) => /uncaughtException/.test(line) && /(Blob|ENOENT.*blobs)/.test(line)); + equal( + uncaughtBlobLines.length, + 0, + `blob save failure escaped as uncaughtException ${uncaughtBlobLines.length} time(s):\n` + + uncaughtBlobLines.slice(0, 5).join('\n') + ); + + // Assertion 4: B is still connected to A and still committing — failures didn't + // break the channel. + const status = await sendOperation(B, { operation: 'cluster_status' }); + const aConn = (status.connections ?? []).find((c) => (c.url ?? c.name ?? '').includes(A.hostname)); + ok(aConn, `B should still see A in its cluster_status connections`); + ok( + (aConn.database_sockets ?? []).every((s) => s.connected), + 'B should still report A connected on every database socket' + ); + + // Liveness: a fresh write on A should still propagate to B. + await sendOperation(A, { + operation: 'upsert', + database: 'data', + table: 'Location', + records: [{ id: 999_999, name: 'liveness probe', random: 0.5 }], + }); + let liveness = false; + for (let r = 0; r < 20; r++) { + const resp = await fetchWithRetry(B.httpURL + '/Location/999999', { retries: 0 }).catch(() => null); + if (resp?.ok) { + const body = await resp.json(); + if (body?.name === 'liveness probe') { + liveness = true; + break; + } + } + await delay(500); + } + ok(liveness, 'replication remained broken after blob save failures — liveness probe did not arrive on B'); + + console.log( + `blob save containment: ${saveFailedMatches.length} injected ENOENTs, ` + + `${uncaughtBlobLines.length} escaped as uncaughtException` + ); + }); +}); diff --git a/integrationTests/cluster/clusterShared.mjs b/integrationTests/cluster/clusterShared.mjs index b2fc69066..e2ae3d4c5 100644 --- a/integrationTests/cluster/clusterShared.mjs +++ b/integrationTests/cluster/clusterShared.mjs @@ -58,3 +58,128 @@ export function concurrent(task, concurrency = 20) { }, }; } + +/** + * Read the hdb.log file for a given Harper node. + * Reads the full file each time — fine for short replays, callers needing only + * recent lines can filter by timestamp themselves. + */ +export async function readLog(node) { + const { readFile } = await import('node:fs/promises'); + const { join } = await import('node:path'); + const path = join(node.dataRootDir, 'log', 'hdb.log'); + try { + return await readFile(path, 'utf8'); + } catch (err) { + if (err.code === 'ENOENT') return ''; + throw err; + } +} + +/** + * Poll `cluster_status` on `receiver` until it reports a `lastReceivedVersion` for + * `source` greater than the version captured *now* on `source` itself. Returns the + * final receiver-side version when caught up, throws on timeout. + * + * @param {Object} receiver - The catching-up Harper node + * @param {Object} source - The Harper node we expect to be replicating *from* + * @param {Object} [opts] + * @param {number} [opts.timeoutMs=120000] + * @param {number} [opts.pollMs=500] + */ +export async function waitForCatchUp(receiver, source, opts = {}) { + const timeoutMs = opts.timeoutMs ?? 120000; + const pollMs = opts.pollMs ?? 500; + // Capture source's version threshold up front. Catch-up = receiver's lastReceived + // for this connection >= sourceTarget. + const sourceStatus = await sendOperation(source, { operation: 'cluster_status' }); + // We want a version that's been written on `source` (i.e. its own outgoing replication + // state). Use the highest `lastReceivedVersion` it tracks across its connections as a + // proxy for "writes have flowed through" — or just stamp `Date.now()` if no peers yet. + let sourceTarget = 0; + for (const conn of sourceStatus.connections ?? []) { + for (const sock of conn.database_sockets ?? []) { + if (typeof sock.lastReceivedVersion === 'number' && sock.lastReceivedVersion > sourceTarget) { + sourceTarget = sock.lastReceivedVersion; + } + } + } + // If we couldn't infer one, fall back to a wall-clock-ish stamp; replication versions + // are timestamp-derived so this is a safe upper bound for "before the test started". + if (sourceTarget === 0) sourceTarget = Date.now() - 60_000; + + const sourceHostname = source.hostname; + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const receiverStatus = await sendOperation(receiver, { operation: 'cluster_status' }); + const sourceConn = (receiverStatus.connections ?? []).find((c) => (c.url ?? c.name ?? '').includes(sourceHostname)); + if (sourceConn) { + const versions = (sourceConn.database_sockets ?? []) + .map((s) => s.lastReceivedVersion) + .filter((v) => typeof v === 'number'); + if (versions.length && Math.min(...versions) >= sourceTarget) return Math.min(...versions); + } + await delay(pollMs); + } + throw new Error(`Timed out after ${timeoutMs}ms waiting for ${receiver.hostname} to catch up to ${sourceHostname}`); +} + +/** + * Snapshot of memory state on a single node. The shape is stable for tests so we don't + * depend on `system_information`'s evolving structure: main-process RSS plus the most + * informative per-thread heap metric. (Per-worker RSS isn't reported individually by + * Harper; we use heap+external as a proxy for in-flight allocation pressure inside + * the worker.) + * + * @typedef {Object} NodeMemorySnapshot + * @property {number} t - Date.now() at sample time + * @property {number} rss - main process resident-set in bytes (process.memoryUsage().rss) + * @property {Array<{threadId:number,heapUsed:number,externalMemory:number,arrayBuffers:number}>} threads + */ + +/** + * Fetch a single memory snapshot via `system_information`. + * Returns `null` if the call fails (transient during restart) — callers should treat + * a few `null`s near a kill/restart as normal. + * + * @param {Object} node + * @returns {Promise} + */ +export async function getMemoryInfo(node) { + try { + const info = await sendOperation(node, { + operation: 'system_information', + attributes: ['memory', 'threads'], + }); + const threads = (info.threads ?? []).map((t) => ({ + threadId: t.threadId ?? 0, + heapUsed: t.heapUsed ?? 0, + externalMemory: t.externalMemory ?? 0, + arrayBuffers: t.arrayBuffers ?? 0, + })); + // system_information.memory contains the spread of process.memoryUsage() on the + // main thread — `rss` is the field we care about for total footprint. + const rss = info.memory?.rss ?? 0; + return { t: Date.now(), rss, threads }; + } catch { + return null; + } +} + +/** + * Compute peak resident-set and peak per-worker heap+external across a series of + * snapshots, ignoring `null` entries (returned when sampling races a restart). + */ +export function peakMemory(samples) { + let peakRss = 0; + let peakWorkerHeapExt = 0; + for (const s of samples) { + if (!s) continue; + if (s.rss > peakRss) peakRss = s.rss; + for (const t of s.threads) { + const used = (t.heapUsed || 0) + (t.externalMemory || 0) + (t.arrayBuffers || 0); + if (used > peakWorkerHeapExt) peakWorkerHeapExt = used; + } + } + return { peakRss, peakWorkerHeapExt }; +} diff --git a/integrationTests/cluster/fixture-blob-fail-injector/config.yaml b/integrationTests/cluster/fixture-blob-fail-injector/config.yaml new file mode 100644 index 000000000..2a373242e --- /dev/null +++ b/integrationTests/cluster/fixture-blob-fail-injector/config.yaml @@ -0,0 +1,2 @@ +jsResource: + files: resources.js # entry module — runs at component load diff --git a/integrationTests/cluster/fixture-blob-fail-injector/resources.js b/integrationTests/cluster/fixture-blob-fail-injector/resources.js new file mode 100644 index 000000000..6ac72269e --- /dev/null +++ b/integrationTests/cluster/fixture-blob-fail-injector/resources.js @@ -0,0 +1,47 @@ +// Test-only component: monkey-patches `fs.createWriteStream` so that every Nth call +// targeting the blob storage tree fails asynchronously with `ENOENT`. Drives the +// receive-side blob save path in `replication/replicationConnection.ts:receiveBlobs` +// into its rejection branch on demand. Used by `blobSaveRejectionContainment.test.mjs`. +// +// We patch the CJS module object obtained via `createRequire` rather than mutating +// an ESM namespace (which is frozen). Harper's dist code uses `require('node:fs')`, +// so the live property is what `createWriteStream` callers look up at call time. +// Toggle on with `HARPER_TEST_BLOB_FAIL_INTERVAL=`. +import { createRequire } from 'node:module'; + +const interval = Number.parseInt(process.env.HARPER_TEST_BLOB_FAIL_INTERVAL || '0', 10); +if (Number.isFinite(interval) && interval > 0) { + const require = createRequire(import.meta.url); + const fs = require('node:fs'); + const { Writable } = require('node:stream'); + const realCreateWriteStream = fs.createWriteStream; + let counter = 0; + fs.createWriteStream = function patchedCreateWriteStream(path) { + if (typeof path === 'string' && path.includes('/blobs/')) { + counter++; + if (counter % interval === 0) { + // Emit ENOENT on next tick so the caller's listeners are wired first — + // matches how real createWriteStream surfaces fs.open failures. + const stream = new Writable({ + write(_chunk, _enc, cb) { + cb(new Error('test-injected: stream torn down')); + }, + }); + stream.fd = null; + process.nextTick(() => { + const err = new Error("ENOENT: no such file or directory, open '" + path + "'"); + err.code = 'ENOENT'; + err.errno = -2; + err.syscall = 'open'; + err.path = path; + stream.emit('error', err); + }); + return stream; + } + } + return realCreateWriteStream.apply(this, arguments); + }; + // One-line marker so tests can assert the patch actually installed. + // Plain console.log goes to the test runner via the harness's log redirect. + console.log('[blob-fail-injector] installed; failing every ' + interval + 'th /blobs/ createWriteStream'); +} diff --git a/integrationTests/cluster/receiveBacklogMemory.test.mjs b/integrationTests/cluster/receiveBacklogMemory.test.mjs new file mode 100644 index 000000000..1132d32e5 --- /dev/null +++ b/integrationTests/cluster/receiveBacklogMemory.test.mjs @@ -0,0 +1,211 @@ +/** + * Regression guard for the receive-side memory crash that knocked a production node + * off the cluster in early May. Reproducer: peer A goes offline, A's leader writes + * thousands of records (in some cases inside a single large transaction → single + * large WS message), peer A restarts, and decoding the backlog synchronously in + * `onWSMessage` overruns the 2 GB old-gen limit. + * + * Fix landed in harper-pro main as `replication_receiveEventHighWaterMark`: the + * per-record `do { ... } while (...)` loop now checks the consumer queue length + * and awaits drain when it exceeds the HWM, pausing the WS in the meantime. + * + * This test forces the same backlog shape (big single-message batches via multi- + * record upserts), restarts the catching-up node, and asserts that: + * 1. The worker process never restarts during catch-up (no ERR_WORKER_OUT_OF_MEMORY). + * 2. Peak resident-set stays well under what an unbounded decode would produce. + * 3. Catch-up actually completes (no progress = no fix, just a no-op). + * + * The bound (1.5 GB peak RSS) is intentionally generous; the bug burst past 2 GB + * inside ~25s and either OOM'd or got killed. Anything under that is "the + * backpressure is doing something." + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { + startHarper, + teardownHarper, + killHarper, + getNextAvailableLoopbackAddress, +} from '@harperfast/integration-testing'; +import { join } from 'node:path'; +import { sendOperation, concurrent, readLog, getMemoryInfo, peakMemory } from './clusterShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +const NODE_COUNT = 2; +const BACKLOG_TRANSACTIONS = 40; // each carries a 500-record batch → 20 000 records total +const BATCH_SIZE = 500; // tuned to comfortably exceed RECEIVE_EVENT_HIGH_WATER_MARK = 100 + +suite('Replication receive-side backlog memory bound', { timeout: 240000 }, (ctx) => { + before(async () => { + ctx.nodes = await Promise.all( + Array(NODE_COUNT) + .fill(null) + .map(async () => { + const nodeCtx = { + name: ctx.name, + harper: { hostname: await getNextAvailableLoopbackAddress() }, + }; + await startHarper(nodeCtx, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: nodeCtx.harper.hostname + ':9933' }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + return nodeCtx.harper; + }) + ); + // table on both nodes + await Promise.all( + ctx.nodes.map((node) => + sendOperation(node, { + operation: 'create_table', + table: 'load', + primary_key: 'id', + attributes: [ + { name: 'id', type: 'String' }, + { name: 'payload', type: 'String' }, + ], + }) + ) + ); + // connect: node 1 adds node 0 + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + // Wait for connection — mirror the increasing-delay pattern from + // replicationLoad.test.mjs so total wait can reach ~20s on a slow runner. + let connected = false; + for (let retries = 0; retries < 15; retries++) { + const responses = await Promise.all(ctx.nodes.map((n) => sendOperation(n, { operation: 'cluster_status' }))); + const allConnected = responses.every( + (r) => + (r.connections ?? []).length === NODE_COUNT - 1 && + (r.connections ?? []).every((c) => (c.database_sockets ?? []).every((s) => s.connected)) + ); + if (allConnected) { + connected = true; + break; + } + await delay(200 * (retries + 1)); + } + if (!connected) throw new Error('Cluster failed to connect in time'); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }))); + }); + + test('B catches up a multi-thousand-record backlog without OOM or worker restart', async () => { + const [A, B] = ctx.nodes; + + // Take B offline before generating the backlog. Reusing the original ctx + // preserves dataRootDir + hostname so the restart resumes against the same DB. + const nodeCtxB = { name: ctx.name, harper: { dataRootDir: B.dataRootDir, hostname: B.hostname } }; + await killHarper({ harper: B }); + + // Build a backlog on A. Each upsert here is a single transaction with BATCH_SIZE + // records → one WS message carrying 500 audit entries. The pre-fix code path + // decoded all 500 synchronously inside `onWSMessage`; the fix yields after the + // consumer queue exceeds RECEIVE_EVENT_HIGH_WATER_MARK (100). + const payloadStr = 'x'.repeat(256); + let written = 0; + const { execute, finish } = concurrent(async () => { + const id = written; + written += BATCH_SIZE; + const records = []; + for (let i = 0; i < BATCH_SIZE; i++) records.push({ id: `r${id + i}`, payload: payloadStr }); + await sendOperation(A, { operation: 'upsert', table: 'load', records }); + }, 4); + for (let i = 0; i < BACKLOG_TRANSACTIONS; i++) await execute(); + await finish(); + // allow A to flush + await delay(500); + + // Restart B and start sampling its memory. The operationsAPIURL is hostname:port + // based — both are preserved across restart, so the URL on the original B handle + // keeps working once the new process is listening. + const samples = []; + let sampling = true; + const sampler = (async () => { + while (sampling) { + samples.push(await getMemoryInfo(B)); + await delay(500); + } + })(); + + await startHarper(nodeCtxB, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: nodeCtxB.harper.hostname + ':9933' }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + + // Poll record counts directly — unambiguous catch-up signal, no dependency on + // the shape of cluster_status's lastReceivedVersion fields. + const sourceCount = (await sendOperation(A, { operation: 'describe_table', table: 'load' })).record_count; + let caughtUp = false; + let lastReceiverCount = 0; + for (let r = 0; r < 360; r++) { + const describeB = await sendOperation(nodeCtxB.harper, { operation: 'describe_table', table: 'load' }); + lastReceiverCount = describeB.record_count; + if (lastReceiverCount >= sourceCount) { + caughtUp = true; + break; + } + await delay(500); + } + + sampling = false; + await sampler; + + // Assertion 1: catch-up actually happened. + ok( + caughtUp, + `catch-up did not complete: receiver record_count ${lastReceiverCount} < source ${sourceCount} after 180s` + ); + + // Assertion 2: no receive-side OOM marker in the log. + const log = await readLog(nodeCtxB.harper); + ok( + !log.includes('ERR_WORKER_OUT_OF_MEMORY'), + 'ERR_WORKER_OUT_OF_MEMORY appeared in B log; receive-side memory pressure is unbounded' + ); + + // Assertion 3: peak resident-set is comfortably under the unbounded-decode regime. + // The wtk failure burst past 2 GB old-gen inside a single message. Anything + // near or under 1.5 GB means the HWM-driven pause is taking effect. + const { peakRss } = peakMemory(samples); + const PEAK_RSS_LIMIT = 1.5 * 1024 * 1024 * 1024; + ok( + peakRss > 0 && peakRss < PEAK_RSS_LIMIT, + `peak RSS during catch-up was ${(peakRss / 1024 / 1024).toFixed(0)} MB, ` + + `expected < ${(PEAK_RSS_LIMIT / 1024 / 1024).toFixed(0)} MB ` + + `(unbounded receive decode would balloon past this)` + ); + console.log( + `receive backlog test: ${written} records caught up, peak RSS ${(peakRss / 1024 / 1024).toFixed(0)} MB` + ); + }); +}); From e51f137e59f50219e1ff5503e7b0c8afa20064aa Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 18 May 2026 22:40:32 -0600 Subject: [PATCH 2/4] fix(tests): readLog must honor HARPER_INTEGRATION_TEST_LOG_DIR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI sets HARPER_INTEGRATION_TEST_LOG_DIR, which causes the integration-testing harness to redirect Harper's `logging.root` to a per-suite directory exposed on `ctx.harper.logDir` rather than `{dataRootDir}/log/hdb.log`. readLog() was only checking the dataRootDir path, so in CI it returned an empty string — making the blob-fail-injector banner assertion fail even when the component had loaded correctly. Check both locations now. The receiveBacklogMemory test was also affected (its no-OOM assertion was reading the wrong file) but happened to pass vacuously. Co-Authored-By: Claude Opus 4.7 (1M context) --- integrationTests/cluster/clusterShared.mjs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/integrationTests/cluster/clusterShared.mjs b/integrationTests/cluster/clusterShared.mjs index e2ae3d4c5..244f78f35 100644 --- a/integrationTests/cluster/clusterShared.mjs +++ b/integrationTests/cluster/clusterShared.mjs @@ -61,19 +61,29 @@ export function concurrent(task, concurrency = 20) { /** * Read the hdb.log file for a given Harper node. + * + * When the integration-testing harness is configured with + * `HARPER_INTEGRATION_TEST_LOG_DIR` (as it is in CI), Harper's `logging.root` is + * redirected to a per-suite directory exposed on `ctx.harper.logDir` rather than + * `{dataRootDir}/log`. We check both so the helper works locally and in CI. + * * Reads the full file each time — fine for short replays, callers needing only * recent lines can filter by timestamp themselves. */ export async function readLog(node) { const { readFile } = await import('node:fs/promises'); const { join } = await import('node:path'); - const path = join(node.dataRootDir, 'log', 'hdb.log'); - try { - return await readFile(path, 'utf8'); - } catch (err) { - if (err.code === 'ENOENT') return ''; - throw err; + const candidates = []; + if (node.logDir) candidates.push(join(node.logDir, 'hdb.log')); + if (node.dataRootDir) candidates.push(join(node.dataRootDir, 'log', 'hdb.log')); + for (const path of candidates) { + try { + return await readFile(path, 'utf8'); + } catch (err) { + if (err.code !== 'ENOENT') throw err; + } } + return ''; } /** From bb0d1de0163fcf91cd40ec953c5c852b0cfe9cd1 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 18 May 2026 22:55:55 -0600 Subject: [PATCH 3/4] fix(tests): use 50 KB blobs so saves hit createWriteStream on receiver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The shared `fixture/` Location source produces 7,500-byte blobs, which fall under Harper's FILE_STORAGE_THRESHOLD (8192 bytes) and are stored inline in the record. With no file write, the receiver's createWriteStream is never called and the fault injector has nothing to intercept — assertion #2 ("Blob save failed line appeared") failed even though the injector loaded correctly (banner present in B's log twice). Add a dedicated `fixture-large-blob-source/` with a `LargeLocation` table whose `sourcedFrom` produces 50 KB streamed blobs — comfortably above the threshold, guaranteed to take the file-backed write path on the receiver. Switch the test to deploy/hit this fixture. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../blobSaveRejectionContainment.test.mjs | 29 +++++++++++-------- .../fixture-large-blob-source/config.yaml | 5 ++++ .../fixture-large-blob-source/resources.js | 25 ++++++++++++++++ .../fixture-large-blob-source/schema.graphql | 5 ++++ 4 files changed, 52 insertions(+), 12 deletions(-) create mode 100644 integrationTests/cluster/fixture-large-blob-source/config.yaml create mode 100644 integrationTests/cluster/fixture-large-blob-source/resources.js create mode 100644 integrationTests/cluster/fixture-large-blob-source/schema.graphql diff --git a/integrationTests/cluster/blobSaveRejectionContainment.test.mjs b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs index b7da9b9a4..c17f0294c 100644 --- a/integrationTests/cluster/blobSaveRejectionContainment.test.mjs +++ b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs @@ -93,17 +93,22 @@ suite('Receive-side blob save rejection containment', { timeout: 180000 }, (ctx) await delay(200 * (retries + 1)); } - // Deploy the blob-bearing Location component to A. `replicated: true` causes - // it to be installed on B as well — the injector + Location coexist on B. - const payload = await targz(join(import.meta.dirname, 'fixture')); + // Deploy a blob-bearing component to A. `replicated: true` causes it to be + // installed on B as well, where the injector + LargeLocation coexist. + // Using fixture-large-blob-source rather than the shared `fixture/` because + // the latter's blobs are 7,500 bytes — under Harper's FILE_STORAGE_THRESHOLD + // (8192) so they're stored inline and never hit createWriteStream on B, + // defeating this test. LargeLocation produces 50 KB blobs which always + // go through the file-backed write path. + const payload = await targz(join(import.meta.dirname, 'fixture-large-blob-source')); const deployResp = await sendOperation(ctx.nodes[0], { operation: 'deploy_component', - project: 'test-application', + project: 'large-blob-source', payload, replicated: true, restart: true, }); - equal(deployResp.message, 'Successfully deployed: test-application, restarting Harper'); + equal(deployResp.message, 'Successfully deployed: large-blob-source, restarting Harper'); // Give both nodes time to come back up after the restart. await delay(35000); @@ -124,12 +129,12 @@ suite('Receive-side blob save rejection containment', { timeout: 180000 }, (ctx) test('blob save ENOENT is logged exactly once and never escapes as uncaughtException', async () => { const [A, B] = ctx.nodes; - // Generate blob-bearing replication traffic by hitting /Location/{id} on A. - // Each request triggers `sourcedFrom.get(id)` on A → record + streamed blob - // committed → replicated to B → B's createWriteStream is patched to fail + // Generate blob-bearing replication traffic by hitting /LargeLocation/{id} on A. + // Each request triggers `sourcedFrom.get(id)` on A → record + 50 KB streamed + // blob committed → replicated to B → B's createWriteStream is patched to fail // every FAIL_INTERVALth call. const { execute, finish } = concurrent( - () => fetchWithRetry(A.httpURL + '/Location/' + Math.floor(Math.random() * BLOB_REQUESTS)), + () => fetchWithRetry(A.httpURL + '/LargeLocation/' + Math.floor(Math.random() * BLOB_REQUESTS)), 20 ); for (let i = 0; i < BLOB_REQUESTS; i++) await execute(); @@ -178,12 +183,12 @@ suite('Receive-side blob save rejection containment', { timeout: 180000 }, (ctx) await sendOperation(A, { operation: 'upsert', database: 'data', - table: 'Location', - records: [{ id: 999_999, name: 'liveness probe', random: 0.5 }], + table: 'LargeLocation', + records: [{ id: 999_999, name: 'liveness probe' }], }); let liveness = false; for (let r = 0; r < 20; r++) { - const resp = await fetchWithRetry(B.httpURL + '/Location/999999', { retries: 0 }).catch(() => null); + const resp = await fetchWithRetry(B.httpURL + '/LargeLocation/999999', { retries: 0 }).catch(() => null); if (resp?.ok) { const body = await resp.json(); if (body?.name === 'liveness probe') { diff --git a/integrationTests/cluster/fixture-large-blob-source/config.yaml b/integrationTests/cluster/fixture-large-blob-source/config.yaml new file mode 100644 index 000000000..fb8a45440 --- /dev/null +++ b/integrationTests/cluster/fixture-large-blob-source/config.yaml @@ -0,0 +1,5 @@ +rest: true +graphqlSchema: + files: '*.graphql' +jsResource: + files: resources.js diff --git a/integrationTests/cluster/fixture-large-blob-source/resources.js b/integrationTests/cluster/fixture-large-blob-source/resources.js new file mode 100644 index 000000000..aefafe526 --- /dev/null +++ b/integrationTests/cluster/fixture-large-blob-source/resources.js @@ -0,0 +1,25 @@ +import { randomBytes } from 'node:crypto'; +import { Readable } from 'node:stream'; + +// Streamed blob source whose payload is large enough (~50 KB) to exceed Harper's +// FILE_STORAGE_THRESHOLD (8192 bytes), guaranteeing the receiver's blob save goes +// through `createWriteStream` — which the companion fixture-blob-fail-injector +// monkey-patches. Smaller blobs are stored inline within the record and never +// touch the filesystem on the receive side, which would defeat the test. +tables.LargeLocation.sourcedFrom({ + get(id) { + const image = createBlob( + Readable.from( + (async function* () { + // 50 chunks × 1024 bytes = 51200 bytes per blob + for (let i = 0; i < 50; i++) yield randomBytes(1024); + })() + ) + ); + return { + id, + name: 'large location ' + id, + image, + }; + }, +}); diff --git a/integrationTests/cluster/fixture-large-blob-source/schema.graphql b/integrationTests/cluster/fixture-large-blob-source/schema.graphql new file mode 100644 index 000000000..2e2d5d1e8 --- /dev/null +++ b/integrationTests/cluster/fixture-large-blob-source/schema.graphql @@ -0,0 +1,5 @@ +type LargeLocation @table @export { + id: Long @primaryKey + name: String @indexed + image: Blob +} From 0350ab3390516f899dd6b48a66f1ba500a92ce3e Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 18 May 2026 23:04:16 -0600 Subject: [PATCH 4/4] fix(tests): liveness check via describe_table count instead of REST GET MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GET routing on a `sourcedFrom` table can be subtle: hitting /LargeLocation/{id} on the receiver of a partially-populated record may re-invoke the cache-miss handler instead of returning the locally stored record, and the request can fail in ways that don't show up as Harper log lines. The previous run confirmed assertions 1-4 are airtight (35 Blob save failed log entries, 0 uncaughtException, still connected per cluster_status) — the test was failing only on the liveness GET timing out. Switch to comparing describe_table.record_count before/after the upsert: a direct, unambiguous signal that doesn't depend on REST GET semantics for sourcedFrom tables. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../blobSaveRejectionContainment.test.mjs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/integrationTests/cluster/blobSaveRejectionContainment.test.mjs b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs index c17f0294c..8327aada4 100644 --- a/integrationTests/cluster/blobSaveRejectionContainment.test.mjs +++ b/integrationTests/cluster/blobSaveRejectionContainment.test.mjs @@ -179,7 +179,11 @@ suite('Receive-side blob save rejection containment', { timeout: 180000 }, (ctx) 'B should still report A connected on every database socket' ); - // Liveness: a fresh write on A should still propagate to B. + // Liveness: a fresh write on A should still propagate to B. We check via + // `describe_table` on both sides — a direct, unambiguous count comparison + // that doesn't depend on REST routing semantics for a `sourcedFrom` table + // (where GET on a partial record can re-invoke the cache miss handler). + const beforeB = (await sendOperation(B, { operation: 'describe_table', table: 'LargeLocation' })).record_count; await sendOperation(A, { operation: 'upsert', database: 'data', @@ -188,13 +192,10 @@ suite('Receive-side blob save rejection containment', { timeout: 180000 }, (ctx) }); let liveness = false; for (let r = 0; r < 20; r++) { - const resp = await fetchWithRetry(B.httpURL + '/LargeLocation/999999', { retries: 0 }).catch(() => null); - if (resp?.ok) { - const body = await resp.json(); - if (body?.name === 'liveness probe') { - liveness = true; - break; - } + const afterB = (await sendOperation(B, { operation: 'describe_table', table: 'LargeLocation' })).record_count; + if (afterB > beforeB) { + liveness = true; + break; } await delay(500); }