diff --git a/.github/workflows/stress-tests.yaml b/.github/workflows/stress-tests.yaml new file mode 100644 index 000000000..daf5d0213 --- /dev/null +++ b/.github/workflows/stress-tests.yaml @@ -0,0 +1,122 @@ +name: Stress Tests + +# Long-running replication stress regressions. These are gated on the +# HARPER_RUN_STRESS_TESTS env var so they never run in the normal +# integration matrix (they'd blow the 15-minute shard timeout). Triggered +# manually from the Actions tab or on a weekly cadence. + +on: + workflow_dispatch: + inputs: + node-version: + description: 'Node.js version' + required: true + type: choice + default: '24' + options: + - '22' + - '24' + - '25' + soak-minutes: + description: 'Soak duration in minutes (Priority 1)' + required: false + default: '240' + orphan-minutes: + description: 'Orphan-race duration in minutes (Priority 3)' + required: false + default: '60' + adversity-minutes: + description: 'Rapid-reconnect duration in minutes (Priority 4)' + required: false + default: '30' + schedule: + # 06:11 UTC on Sundays (off-peak, off the canonical :00 mark) + - cron: '11 6 * * 0' + +jobs: + build: + name: Build Harper Pro (Node.js v${{ inputs.node-version || '24' }}) + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - name: Checkout code + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + submodules: 'recursive' + - name: Setup Node.js ${{ inputs.node-version || '24' }} + uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0 + with: + node-version: ${{ inputs.node-version || '24' }} + package-manager-cache: false + - name: Install dependencies + run: npm install + - name: Build + run: npm run build || true # tolerate the same pre-existing TS errors as the regular integration workflow + - name: Upload build artifacts + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: harper-stress-build-${{ inputs.node-version || '24' }} + path: | + dist/ + static/ + node_modules/ + package.json + retention-days: 1 + + stress: + name: Stress ${{ matrix.test.name }} (Node.js v${{ inputs.node-version || '24' }}) + needs: build + runs-on: ubuntu-latest + # Each test gets its own slot; the soak's worst case (240 min) drives + # the overall budget. Other tests will finish well before this. + timeout-minutes: 260 + strategy: + fail-fast: false + matrix: + test: + - name: 'worker-exit-cascade' + file: 'integrationTests/stress/workerExitCascade.test.mjs' + env_vars: '' + - name: 'soak-rolling-restarts' + file: 'integrationTests/stress/soakWithRollingRestarts.test.mjs' + env_vars: 'HARPER_STRESS_SOAK_MINUTES' + - name: 'blob-orphan-race' + file: 'integrationTests/stress/blobOrphanRace.test.mjs' + env_vars: 'HARPER_STRESS_ORPHAN_MINUTES' + - name: 'rapid-reconnect-adversity' + file: 'integrationTests/stress/rapidReconnectAdversity.test.mjs' + env_vars: 'HARPER_STRESS_ADVERSITY_MINUTES' + steps: + - name: Checkout code + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + submodules: 'recursive' + - name: Setup Node.js ${{ inputs.node-version || '24' }} + uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0 + with: + node-version: ${{ inputs.node-version || '24' }} + package-manager-cache: false + - name: Download build artifacts + uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1 + with: + name: harper-stress-build-${{ inputs.node-version || '24' }} + - name: Relink bin scripts + run: npm install --ignore-scripts + - name: Run ${{ matrix.test.name }} + env: + HARPER_RUN_STRESS_TESTS: '1' + HARPER_INTEGRATION_TEST_LOG_DIR: /tmp/harper-integration-test-logs + # Per-test duration knobs, sourced from workflow inputs (or scheduled defaults) + HARPER_STRESS_SOAK_MINUTES: ${{ inputs.soak-minutes || '240' }} + HARPER_STRESS_ORPHAN_MINUTES: ${{ inputs.orphan-minutes || '60' }} + HARPER_STRESS_ADVERSITY_MINUTES: ${{ inputs.adversity-minutes || '30' }} + run: | + node --experimental-test-coverage=false integrationTests/run.mjs ${{ matrix.test.file }} + - name: Upload Harper server logs + if: always() + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: stress-logs-${{ matrix.test.name }}-node-${{ inputs.node-version || '24' }} + path: /tmp/harper-integration-test-logs/ + retention-days: 7 + if-no-files-found: ignore diff --git a/integrationTests/stress/backlogRecovery.test.mjs b/integrationTests/stress/backlogRecovery.test.mjs new file mode 100644 index 000000000..8bc91b7c3 --- /dev/null +++ b/integrationTests/stress/backlogRecovery.test.mjs @@ -0,0 +1,304 @@ +/** + * Cold-resume backlog recovery. + * + * Background: in production we periodically see a node go offline for an + * extended window (network blip, planned maintenance, OOM kill) and rejoin + * with a large backlog of audit entries queued at its peers. The peers + * have been buffering those entries waiting for the node to come back. The + * stress questions are: + * - Do peers cap their per-peer backlog memory so they don't OOM while + * the absent node is gone? + * - When the absent node returns, does it drain the backlog without OOMing + * itself, and converge with the rest of the cluster in bounded time? + * + * This is the *receive*-side mirror of soakWithRollingRestarts (which + * primarily exercises sender-side behavior during many short kill/restart + * cycles). This test exercises a single long absence — minutes, not seconds. + * + * Mechanism: + * - 4-node mesh (A, B, C, D) wired identically. + * - Wait until cluster is stable, then stop B (clean teardown — not a + * SIGKILL, because we're not testing crash recovery here, we're testing + * backlog accumulation while a peer is reachable-but-missing). + * - Drive heavy churn on A+C+D for HARPER_STRESS_BACKLOG_OFFLINE_MINUTES + * (default 5 minutes locally / 30 minutes in CI). Sample per-node RSS. + * - Restart B. Sample its RSS during catch-up. + * - Wait for convergence; assert. + * + * Assertions: + * 1. Peak RSS on A, C, D during the offline window stays under + * PEER_RSS_CAP_MB (default 1500 MB). Catches "peer buffers backlog + * forever" regressions. + * 2. Peak RSS on B during catch-up stays under CATCHUP_RSS_CAP_MB + * (default 1500 MB). Catches "node loads entire backlog into memory + * to apply" regressions. + * 3. After restart, B converges with peers within CATCHUP_BUDGET_SECS + * (default 180 s for local, scaled with offline duration in CI). + * 4. Zero uncaughtException, zero OOM, zero blob orphan markers on any node. + * 5. Zero `MaxListenersExceededWarning` in node logs. + * + * Run: + * HARPER_RUN_STRESS_TESTS=1 HARPER_STRESS_BACKLOG_OFFLINE_MINUTES=5 \ + * npm run test:integration -- integrationTests/stress/backlogRecovery.test.mjs + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { + startHarper, + teardownHarper, + killHarper, + getNextAvailableLoopbackAddress, + targz, +} from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + fetchWithRetry, + concurrent, + readLog, + waitForAllConnected, + sampleMetrics, + summariseSamples, + mb, + prerenderId, +} from './stressShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled()) { + suite('Backlog recovery (skipped)', () => { + test('skipped — set HARPER_RUN_STRESS_TESTS=1 to enable', { skip: true }, () => {}); + }); +} else { + const THREADS_PER_NODE = 4; + const KEYSPACE = Number(process.env.HARPER_STRESS_BACKLOG_KEYS ?? 800); + const OFFLINE_MINUTES = Number(process.env.HARPER_STRESS_BACKLOG_OFFLINE_MINUTES ?? 5); + const PEER_RSS_CAP_MB = Number(process.env.HARPER_STRESS_BACKLOG_PEER_CAP_MB ?? 1500); + const CATCHUP_RSS_CAP_MB = Number(process.env.HARPER_STRESS_BACKLOG_CATCHUP_CAP_MB ?? 1500); + const CATCHUP_BUDGET_SECS = Number( + process.env.HARPER_STRESS_BACKLOG_CATCHUP_BUDGET_SECS ?? Math.max(180, OFFLINE_MINUTES * 30) + ); + const SUITE_TIMEOUT_MS = (OFFLINE_MINUTES * 60 + CATCHUP_BUDGET_SECS + 240) * 1000; + + suite('Cold-resume backlog recovery', { timeout: SUITE_TIMEOUT_MS }, (ctx) => { + before(async () => { + const cfg = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }); + ctx.nodes = await Promise.all( + [0, 1, 2, 3].map(async () => { + const node = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + await startHarper(node, { config: cfg(node.harper.hostname), env: { HARPER_NO_FLUSH_ON_EXIT: true } }); + return node.harper; + }) + ); + + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + for (let i = 1; i < ctx.nodes.length; i++) { + await sendOperation(ctx.nodes[i], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + } + for (let i = 1; i < ctx.nodes.length; i++) { + await waitForAllConnected(ctx.nodes[i], { timeoutMs: 60_000 }); + } + + const payload = await targz(join(import.meta.dirname, 'fixture-prerender-workload')); + await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'prerender-workload', + payload, + replicated: true, + restart: true, + }); + await delay(40_000); + for (let i = 1; i < ctx.nodes.length; i++) { + await waitForAllConnected(ctx.nodes[i], { timeoutMs: 90_000 }); + } + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + }); + + test('peer stays online and absent node catches up without OOM', async () => { + const [A, , C, D] = ctx.nodes; + let B = ctx.nodes[1]; + + // Sample memory on peers A/C/D for the whole offline window. + const peerSamplers = [A, C, D].map((n) => sampleMetrics(n, { intervalMs: 2000 })); + + // Take B offline by killing the process (killHarper preserves dataRootDir + // so we can restart in place). We're not testing the crash-recovery seam + // here (that's replayCatchupSeam) — we want peers to backlog while B is + // gone, then we bring B back at the same data dir and watch it catch up. + console.log(`[backlog] killing B (${B.hostname}) for ${OFFLINE_MINUTES}m offline window`); + await killHarper({ harper: B }); + + // Churn on the surviving three. Driver pulls equally from A/C/D so + // the backlog isn't entirely on one peer's queue. + let stopChurn = false; + let writes = 0; + const peers = [A, C, D]; + const driver = concurrent(async () => { + if (stopChurn) return; + const id = prerenderId(writes++ % KEYSPACE); + const peer = peers[writes % peers.length]; + try { + await fetchWithRetry(peer.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 1 }); + } catch { + // transient errors on a peer mid-load are fine + } + }, 12); + + const churnLoop = (async () => { + while (!stopChurn) { + await driver.execute(); + await delay(10); + } + await driver.finish(); + })(); + + const endOffline = Date.now() + OFFLINE_MINUTES * 60_000; + while (Date.now() < endOffline) { + const remaining = Math.ceil((endOffline - Date.now()) / 60_000); + console.log(`[backlog] B offline; writes=${writes} remainingMins=${remaining}`); + await delay(30_000); + } + + // Stop driving new traffic — but keep the peer samplers running + // through catch-up so we capture both the peer-buffering peak and + // the drain. + stopChurn = true; + await churnLoop; + console.log(`[backlog] offline window done; total writes=${writes}; restarting B`); + + // Restart B at the same hostname so peers route to it. + const restartCtx = { + name: ctx.name, + harper: { dataRootDir: B.dataRootDir, hostname: B.hostname }, + }; + await startHarper(restartCtx, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: B.hostname + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + ctx.nodes[1] = restartCtx.harper; + B = restartCtx.harper; + + const catchupSampler = sampleMetrics(B, { intervalMs: 2000 }); + + // Poll for convergence — describe_table on all four. Trip the break + // only when every node reports a positive, identical record_count + // (the `counts.A > 0` guard rules out an all-error all-(-1) state). + const catchupDeadline = Date.now() + CATCHUP_BUDGET_SECS * 1000; + let counts = { A: -1, B: -1, C: -1, D: -1 }; + let convergedAt = null; + while (Date.now() < catchupDeadline) { + const [a, b, c, d] = await Promise.all( + [A, B, C, D].map((n) => + sendOperation(n, { operation: 'describe_table', table: 'Prerender' }).catch(() => null) + ) + ); + counts = { + A: a?.record_count ?? -1, + B: b?.record_count ?? -1, + C: c?.record_count ?? -1, + D: d?.record_count ?? -1, + }; + const vals = Object.values(counts); + if (counts.A > 0 && vals.every((v) => v === vals[0])) { + convergedAt = Date.now(); + break; + } + const minLocal = Math.min(...vals); + const maxLocal = Math.max(...vals); + console.log(`[backlog] catchup poll: ${JSON.stringify(counts)} gap=${maxLocal - minLocal}`); + await delay(3000); + } + + const peerSummaries = peerSamplers.map((s) => summariseSamples(s.stop())); + const catchupSummary = summariseSamples(catchupSampler.stop()); + console.log( + `[backlog] peer peaks: A=${mb(peerSummaries[0].peakRss)} C=${mb(peerSummaries[1].peakRss)} ` + + `D=${mb(peerSummaries[2].peakRss)}; B catchup peak: ${mb(catchupSummary.peakRss)}` + ); + + const [logA, logB, logC, logD] = await Promise.all([readLog(A), readLog(B), readLog(C), readLog(D)]); + + // (1+2) Memory caps. + for (const [name, summary, cap] of [ + ['A', peerSummaries[0], PEER_RSS_CAP_MB], + ['C', peerSummaries[1], PEER_RSS_CAP_MB], + ['D', peerSummaries[2], PEER_RSS_CAP_MB], + ['B (catchup)', catchupSummary, CATCHUP_RSS_CAP_MB], + ]) { + const peakMb = summary.peakRss / 1024 / 1024; + ok( + peakMb < cap, + `${name} peak RSS ${peakMb.toFixed(0)} MB exceeded cap ${cap} MB (samples=${summary.sampleCount})` + ); + } + + // (3) Convergence. + ok( + convergedAt !== null, + `B did not converge within ${CATCHUP_BUDGET_SECS}s after restart; final: ${JSON.stringify(counts)}` + ); + + // (4) No uncaught / OOM / orphan. + const uncaughtRe = /\[error\]: uncaughtException/g; + const orphanRe = /\[error\] \[replication\]: Error sending blob.*ENOENT/g; + const oomRe = /JavaScript heap out of memory|FATAL ERROR.*Allocation failed/g; + for (const [name, log] of [ + ['A', logA], + ['B', logB], + ['C', logC], + ['D', logD], + ]) { + const u = (log.match(uncaughtRe) ?? []).length; + const o = (log.match(orphanRe) ?? []).length; + const m = (log.match(oomRe) ?? []).length; + ok(u === 0, `${name} logged ${u} uncaughtException`); + ok(o === 0, `${name} logged ${o} blob orphan markers`); + ok(m === 0, `${name} logged ${m} OOM markers`); + } + + // Note: not asserting on MaxListenersExceededWarning here — that's + // rapidReconnectAdversity's job. Harper's manageThreads.restartWorkers + // emits this once during deploy_component+restart:true startup + // (separate latent issue), and we don't want to entangle the catchup + // regression with that. + + const catchupDurationSecs = + convergedAt !== null ? Math.round((convergedAt - (catchupDeadline - CATCHUP_BUDGET_SECS * 1000)) / 1000) : -1; + console.log( + `[backlog] completed: writes=${writes} catchupSecs=${catchupDurationSecs} ` + `counts=${JSON.stringify(counts)}` + ); + }); + }); +} diff --git a/integrationTests/stress/blobOrphanRace.test.mjs b/integrationTests/stress/blobOrphanRace.test.mjs new file mode 100644 index 000000000..b1095a258 --- /dev/null +++ b/integrationTests/stress/blobOrphanRace.test.mjs @@ -0,0 +1,267 @@ +/** + * Blob orphan race investigation — attempts to reproduce the most mysterious + * wtk symptom: qub-ap-south-1 was emitting bursts of + * [error] [replication]: Error sending blob ... ENOENT '/blobs/prerender/0/d/36b' + * meaning some node had an audit-log reference to a blob whose file no + * longer existed on disk on the sender. We don't know exactly how the + * orphan was created in prod; this test sets up conditions where one is + * *most likely* to form and then checks every node for the error signature. + * + * Hypothesis: a sender produces a streamed blob, broadcasts it to peers, + * then immediately overwrites the same record (which schedules the old + * blob file for cleanup). If the per-record cleanup runs before the peer + * has acknowledged receiving the prior blob, the sender retries the blob + * but its file is gone. The receiver gets a partial / error blob. + * + * Mechanism in this test: + * - 2 nodes (A leader, B receiver). Both run the prerender-workload + * fixture so the table has `sourcedFrom` with mixed-size blobs. + * - A drives heavy churn against a *small* key space — say 100 ids — + * cycling so the same record is overwritten many times per minute. + * Each overwrite supersedes the prior blob file. The small key space + * keeps cumulative storage modest while maximizing supersede frequency. + * - B is restarted mid-test once, simulating real-world reconnect delay + * — when B reconnects, A's pending-send queue has to navigate the + * blob files that have churned since. + * - Run for HARPER_STRESS_ORPHAN_MINUTES (default 15 locally / 60 in CI). + * + * Assertions: + * 1. NO `Error sending blob ... ENOENT` lines on A. If we hit even one, + * we've reproduced the orphan bug in a controlled environment — file + * a follow-up issue with the test as repro. + * 2. NO `uncaughtException` on either node. + * 3. After test ends and a 60s drain, A and B agree on record_count and + * on the hash of each blob's content via the REST GET endpoint. + * + * Even if we don't reproduce the orphan, this is still a useful regression + * guard for the blob-cleanup-vs-replication race surface area. + * + * Run: + * HARPER_RUN_STRESS_TESTS=1 HARPER_STRESS_ORPHAN_MINUTES=10 \ + * npm run test:integration -- integrationTests/stress/blobOrphanRace.test.mjs + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { + startHarper, + teardownHarper, + killHarper, + getNextAvailableLoopbackAddress, + targz, +} from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + fetchWithRetry, + concurrent, + readLog, + waitForAllConnected, + prerenderId, +} from './stressShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled()) { + suite('Blob orphan race (skipped)', () => { + test('skipped — set HARPER_RUN_STRESS_TESTS=1 to enable', { skip: true }, () => {}); + }); +} else { + const THREADS_PER_NODE = 2; + const KEYSPACE = Number(process.env.HARPER_STRESS_ORPHAN_KEYS ?? 100); + const TOTAL_MINUTES = Number(process.env.HARPER_STRESS_ORPHAN_MINUTES ?? 15); + const SUITE_TIMEOUT_MS = (TOTAL_MINUTES + 4) * 60_000; + + suite('Blob orphan race under heavy churn', { timeout: SUITE_TIMEOUT_MS }, (ctx) => { + before(async () => { + const cfg = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }); + ctx.nodes = await Promise.all( + [0, 1].map(async () => { + const node = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + await startHarper(node, { config: cfg(node.harper.hostname), env: { HARPER_NO_FLUSH_ON_EXIT: true } }); + return node.harper; + }) + ); + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + await waitForAllConnected(ctx.nodes[1], { timeoutMs: 60_000 }); + + const payload = await targz(join(import.meta.dirname, 'fixture-prerender-workload')); + await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'prerender-workload', + payload, + replicated: true, + restart: true, + }); + await delay(40_000); + await waitForAllConnected(ctx.nodes[1], { timeoutMs: 90_000 }); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + }); + + test('heavy supersede churn does not orphan blobs on the sender', async () => { + const A = ctx.nodes[0]; + // `B` reassigned after mid-test restart below so post-restart + // reads (operations, readLog) see the new harper handle. + let B = ctx.nodes[1]; + const startedAt = Date.now(); + const endAt = startedAt + TOTAL_MINUTES * 60_000; + + // Decide the restart timing — once near the middle so we exercise + // the reconnect path during the churn window. + const restartAt = startedAt + Math.floor((TOTAL_MINUTES * 60_000) / 2); + let restarted = false; + + // Churn driver: cycle through KEYSPACE ids, hit each on A to + // trigger sourcedFrom (creating/recreating the blob). The small + // keyspace means each id is overwritten frequently. + let stopChurn = false; + let writes = 0; + const driver = concurrent(async () => { + if (stopChurn) return; + const id = prerenderId(writes++ % KEYSPACE); + try { + await fetchWithRetry(A.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 1 }); + } catch { + // Transient errors during the kill window are fine. + } + }, 8); + const churnLoop = (async () => { + while (!stopChurn) { + await driver.execute(); + await delay(20); + } + await driver.finish(); + })(); + + // Main timeline: wait for restart point, do a single kill+restart + // of B mid-test, then continue churn till endAt. + while (Date.now() < endAt) { + if (!restarted && Date.now() >= restartAt) { + console.log(`[orphan] mid-test restart of B (${B.hostname})`); + await killHarper({ harper: B }); + // Capture the new harper handle and update ctx.nodes[1] (and + // the local `B` alias via reassignment below) so all later + // references — including `readLog(B)` at assertion time — + // see the post-restart context. The `logDir` path happens + // to be hostname-derived and stable across restarts, so + // log capture would work either way; this is still the + // right hygiene, matching the soak + adversity tests. + const restartCtx = { + name: ctx.name, + harper: { dataRootDir: B.dataRootDir, hostname: B.hostname }, + }; + await startHarper(restartCtx, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: B.hostname + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + ctx.nodes[1] = restartCtx.harper; + B = restartCtx.harper; + restarted = true; + } + const minsLeft = Math.ceil((endAt - Date.now()) / 60_000); + console.log(`[orphan] t=${Math.round((Date.now() - startedAt) / 1000)}s writes=${writes} restMins=${minsLeft}`); + await delay(30_000); + } + + stopChurn = true; + await churnLoop; + + // Drain time — let B catch up everything queued. Poll for convergence + // rather than blanket-sleeping so a slow drain doesn't burn the budget. + console.log('[orphan] stopping churn; waiting for convergence (up to 120s)'); + const drainDeadline = Date.now() + 120_000; + let aCount = -1; + let bCount = -1; + while (Date.now() < drainDeadline) { + const [a, b] = await Promise.all([ + sendOperation(A, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + sendOperation(B, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + ]); + aCount = a?.record_count ?? -1; + bCount = b?.record_count ?? -1; + if (aCount > 0 && aCount === bCount) break; + await delay(2000); + } + console.log(`[orphan] convergence wait done: A=${aCount} B=${bCount}`); + + const logA = await readLog(A); + const logB = await readLog(B); + + // === Assertions === + + // (1) The whole point of the test: any blob orphan markers? + const orphanRe = /\[error\] \[replication\]: Error sending blob.*ENOENT/g; + const orphansA = logA.match(orphanRe) ?? []; + const orphansB = logB.match(orphanRe) ?? []; + if (orphansA.length > 0 || orphansB.length > 0) { + console.log(`[orphan] !!! REPRODUCED !!! A=${orphansA.length} B=${orphansB.length}`); + console.log('A sample:', orphansA.slice(0, 3)); + console.log('B sample:', orphansB.slice(0, 3)); + } + ok( + orphansA.length === 0, + `A produced ${orphansA.length} "Error sending blob ENOENT" — blob-orphan pattern reproduced. ` + + `Sample: ${orphansA[0]}` + ); + ok(orphansB.length === 0, `B produced ${orphansB.length} "Error sending blob ENOENT". Sample: ${orphansB[0]}`); + + // (2) No uncaughtException either side. + const uncaughtA = logA.match(/\[error\]: uncaughtException/g) ?? []; + const uncaughtB = logB.match(/\[error\]: uncaughtException/g) ?? []; + ok(uncaughtA.length === 0, `A logged ${uncaughtA.length} uncaughtException`); + ok(uncaughtB.length === 0, `B logged ${uncaughtB.length} uncaughtException`); + + // (3) Convergence on record_count after the drain loop above. + // Allow ≤1% drift to absorb the last few in-flight commits that may + // not have replicated by the deadline. + // + // We intentionally do NOT assert an upper bound against KEYSPACE — + // Harper's `sourcedFrom` cache may create new audit/record versions + // on each cache miss under high churn, so the live record_count can + // substantially exceed the unique-key count. The orphan repro is + // about blob lifecycle, not exact record cardinality. + const minCount = Math.min(aCount, bCount); + const maxCount = Math.max(aCount, bCount); + const drift = maxCount > 0 ? (maxCount - minCount) / maxCount : 0; + ok(drift < 0.01, `record_count diverged > 1%: A=${aCount} B=${bCount} drift ${(drift * 100).toFixed(2)}%`); + ok(maxCount > 0, `record_count must be > 0 (saw ${maxCount}); was the churn actually firing?`); + + console.log( + `[orphan] completed: writes=${writes} aCount=${aCount} bCount=${bCount} ` + + `orphans A=${orphansA.length} B=${orphansB.length}` + ); + }); + }); +} diff --git a/integrationTests/stress/fixture-prerender-workload/config.yaml b/integrationTests/stress/fixture-prerender-workload/config.yaml new file mode 100644 index 000000000..fb8a45440 --- /dev/null +++ b/integrationTests/stress/fixture-prerender-workload/config.yaml @@ -0,0 +1,5 @@ +rest: true +graphqlSchema: + files: '*.graphql' +jsResource: + files: resources.js diff --git a/integrationTests/stress/fixture-prerender-workload/resources.js b/integrationTests/stress/fixture-prerender-workload/resources.js new file mode 100644 index 000000000..b2dd8e26d --- /dev/null +++ b/integrationTests/stress/fixture-prerender-workload/resources.js @@ -0,0 +1,56 @@ +// Prerender-style cache fixture: a `sourcedFrom` table where each cache-miss GET +// produces a record + streamed blob whose size is intentionally bimodal to +// match the wtk production workload — about 60% of payloads land below +// Harper's FILE_STORAGE_THRESHOLD (8192 bytes, stored inline in the record) +// and 40% above (file-backed via createWriteStream). +// +// The mix exercises both blob storage paths in a single test run, which is +// what the production prerender table looked like (mix of small JSON-ish +// fragments and larger HTML pages). +import { randomBytes } from 'node:crypto'; +import { Readable } from 'node:stream'; + +// Deterministic-from-id sizing so the test gets a stable distribution rather +// than purely random outcomes — keeps assertions about "saw both paths" stable. +function payloadSizeFor(id) { + // Hash the id to a number in [0, 1000) + let h = 0; + const s = String(id); + for (let i = 0; i < s.length; i++) { + h = (h * 31 + s.charCodeAt(i)) | 0; + } + const bucket = Math.abs(h) % 1000; + // 60% small (1-4KB inline path), 40% large (16-64KB file path) + if (bucket < 600) return 1024 + (bucket % 3072); + return 16384 + ((bucket * 137) % 49152); +} + +tables.Prerender.sourcedFrom({ + get(id) { + const size = payloadSizeFor(id); + const chunkSize = 1024; + const chunks = Math.max(1, Math.ceil(size / chunkSize)); + const body = createBlob( + Readable.from( + (async function* () { + let remaining = size; + for (let i = 0; i < chunks; i++) { + const take = Math.min(chunkSize, remaining); + yield randomBytes(take); + remaining -= take; + } + })() + ) + ); + // Best-effort device extraction from id of the form "...|device" + const sepIdx = String(id).lastIndexOf('|'); + const device = sepIdx >= 0 ? String(id).slice(sepIdx + 1) : 'desktop'; + return { + id, + device, + random: Math.random(), + cached_at: new Date(), + body, + }; + }, +}); diff --git a/integrationTests/stress/fixture-prerender-workload/schema.graphql b/integrationTests/stress/fixture-prerender-workload/schema.graphql new file mode 100644 index 000000000..37398e581 --- /dev/null +++ b/integrationTests/stress/fixture-prerender-workload/schema.graphql @@ -0,0 +1,7 @@ +type Prerender @table @export { + id: String @primaryKey + device: String @indexed + random: Float + cached_at: Date + body: Blob +} diff --git a/integrationTests/stress/fixture-suicide-worker/config.yaml b/integrationTests/stress/fixture-suicide-worker/config.yaml new file mode 100644 index 000000000..c9012cc51 --- /dev/null +++ b/integrationTests/stress/fixture-suicide-worker/config.yaml @@ -0,0 +1,3 @@ +rest: true +jsResource: + files: resources.js diff --git a/integrationTests/stress/fixture-suicide-worker/resources.js b/integrationTests/stress/fixture-suicide-worker/resources.js new file mode 100644 index 000000000..9effba044 --- /dev/null +++ b/integrationTests/stress/fixture-suicide-worker/resources.js @@ -0,0 +1,29 @@ +// Test-only component that exposes a `/suicide-worker` endpoint. Hitting it +// kills the worker thread that received the request via process.exit(137) — +// the conventional exit code for SIGKILL-like termination. Harper's worker +// supervisor will respawn a fresh worker and trigger replication subscription +// reassignment, which is the codepath PR #147's stagger fix protects. +// +// Authorized via Harper's normal auth so the test driver must provide creds. +// Returns 200 immediately, then schedules the exit on the next tick so the +// response actually reaches the caller before the worker dies. +import { threadId } from 'node:worker_threads'; + +export class SuicideWorker extends Resource { + allowRead() { + return true; + } + get() { + // Schedule the exit asynchronously so the HTTP response can flush. + setImmediate(() => { + // 137 = 128 + 9 (SIGKILL convention), but we use process.exit so the + // node test harness still sees a clean exit code from the worker. + process.exit(137); + }); + return { + threadId, + pid: process.pid, + message: 'worker will exit shortly', + }; + } +} diff --git a/integrationTests/stress/partitionHealConvergence.test.mjs b/integrationTests/stress/partitionHealConvergence.test.mjs new file mode 100644 index 000000000..a8b8f2d09 --- /dev/null +++ b/integrationTests/stress/partitionHealConvergence.test.mjs @@ -0,0 +1,324 @@ +/** + * Partition + heal convergence (split-brain) test. + * + * ⚠️ CURRENTLY BLOCKED — requires a Harper-side change. See "Blocker" below. + * Test is gated on HARPER_STRESS_ALLOW_INSECURE_REPLICATION=1 in addition + * to HARPER_RUN_STRESS_TESTS=1 so it doesn't fail by default. + * + * Background: distributed systems claim to converge after a partition heals. + * Harper's replication should: each side accepts writes while disconnected, + * and once the connection is restored, both sides exchange catchup so all + * peers end up with the same content per key (under whatever conflict + * resolution Harper uses — at the moment, HLC last-write-wins). + * + * We can't drop loopback traffic with iptables (no NET_ADMIN, and the + * loopback path skips netfilter anyway), so we interpose a controllable + * TCP proxy between the two Harper nodes. The proxy forwards normally, but + * the test flips it to "blocked" mode to simulate a partition. Existing + * sockets are torn down; new ones are rejected. On `unblock()`, traffic + * resumes. + * + * Blocker: Harper's replication WS client validates the server certificate's + * SAN/altnames against the connect target hostname. Self-signed replication + * certs only list the node's own IP (e.g. 127.0.0.1, ::1), so when peer B + * dials A via the proxy at 127.0.0.3, the TLS handshake fails with + * "Hostname/IP does not match certificate's altnames" + * `NODE_TLS_REJECT_UNAUTHORIZED=0` does not bypass this; the WS client + * appears to validate independently. Options to unblock, in rough order + * of preference: + * (a) a `replication.rejectUnauthorized: false` / + * `replication.checkServerIdentity` config flag in Harper (clean fix). + * (b) bind the proxy to the SAME IP as A but a different PORT (e.g. + * 127.0.0.1:9934, forwarding to 127.0.0.1:9933). The TLS SAN check + * only validates the host, not the port, so 127.0.0.1 matches A's + * cert. Requires `add_node` to accept a non-default port — confirm + * the operations API supports `hostname: '127.0.0.1:9934'` or a + * separate `port` arg. + * (c) mint replication certs with a configurable SAN list covering proxy + * hostnames. + * Additional caveat surfaced by review: even after the TLS path opens, this + * file assumes a single bidirectional WS initiated by B carries both + * directions of replication. If Harper actually opens an independent A→B + * connection (gossip / mutual dial), blocking just the B→A proxy won't + * partition the cluster. Verify the topology before trusting a "passing" + * result, and add a second proxy / route A's outbound through the proxy if + * needed. + * + * Mechanism: + * - 2 nodes A and B. B's `add_node` points at a proxy address that forwards + * to A's real replication port. Since replication is a single bidirectional + * WS initiated by B, blocking that one proxy interrupts traffic in both + * directions — we don't need a second proxy. + * - Initial sync ensures both nodes see the deployed schema and connect. + * - Phase 1 (pre-partition): drive light churn on both A and B against + * overlapping keys 0..KEYSPACE/2 so we have a known baseline of replicated + * rows. + * - Phase 2 (partition): proxy.block() on both. Each side writes to keys in + * a side-specific range so post-heal we know which write came from where + * (id encodes the originating side). Run for HARPER_STRESS_PARTITION_SECS. + * - Phase 3 (heal): proxy.unblock() on both. Drive a brief tail of churn so + * new traffic crosses the heal. Stop. Wait for convergence. + * + * Assertions: + * 1. After heal, A.record_count === B.record_count (strict equality — for + * keys, not for blob contents). Catches split-brain where one side + * can't observe the other's writes. + * 2. Sampled keys: pull N specific record ids from each side, compare. They + * should match. Catches LWW divergence (different versions resolve to + * different "winners" on the two sides). + * 3. Zero uncaughtException on either side during partition or heal. + * 4. Catch-up "Replayed" message NOT expected (no crashes); but any + * `[error] [replication]` lines from the heal window are surfaced for + * review, not failed on (some reconnect noise is expected). + * + * Run: + * HARPER_RUN_STRESS_TESTS=1 \ + * npm run test:integration -- integrationTests/stress/partitionHealConvergence.test.mjs + */ + +import { suite, test, before, after } from 'node:test'; +import { ok, equal } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { startHarper, teardownHarper, getNextAvailableLoopbackAddress, targz } from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + fetchWithRetry, + concurrent, + readLog, + waitForAllConnected, +} from './stressShared.mjs'; +import { ReplicationProxy } from './replicationProxy.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled() || process.env.HARPER_STRESS_ALLOW_INSECURE_REPLICATION !== '1') { + suite('Partition + heal convergence (skipped)', () => { + test( + 'skipped — blocked on Harper replication TLS hostname validation. ' + + 'See file header for details. Re-enable by setting both ' + + 'HARPER_RUN_STRESS_TESTS=1 and HARPER_STRESS_ALLOW_INSECURE_REPLICATION=1 ' + + 'after Harper adds a replication TLS-skip config option.', + { skip: true }, + () => {} + ); + }); +} else { + const THREADS_PER_NODE = 2; + const KEYSPACE = Number(process.env.HARPER_STRESS_PARTITION_KEYS ?? 100); + const PARTITION_SECS = Number(process.env.HARPER_STRESS_PARTITION_SECS ?? 60); + const PRE_SECS = Number(process.env.HARPER_STRESS_PARTITION_PRE_SECS ?? 30); + const HEAL_TAIL_SECS = Number(process.env.HARPER_STRESS_PARTITION_HEAL_TAIL_SECS ?? 30); + const CONVERGE_BUDGET_SECS = Number(process.env.HARPER_STRESS_PARTITION_CONVERGE_BUDGET_SECS ?? 180); + const SUITE_TIMEOUT_MS = (PRE_SECS + PARTITION_SECS + HEAL_TAIL_SECS + CONVERGE_BUDGET_SECS + 240) * 1000; + + // Side-specific id encodes which side originated the write — partition rows + // can be recognized later. + const partitionId = (side, n) => `partition/${side}/${n}`; + + suite('Partition + heal convergence', { timeout: SUITE_TIMEOUT_MS }, (ctx) => { + before(async () => { + const aHost = await getNextAvailableLoopbackAddress(); + const bHost = await getNextAvailableLoopbackAddress(); + const proxyForAHost = await getNextAvailableLoopbackAddress(); + + ctx.proxy = new ReplicationProxy({ + listenHost: proxyForAHost, + listenPort: 9933, + targetHost: aHost, + targetPort: 9933, + }); + await ctx.proxy.start(); + + const cfg = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }); + const nodeA = { name: ctx.name, harper: { hostname: aHost } }; + const nodeB = { name: ctx.name, harper: { hostname: bHost } }; + await startHarper(nodeA, { + config: cfg(aHost), + env: { HARPER_NO_FLUSH_ON_EXIT: true, NODE_TLS_REJECT_UNAUTHORIZED: '0' }, + }); + await startHarper(nodeB, { + config: cfg(bHost), + env: { HARPER_NO_FLUSH_ON_EXIT: true, NODE_TLS_REJECT_UNAUTHORIZED: '0' }, + }); + ctx.nodes = [nodeA.harper, nodeB.harper]; + + // Wire B → A through the proxy. Bidirectional WS rides this one socket. + const tokenA = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: proxyForAHost, + authorization: 'Bearer ' + tokenA.operation_token, + }); + await waitForAllConnected(ctx.nodes[1], { timeoutMs: 60_000 }); + + const payload = await targz(join(import.meta.dirname, 'fixture-prerender-workload')); + await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'prerender-workload', + payload, + replicated: true, + restart: true, + }); + await delay(40_000); + await waitForAllConnected(ctx.nodes[1], { timeoutMs: 90_000 }); + }); + + after(async () => { + if (ctx.nodes) { + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + } + if (ctx.proxy) { + await ctx.proxy.stop(); + } + }); + + test('split-brain writes converge on identical state after heal', async () => { + const [A, B] = ctx.nodes; + + // Phase 1: pre-partition baseline — small overlapping churn. + console.log(`[partition] phase 1 (pre, ${PRE_SECS}s): baseline overlapping churn`); + let writesA = 0; + let writesB = 0; + let phase = 'pre'; + const drive = async (node, side) => { + let n = side === 'A' ? writesA++ : writesB++; + let id; + if (phase === 'pre' || phase === 'heal') { + id = partitionId('shared', n % Math.max(1, Math.floor(KEYSPACE / 2))); + } else { + id = partitionId(side, n % Math.max(1, Math.floor(KEYSPACE / 2))); + } + try { + await fetchWithRetry(node.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 1 }); + } catch { + // Partition-induced errors are expected during phase 2 + } + }; + let stopChurn = false; + const driverA = concurrent(() => (stopChurn ? null : drive(A, 'A')), 6); + const driverB = concurrent(() => (stopChurn ? null : drive(B, 'B')), 6); + const churnLoop = (async () => { + while (!stopChurn) { + await Promise.all([driverA.execute(), driverB.execute()]); + await delay(25); + } + await Promise.all([driverA.finish(), driverB.finish()]); + })(); + + await delay(PRE_SECS * 1000); + + // Phase 2: partition. + phase = 'partition'; + console.log(`[partition] phase 2 (${PARTITION_SECS}s): blocking proxy — split-brain writes`); + ctx.proxy.block(); + await delay(PARTITION_SECS * 1000); + + // Phase 3: heal. + phase = 'heal'; + console.log(`[partition] phase 3 (${HEAL_TAIL_SECS}s): unblocking proxy — write tail`); + ctx.proxy.unblock(); + await delay(HEAL_TAIL_SECS * 1000); + + stopChurn = true; + await churnLoop; + + console.log(`[partition] churn stopped; total writesA=${writesA} writesB=${writesB}`); + + // Convergence wait — strict equality for record_count after partition. + const convergeDeadline = Date.now() + CONVERGE_BUDGET_SECS * 1000; + let counts = { A: -1, B: -1 }; + let convergedAt = null; + while (Date.now() < convergeDeadline) { + const [a, b] = await Promise.all([ + sendOperation(A, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + sendOperation(B, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + ]); + counts = { A: a?.record_count ?? -1, B: b?.record_count ?? -1 }; + if (counts.A > 0 && counts.A === counts.B) { + convergedAt = Date.now(); + break; + } + console.log(`[partition] converge poll: ${JSON.stringify(counts)}`); + await delay(3000); + } + + // Per-record sampling — pick a few side-A and side-B partition ids and + // check that both nodes report the same `random` field (record version + // proxy). Strict equality required. + const sampleIds = []; + for (let i = 0; i < Math.min(5, Math.floor(KEYSPACE / 2)); i++) { + sampleIds.push(partitionId('A', i)); + sampleIds.push(partitionId('B', i)); + sampleIds.push(partitionId('shared', i)); + } + const fetchRecord = async (node, id) => { + try { + const res = await fetchWithRetry(node.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 2 }); + if (!res.ok) return null; + const json = await res.json(); + return json?.random ?? null; + } catch { + return null; + } + }; + const sampleDiffs = []; + for (const id of sampleIds) { + const [vA, vB] = await Promise.all([fetchRecord(A, id), fetchRecord(B, id)]); + if (vA !== vB) sampleDiffs.push({ id, vA, vB }); + } + + const [logA, logB] = await Promise.all([readLog(A), readLog(B)]); + + // (1) record_count strictly equal post-heal. + ok( + convergedAt !== null, + `A and B did not converge on record_count within ${CONVERGE_BUDGET_SECS}s: ${JSON.stringify(counts)}` + ); + equal(counts.A, counts.B, `record_count mismatch post-heal: ${JSON.stringify(counts)}`); + + // (2) sampled keys agree. + ok( + sampleDiffs.length === 0, + `${sampleDiffs.length}/${sampleIds.length} sampled keys diverge between A and B: ` + + JSON.stringify(sampleDiffs.slice(0, 5)) + ); + + // (3) no uncaught. + const uncaughtRe = /\[error\]: uncaughtException/g; + for (const [name, log] of [ + ['A', logA], + ['B', logB], + ]) { + const u = (log.match(uncaughtRe) ?? []).length; + ok(u === 0, `${name} logged ${u} uncaughtException across partition/heal`); + } + + // Surface replication-error counts for review (not fail-on). + const replErrRe = /\[error\] \[replication\]/g; + const errA = (logA.match(replErrRe) ?? []).length; + const errB = (logB.match(replErrRe) ?? []).length; + console.log( + `[partition] completed: writesA=${writesA} writesB=${writesB} counts=${JSON.stringify(counts)} ` + + `replicationErrorLines A=${errA} B=${errB}` + ); + }); + }); +} diff --git a/integrationTests/stress/rapidReconnectAdversity.test.mjs b/integrationTests/stress/rapidReconnectAdversity.test.mjs new file mode 100644 index 000000000..5dede4dc7 --- /dev/null +++ b/integrationTests/stress/rapidReconnectAdversity.test.mjs @@ -0,0 +1,250 @@ +/** + * Rapid-reconnect adversity test — exercises the WS connect/disconnect + * paths in replicationConnection.ts at a much higher rate than the soak + * test, surfacing listener leaks, retry storms, and the (recently fixed) + * "schemaUpdateListener still pinned to global emitter after WS close" + * class of bug (harper-pro PR #161/#173). + * + * Originally scoped as a tc/netem + iptables network-adversity test, but + * those require NET_ADMIN, which we don't reliably have outside CI's + * `ubuntu-latest` runner. Forced WS closures via fast kill+restart cycles + * cover the same surface — reconnect, subscription resubscribe, blob + * stream resumption, listener cleanup — without needing root. + * + * Setup: 3 nodes, mesh, prerender workload. + * + * Sequence: + * - Steady write traffic on all nodes. + * - Every CYCLE_SECONDS (default 15s), pick a random node, kill+restart + * it. Wait the rest of the cycle, repeat. + * - Total duration HARPER_STRESS_ADVERSITY_MINUTES (default 10 locally / + * 30 in workflow). + * + * Assertions: + * 1. MaxListenersExceededWarning never appears (the recent fixes were + * explicitly about not leaking schemaUpdateListener / dropDatabase + * listeners on `databaseEventsEmitter` after WS close). + * 2. No `uncaughtException`. + * 3. No `ERR_WORKER_OUT_OF_MEMORY`. + * 4. Final convergence: every node has equal Prerender record_count. + * + * Run: + * HARPER_RUN_STRESS_TESTS=1 HARPER_STRESS_ADVERSITY_MINUTES=5 \ + * npm run test:integration -- \ + * integrationTests/stress/rapidReconnectAdversity.test.mjs + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { + startHarper, + teardownHarper, + killHarper, + getNextAvailableLoopbackAddress, + targz, +} from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + trySendOperation, + fetchWithRetry, + concurrent, + readLog, + waitForAllConnected, + prerenderId, +} from './stressShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled()) { + suite('Rapid reconnect adversity (skipped)', () => { + test('skipped — set HARPER_RUN_STRESS_TESTS=1 to enable', { skip: true }, () => {}); + }); +} else { + const NODE_COUNT = 3; + const THREADS_PER_NODE = 2; + const TOTAL_MINUTES = Number(process.env.HARPER_STRESS_ADVERSITY_MINUTES ?? 10); + const CYCLE_SECONDS = Number(process.env.HARPER_STRESS_ADVERSITY_CYCLE_SECONDS ?? 15); + const TRAFFIC_RPS = Number(process.env.HARPER_STRESS_ADVERSITY_RPS ?? 10); + const SUITE_TIMEOUT_MS = (TOTAL_MINUTES + 4) * 60_000; + + suite('Rapid reconnect adversity', { timeout: SUITE_TIMEOUT_MS }, (ctx) => { + before(async () => { + const cfg = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }); + ctx.nodes = await Promise.all( + Array.from({ length: NODE_COUNT }).map(async () => { + const node = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + await startHarper(node, { config: cfg(node.harper.hostname), env: { HARPER_NO_FLUSH_ON_EXIT: true } }); + return node.harper; + }) + ); + // Mesh + const tokens = await Promise.all( + ctx.nodes.map((n) => sendOperation(n, { operation: 'create_authentication_tokens', authorization: n.admin })) + ); + for (let i = 0; i < NODE_COUNT; i++) { + for (let j = 0; j < NODE_COUNT; j++) { + if (i === j) continue; + await sendOperation(ctx.nodes[i], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[j].hostname, + authorization: 'Bearer ' + tokens[j].operation_token, + }); + } + } + for (const n of ctx.nodes) await waitForAllConnected(n, { timeoutMs: 60_000 }); + + const payload = await targz(join(import.meta.dirname, 'fixture-prerender-workload')); + await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'prerender-workload', + payload, + replicated: true, + restart: true, + }); + await delay(35_000); + for (const n of ctx.nodes) await waitForAllConnected(n, { timeoutMs: 60_000 }); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + }); + + test('rapid kill+restart cycles surface no listener leaks or uncaughts', async () => { + const startedAt = Date.now(); + const endAt = startedAt + TOTAL_MINUTES * 60_000; + + // Sustained but light traffic from all nodes simultaneously. + let stop = false; + let seq = 0; + const drivers = ctx.nodes.map((n) => + concurrent( + async () => { + if (stop) return; + const id = prerenderId(seq++ % 1500); + try { + await fetchWithRetry(n.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 1 }); + } catch { + /* expected during kill windows */ + } + }, + Math.max(1, Math.floor(TRAFFIC_RPS / NODE_COUNT)) + ) + ); + const feeders = drivers.map(async (driver) => { + while (!stop) { + await driver.execute(); + await delay(1000 / Math.max(1, TRAFFIC_RPS / NODE_COUNT)); + } + await driver.finish(); + }); + + let cycle = 0; + while (Date.now() < endAt) { + cycle++; + const idx = (cycle - 1) % NODE_COUNT; + const victim = ctx.nodes[idx]; + console.log( + `[adversity] cycle ${cycle} t=${Math.round((Date.now() - startedAt) / 1000)}s ` + + `killing node ${idx} (${victim.hostname})` + ); + await killHarper({ harper: victim }); + // Reuse the same context object — startHarper mutates ctx.harper + // in place to point at the new ChildProcess, so subsequent kills + // in this cycle hit the actual running process rather than the + // stale reference from the previous spawn. + const restartCtx = { + name: ctx.name, + harper: { dataRootDir: victim.dataRootDir, hostname: victim.hostname }, + }; + await startHarper(restartCtx, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: victim.hostname + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + ctx.nodes[idx] = restartCtx.harper; + // Wait the rest of the cycle period before the next kill. + const elapsed = (Date.now() - startedAt) / 1000; + const nextCycleAt = cycle * CYCLE_SECONDS; + const sleepMs = Math.max(2000, (nextCycleAt - elapsed) * 1000); + if (Date.now() + sleepMs > endAt) break; + await delay(sleepMs); + } + + stop = true; + await Promise.all(feeders); + + // Settle. + await delay(20_000); + + // === Assertions === + + const logs = await Promise.all(ctx.nodes.map((n) => readLog(n))); + for (let i = 0; i < logs.length; i++) { + const log = logs[i]; + const node = ctx.nodes[i]; + + // (1) MaxListenersExceededWarning — the prior listener-leak fixes + // in main were specifically about not pinning listeners on + // databaseEventsEmitter across WS reconnects. + ok( + !log.includes('MaxListenersExceededWarning'), + `node ${i} (${node.hostname}) logged MaxListenersExceededWarning during reconnect cycles` + ); + + // (2) No uncaughtException + const uncaught = log.match(/\[error\]: uncaughtException/g) ?? []; + ok(uncaught.length === 0, `node ${i} logged ${uncaught.length} uncaughtException; first: ${uncaught[0]}`); + + // (3) No OOM + ok(!log.includes('ERR_WORKER_OUT_OF_MEMORY'), `node ${i} logged ERR_WORKER_OUT_OF_MEMORY`); + } + + // (4) Convergence — poll for a window since heavy restart churn may + // leave a brief lag at the moment we stop traffic. Then assert ≤1% + // drift across all nodes (matches the soak's relaxation; strict + // equality is too brittle under rapid restarts). + const deadline = Date.now() + 60_000; + let counts = []; + while (Date.now() < deadline) { + counts = await Promise.all( + ctx.nodes.map((n) => + trySendOperation(n, { operation: 'describe_table', table: 'Prerender' }).then((r) => r?.record_count ?? -1) + ) + ); + const max = Math.max(...counts); + const min = Math.min(...counts); + if (max > 0 && (max - min) / max < 0.01) break; + await delay(2000); + } + console.log(`[adversity] cycles=${cycle} final record_count=${counts.join(', ')}`); + const max = Math.max(...counts); + const min = Math.min(...counts); + const drift = max > 0 ? (max - min) / max : 0; + ok( + drift < 0.01, + `record_count diverged > 1% across nodes after ${cycle} kill cycles: ${JSON.stringify(counts)} drift=${(drift * 100).toFixed(2)}%` + ); + }); + }); +} diff --git a/integrationTests/stress/replayCatchupSeam.test.mjs b/integrationTests/stress/replayCatchupSeam.test.mjs new file mode 100644 index 000000000..a08f8be04 --- /dev/null +++ b/integrationTests/stress/replayCatchupSeam.test.mjs @@ -0,0 +1,255 @@ +/** + * Crash recovery seam — replayLogs vs replication catch-up. + * + * Background: when a Harper node SIGKILLs with HARPER_NO_FLUSH_ON_EXIT=true, + * its on-disk audit log carries the last few committed-but-unflushed entries. + * On restart, replayLogs.ts walks that tail and re-applies it to the table + * stores. Meanwhile, peers — which already have those same versions from + * live replication before the crash — kick off catch-up to push anything the + * crashed node missed *after* the last entry it saw. + * + * The seam between these two flows is the interesting part: the same audit + * version range can be visible to both replayLogs (locally) and catch-up + * (from peers). Apply rules must be idempotent — re-applying the same + * version must not produce duplicate rows, lost rows, or corrupt structures. + * Subtle bugs here would show up as record_count drift, "Error writing from + * replay of log" stack traces, or "msgpack/structure decode failed" entries. + * + * Mechanism: + * - 3-node mesh (A leader, B + C followers), all four-thread. + * - Drive churn against A's Prerender table (mixed inline + blob payloads) + * so the audit + replication traffic is realistic. + * - Once traffic is flowing, SIGKILL B with HARPER_NO_FLUSH_ON_EXIT so its + * audit tail is unflushed. Restart it immediately. Continue churn so + * catch-up overlaps with replay. + * - Stop traffic. Wait for convergence. + * + * Assertions: + * 1. record_count drift across all three nodes < 1 % (replay didn't double-apply + * and catch-up didn't lose rows). + * 2. B logs include at least one "Replayed N records" warn (i.e. the seam was + * actually exercised — if zero replays, the test isn't proving anything). + * 3. Zero "Error writing from replay of log" and zero "Error committing replay + * transaction" lines on any node. + * 4. Zero uncaughtException, zero blob orphan markers (since blobs are involved + * in the workload), zero OOM markers. + * + * Run: + * HARPER_RUN_STRESS_TESTS=1 \ + * npm run test:integration -- integrationTests/stress/replayCatchupSeam.test.mjs + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { + startHarper, + teardownHarper, + killHarper, + getNextAvailableLoopbackAddress, + targz, +} from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + fetchWithRetry, + concurrent, + readLog, + waitForAllConnected, + prerenderId, +} from './stressShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled()) { + suite('Replay/catch-up seam (skipped)', () => { + test('skipped — set HARPER_RUN_STRESS_TESTS=1 to enable', { skip: true }, () => {}); + }); +} else { + const THREADS_PER_NODE = 4; + const KEYSPACE = Number(process.env.HARPER_STRESS_REPLAY_KEYS ?? 200); + const PRE_KILL_SECS = Number(process.env.HARPER_STRESS_REPLAY_PRE_KILL_SECS ?? 45); + const POST_KILL_SECS = Number(process.env.HARPER_STRESS_REPLAY_POST_KILL_SECS ?? 60); + const SUITE_TIMEOUT_MS = (PRE_KILL_SECS + POST_KILL_SECS + 240) * 1000; + + suite('Crash recovery: replayLogs vs catch-up seam', { timeout: SUITE_TIMEOUT_MS }, (ctx) => { + before(async () => { + const cfg = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }); + ctx.nodes = await Promise.all( + [0, 1, 2].map(async () => { + const node = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + await startHarper(node, { config: cfg(node.harper.hostname), env: { HARPER_NO_FLUSH_ON_EXIT: true } }); + return node.harper; + }) + ); + + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + for (let i = 1; i < ctx.nodes.length; i++) { + await sendOperation(ctx.nodes[i], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + } + for (let i = 1; i < ctx.nodes.length; i++) { + await waitForAllConnected(ctx.nodes[i], { timeoutMs: 60_000 }); + } + + const payload = await targz(join(import.meta.dirname, 'fixture-prerender-workload')); + await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'prerender-workload', + payload, + replicated: true, + restart: true, + }); + await delay(40_000); + for (let i = 1; i < ctx.nodes.length; i++) { + await waitForAllConnected(ctx.nodes[i], { timeoutMs: 90_000 }); + } + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + }); + + test('post-crash replay overlaps with catch-up without duplicating or losing rows', async () => { + const A = ctx.nodes[0]; + let B = ctx.nodes[1]; + const C = ctx.nodes[2]; + + let stopChurn = false; + let writes = 0; + const driver = concurrent(async () => { + if (stopChurn) return; + const id = prerenderId(writes++ % KEYSPACE); + try { + await fetchWithRetry(A.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 1 }); + } catch { + // transient errors during the crash window are fine + } + }, 8); + const churnLoop = (async () => { + while (!stopChurn) { + await driver.execute(); + await delay(15); + } + await driver.finish(); + })(); + + console.log(`[replay] settling ${PRE_KILL_SECS}s of pre-kill churn`); + await delay(PRE_KILL_SECS * 1000); + + console.log(`[replay] SIGKILL B (${B.hostname}) with HARPER_NO_FLUSH_ON_EXIT`); + await killHarper({ harper: B }); + + const restartCtx = { + name: ctx.name, + harper: { dataRootDir: B.dataRootDir, hostname: B.hostname }, + }; + await startHarper(restartCtx, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: B.hostname + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + ctx.nodes[1] = restartCtx.harper; + B = restartCtx.harper; + console.log(`[replay] B restarted; ${POST_KILL_SECS}s post-kill churn`); + + await delay(POST_KILL_SECS * 1000); + stopChurn = true; + await churnLoop; + + console.log('[replay] churn stopped; waiting for convergence (up to 120s)'); + const drainDeadline = Date.now() + 120_000; + let counts = { A: -1, B: -1, C: -1 }; + while (Date.now() < drainDeadline) { + const [a, b, c] = await Promise.all([ + sendOperation(A, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + sendOperation(B, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + sendOperation(C, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + ]); + counts = { + A: a?.record_count ?? -1, + B: b?.record_count ?? -1, + C: c?.record_count ?? -1, + }; + if (counts.A > 0 && counts.A === counts.B && counts.A === counts.C) break; + await delay(2000); + } + console.log(`[replay] convergence done: ${JSON.stringify(counts)}`); + + const [logA, logB, logC] = await Promise.all([readLog(A), readLog(B), readLog(C)]); + + // (1) Convergence — strict drift bound, but tolerate a few in-flight rows. + const vals = Object.values(counts); + const minCount = Math.min(...vals); + const maxCount = Math.max(...vals); + const drift = maxCount > 0 ? (maxCount - minCount) / maxCount : 0; + ok(drift < 0.01, `record_count diverged > 1%: ${JSON.stringify(counts)} drift ${(drift * 100).toFixed(2)}%`); + ok(maxCount > 0, `record_count must be > 0 (saw ${maxCount}); was churn firing?`); + + // (2) B must have actually replayed — otherwise the seam wasn't exercised. + const replayedRe = /\[warn\].*Replayed \d+ records in .* database/g; + const replayLines = logB.match(replayedRe) ?? []; + ok( + replayLines.length > 0, + `B did not log any "Replayed N records" — test did not exercise replay (HARPER_NO_FLUSH_ON_EXIT not honored, or kill happened before any unflushed writes). Sample of B log tail:\n${logB.slice(-2000)}` + ); + + // (3) No replay errors on any node. + const replayErrRe = /Error (writing from replay of log|committing replay transaction)/g; + const replayErrorsA = logA.match(replayErrRe) ?? []; + const replayErrorsB = logB.match(replayErrRe) ?? []; + const replayErrorsC = logC.match(replayErrRe) ?? []; + ok( + replayErrorsA.length === 0 && replayErrorsB.length === 0 && replayErrorsC.length === 0, + `replay errors: A=${replayErrorsA.length} B=${replayErrorsB.length} C=${replayErrorsC.length}. ` + + `Sample: ${replayErrorsB[0] ?? replayErrorsA[0] ?? replayErrorsC[0]}` + ); + + // (4) No uncaught / OOM / orphan markers. + const uncaughtRe = /\[error\]: uncaughtException/g; + const orphanRe = /\[error\] \[replication\]: Error sending blob.*ENOENT/g; + const oomRe = /JavaScript heap out of memory|FATAL ERROR.*Allocation failed/g; + for (const [name, log] of [ + ['A', logA], + ['B', logB], + ['C', logC], + ]) { + const u = (log.match(uncaughtRe) ?? []).length; + const o = (log.match(orphanRe) ?? []).length; + const m = (log.match(oomRe) ?? []).length; + ok(u === 0, `${name} logged ${u} uncaughtException`); + ok(o === 0, `${name} logged ${o} blob orphan markers`); + ok(m === 0, `${name} logged ${m} OOM markers`); + } + + console.log( + `[replay] completed: writes=${writes} counts=${JSON.stringify(counts)} ` + `replayLines=${replayLines.length}` + ); + }); + }); +} diff --git a/integrationTests/stress/replicationProxy.mjs b/integrationTests/stress/replicationProxy.mjs new file mode 100644 index 000000000..c703eda54 --- /dev/null +++ b/integrationTests/stress/replicationProxy.mjs @@ -0,0 +1,90 @@ +/** + * Controllable TCP proxy used by partition-tolerance tests. + * + * Replication on loopback can't be partitioned with iptables/tc (no NET_ADMIN, + * loopback bypasses netfilter), so we interpose a userspace proxy between two + * Harper nodes and toggle it from the test to simulate a network partition. + * + * Usage: + * const proxy = new ReplicationProxy({ listenHost, listenPort, targetHost, targetPort }); + * await proxy.start(); + * // ... later in the test: + * proxy.block(); // close existing connections, drop new ones until unblocked + * proxy.unblock(); // accept again + * await proxy.stop(); + * + * Connection semantics while blocked: + * - existing client sockets are destroyed (forces both ends to notice + * the partition immediately rather than waiting on a keepalive) + * - new incoming sockets are accepted-then-destroyed, so the client sees + * a fast connect-then-close (more "drop-y" than refused) + * + * No TLS termination — Harper's replication socket is already encrypted by + * Harper. The proxy is purely a TCP pipe. + */ + +import { createServer, connect } from 'node:net'; + +export class ReplicationProxy { + constructor({ listenHost, listenPort, targetHost, targetPort }) { + this.listenHost = listenHost; + this.listenPort = listenPort; + this.targetHost = targetHost; + this.targetPort = targetPort; + this.blocked = false; + this.connections = new Set(); + this.server = null; + } + + start() { + return new Promise((resolve, reject) => { + this.server = createServer((clientSocket) => { + if (this.blocked) { + clientSocket.destroy(); + return; + } + const upstream = connect({ host: this.targetHost, port: this.targetPort }); + const pair = { clientSocket, upstream, cleaned: false }; + this.connections.add(pair); + const cleanup = () => { + // Both sockets emit 'close' after the first .destroy(); guard so + // we only tear down once per pair instead of fanning out. + if (pair.cleaned) return; + pair.cleaned = true; + this.connections.delete(pair); + clientSocket.destroy(); + upstream.destroy(); + }; + clientSocket.on('error', cleanup); + upstream.on('error', cleanup); + clientSocket.on('close', cleanup); + upstream.on('close', cleanup); + clientSocket.pipe(upstream); + upstream.pipe(clientSocket); + }); + this.server.on('error', reject); + this.server.listen(this.listenPort, this.listenHost, () => resolve()); + }); + } + + block() { + this.blocked = true; + for (const pair of this.connections) { + pair.clientSocket.destroy(); + pair.upstream.destroy(); + } + this.connections.clear(); + } + + unblock() { + this.blocked = false; + } + + stop() { + this.block(); + return new Promise((resolve) => { + if (!this.server) return resolve(); + this.server.close(() => resolve()); + }); + } +} diff --git a/integrationTests/stress/slowConsumerBackpressure.test.mjs b/integrationTests/stress/slowConsumerBackpressure.test.mjs new file mode 100644 index 000000000..3fc909ce4 --- /dev/null +++ b/integrationTests/stress/slowConsumerBackpressure.test.mjs @@ -0,0 +1,262 @@ +/** + * Slow-consumer backpressure. + * + * Background: when a peer applies replication writes more slowly than the + * sender produces them, the sender's per-peer queue grows. Harper's + * replication uses backpressure signals (visible as `backPressurePercent` + * on each database socket in `cluster_status`) to throttle further sends. + * Two regressions we want to catch: + * 1. Sender doesn't cap the queue — RSS grows unboundedly until OOM. + * 2. Backpressure is signaled, but the sender ignores it — same outcome. + * + * This is hard to repro deterministically on loopback because everything + * is so fast. We force the asymmetry two ways: + * - A has 4 worker threads, B has 1 — apply parallelism diverges 4x. + * - The driver writes at high concurrency (16) so A's audit/replication + * producers are saturated. + * + * Even with this setup, on a fast machine B may still keep up. The test is + * still useful: it asserts A doesn't OOM during the run, that the cluster + * recovers to convergence after we stop, and that *if* backpressure was + * signaled at any point, it correlated with A's queue NOT growing without + * bound. Treat the "no backpressure observed at all" case as a soft warning + * (logged), not a fail — the asymmetry on loopback may not be enough to + * trigger it. + * + * Mechanism: + * - 2-node mesh (A: 4 threads, B: 1 thread). + * - Drive churn against A at concurrency 16 for HARPER_STRESS_SLOW_MINUTES. + * - Sample A's RSS every 2 s; poll cluster_status every 5 s to capture + * backPressurePercent on A→B socket. + * - Stop churn; wait for convergence. + * + * Assertions: + * 1. Peak RSS on A < SLOW_RSS_CAP_MB (1500 MB default). + * 2. Convergence on record_count within 120 s of stop. + * 3. No uncaught / OOM / orphan markers on either node. + * 4. Soft: if backPressurePercent ever exceeded 0, log a "[slow] backpressure + * observed" line; otherwise log a "[slow] no backpressure observed" warn + * (test doesn't fail, but reviewer should note the asymmetry wasn't enough). + * + * Run: + * HARPER_RUN_STRESS_TESTS=1 HARPER_STRESS_SLOW_MINUTES=5 \ + * npm run test:integration -- integrationTests/stress/slowConsumerBackpressure.test.mjs + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { startHarper, teardownHarper, getNextAvailableLoopbackAddress, targz } from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + trySendOperation, + fetchWithRetry, + concurrent, + readLog, + waitForAllConnected, + sampleMetrics, + summariseSamples, + mb, + prerenderId, +} from './stressShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled()) { + suite('Slow-consumer backpressure (skipped)', () => { + test('skipped — set HARPER_RUN_STRESS_TESTS=1 to enable', { skip: true }, () => {}); + }); +} else { + const A_THREADS = 4; + const B_THREADS = 1; + const KEYSPACE = Number(process.env.HARPER_STRESS_SLOW_KEYS ?? 500); + const TOTAL_MINUTES = Number(process.env.HARPER_STRESS_SLOW_MINUTES ?? 5); + const SLOW_RSS_CAP_MB = Number(process.env.HARPER_STRESS_SLOW_RSS_CAP_MB ?? 1500); + const SUITE_TIMEOUT_MS = (TOTAL_MINUTES + 5) * 60_000; + + suite('Slow-consumer backpressure', { timeout: SUITE_TIMEOUT_MS }, (ctx) => { + before(async () => { + const aHost = await getNextAvailableLoopbackAddress(); + const bHost = await getNextAvailableLoopbackAddress(); + const cfgFor = (host, threads) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + threads: { count: threads }, + }); + const nodeA = { name: ctx.name, harper: { hostname: aHost } }; + const nodeB = { name: ctx.name, harper: { hostname: bHost } }; + await startHarper(nodeA, { + config: cfgFor(aHost, A_THREADS), + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + await startHarper(nodeB, { + config: cfgFor(bHost, B_THREADS), + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + ctx.nodes = [nodeA.harper, nodeB.harper]; + + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + await waitForAllConnected(ctx.nodes[1], { timeoutMs: 60_000 }); + + const payload = await targz(join(import.meta.dirname, 'fixture-prerender-workload')); + await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'prerender-workload', + payload, + replicated: true, + restart: true, + }); + await delay(40_000); + await waitForAllConnected(ctx.nodes[1], { timeoutMs: 90_000 }); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + }); + + test('sender does not OOM and cluster reconverges after slow-consumer pressure', async () => { + const [A, B] = ctx.nodes; + + const aSampler = sampleMetrics(A, { intervalMs: 2000 }); + const bpReadings = []; // { t, percent } + + let stopChurn = false; + let writes = 0; + + const bpPoller = (async () => { + while (!stopChurn) { + const status = await trySendOperation(A, { operation: 'cluster_status' }); + if (status?.connections) { + for (const c of status.connections) { + for (const sock of c.database_sockets ?? []) { + const p = sock.backPressurePercent ?? 0; + if (p > 0) { + bpReadings.push({ t: Date.now(), db: sock.database, percent: p }); + } + } + } + } + await delay(5000); + } + })(); + const driver = concurrent(async () => { + if (stopChurn) return; + const id = prerenderId(writes++ % KEYSPACE); + try { + await fetchWithRetry(A.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 1 }); + } catch { + // fine + } + }, 16); + const churnLoop = (async () => { + while (!stopChurn) { + await driver.execute(); + await delay(5); + } + await driver.finish(); + })(); + + const endAt = Date.now() + TOTAL_MINUTES * 60_000; + while (Date.now() < endAt) { + const minsLeft = Math.ceil((endAt - Date.now()) / 60_000); + console.log(`[slow] writes=${writes} bpHits=${bpReadings.length} remainingMins=${minsLeft}`); + await delay(30_000); + } + + stopChurn = true; + await churnLoop; + await bpPoller; + console.log(`[slow] churn stopped; total writes=${writes}; bp readings=${bpReadings.length}`); + + // Convergence wait. + const drainDeadline = Date.now() + 180_000; + let counts = { A: -1, B: -1 }; + let convergedAt = null; + while (Date.now() < drainDeadline) { + const [a, b] = await Promise.all([ + sendOperation(A, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + sendOperation(B, { operation: 'describe_table', table: 'Prerender' }).catch(() => null), + ]); + counts = { A: a?.record_count ?? -1, B: b?.record_count ?? -1 }; + if (counts.A > 0 && counts.A === counts.B) { + convergedAt = Date.now(); + break; + } + console.log(`[slow] catchup: ${JSON.stringify(counts)}`); + await delay(3000); + } + + const aSummary = summariseSamples(aSampler.stop()); + const [logA, logB] = await Promise.all([readLog(A), readLog(B)]); + + // (1) RSS cap. + const peakAMb = aSummary.peakRss / 1024 / 1024; + ok( + peakAMb < SLOW_RSS_CAP_MB, + `A peak RSS ${peakAMb.toFixed(0)} MB exceeded cap ${SLOW_RSS_CAP_MB} MB (samples=${aSummary.sampleCount})` + ); + + // (2) Convergence with bounded drift. + const minCount = Math.min(counts.A, counts.B); + const maxCount = Math.max(counts.A, counts.B); + const drift = maxCount > 0 ? (maxCount - minCount) / maxCount : 1; + ok( + convergedAt !== null || drift < 0.01, + `did not converge within 180s and drift > 1%: A=${counts.A} B=${counts.B} drift=${(drift * 100).toFixed(2)}%` + ); + ok(maxCount > 0, `record_count must be > 0 (saw ${maxCount}); was churn firing?`); + + // (3) No uncaught / OOM / orphan markers. + const uncaughtRe = /\[error\]: uncaughtException/g; + const orphanRe = /\[error\] \[replication\]: Error sending blob.*ENOENT/g; + const oomRe = /JavaScript heap out of memory|FATAL ERROR.*Allocation failed/g; + for (const [name, log] of [ + ['A', logA], + ['B', logB], + ]) { + const u = (log.match(uncaughtRe) ?? []).length; + const o = (log.match(orphanRe) ?? []).length; + const m = (log.match(oomRe) ?? []).length; + ok(u === 0, `${name} logged ${u} uncaughtException`); + ok(o === 0, `${name} logged ${o} blob orphan markers`); + ok(m === 0, `${name} logged ${m} OOM markers`); + } + + // (4) Soft check — did we actually observe backpressure? + if (bpReadings.length === 0) { + console.warn( + `[slow] WARN: no backPressurePercent > 0 observed during ${TOTAL_MINUTES}m of asymmetric churn. ` + + `B may have kept up with A; consider raising concurrency, payload size, or duration.` + ); + } else { + const maxBp = bpReadings.reduce((m, r) => Math.max(m, r.percent), 0); + console.log(`[slow] backpressure observed: ${bpReadings.length} readings, max=${maxBp}%`); + } + + console.log( + `[slow] completed: writes=${writes} peakRss=${mb(aSummary.peakRss)} counts=${JSON.stringify(counts)} ` + + `bpReadings=${bpReadings.length}` + ); + }); + }); +} diff --git a/integrationTests/stress/soakWithRollingRestarts.test.mjs b/integrationTests/stress/soakWithRollingRestarts.test.mjs new file mode 100644 index 000000000..1920b6d90 --- /dev/null +++ b/integrationTests/stress/soakWithRollingRestarts.test.mjs @@ -0,0 +1,384 @@ +/** + * Long-running soak test that recreates the wtk-ap-west-1 failure mode. + * + * Background (production, May 2026): + * - 12-node cluster running a prerender cache table (Norton-style URL+device + * keys, mix of inline-stored small JSON and file-backed large HTML). + * - One node accumulated hours of cache writes, then got unhappy: peer + * catch-up after a restart OOM'd the receive worker → restart → catch-up + * OOM'd again → ~25s crash loop, 284 OOM events in 2 hours. Replication + * reassignments cascaded onto surviving workers, killing them in turn. + * + * What this test does: + * - Brings up a 4-node cluster with `THREADS_COUNT=4`. Four nodes is enough + * for replication to be non-trivial and reassignment to happen on + * multiple peers when one node dies; four worker threads make the + * per-worker reassignment stagger fix (PR #147) observable. + * - Deploys a prerender-style component (mixed small/large blobs, see + * fixture-prerender-workload/) to all nodes. + * - Runs HTTP traffic continuously from a small client pool against all + * four nodes, generating cache-miss writes. + * - Every CYCLE_MINUTES, picks a random node, `killHarper()`s it + * (SIGTERM→SIGKILL fast path), then restarts it. Lets it catch up while + * traffic continues on the other three. + * - Repeats for HARPER_STRESS_SOAK_MINUTES total (default 20 minutes + * locally; 240 in the workflow). + * + * Hard assertions evaluated *after* the soak completes: + * 1. No `ERR_WORKER_OUT_OF_MEMORY` in any node's hdb.log. + * 2. No `uncaughtException` in any node's hdb.log. + * 3. No `Error sending blob` ENOENT lines in any node's hdb.log — these + * would indicate the blob-orphan pattern we observed on qub. (The + * `Blob save failed` receive-side lines that PR #149's test exercises + * should not appear here either, because no fault injection is active.) + * 4. After the final node returns and traffic settles, every node's + * `record_count` on Prerender matches (full convergence). + * 5. Peak per-process RSS on every node stays under 1.5 GB throughout. + * + * Sampled invariants checked every 30s, with snapshots dumped to stdout for + * post-hoc analysis: + * - cluster_status shows all peers connected (transient disconnects during + * a restart cycle are allowed; permanent ones fail the soak). + * - record_count drift across nodes < 1% (eventual consistency). + * + * To run locally: + * HARPER_RUN_STRESS_TESTS=1 HARPER_STRESS_SOAK_MINUTES=10 \ + * npm run test:integration -- integrationTests/stress/soakWithRollingRestarts.test.mjs + * + * To run in CI: trigger the stress-tests.yaml workflow; it sets the env vars. + */ + +import { suite, test, before, after } from 'node:test'; +import { ok } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { + startHarper, + teardownHarper, + killHarper, + getNextAvailableLoopbackAddress, + targz, +} from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + trySendOperation, + fetchWithRetry, + concurrent, + readLog, + clusterSnapshot, + waitForAllConnected, + waitForRecordCount, + sampleMetrics, + summariseSamples, + mb, + prerenderId, +} from './stressShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled()) { + // Don't register the suite at all — keeps the default integration runner + // (`integrationTests/**/*.test.mjs`) from accidentally running 20 minutes + // of soak inside a 15-minute CI shard. + suite('Replication soak with rolling restarts (skipped)', () => { + test('skipped — set HARPER_RUN_STRESS_TESTS=1 to enable', { skip: true }, () => {}); + }); +} else { + const NODE_COUNT = 4; + const THREADS_PER_NODE = 4; + const TOTAL_MINUTES = Number(process.env.HARPER_STRESS_SOAK_MINUTES ?? 20); + const CYCLE_SECONDS = Number(process.env.HARPER_STRESS_CYCLE_SECONDS ?? 90); + const TRAFFIC_RPS = Number(process.env.HARPER_STRESS_RPS ?? 30); + const URL_KEYSPACE = Number(process.env.HARPER_STRESS_URL_KEYSPACE ?? 2000); + const PEAK_RSS_LIMIT_BYTES = 1.5 * 1024 * 1024 * 1024; + const SUITE_TIMEOUT_MS = (TOTAL_MINUTES + 5) * 60_000; + + suite('Replication soak with rolling restarts', { timeout: SUITE_TIMEOUT_MS }, (ctx) => { + before(async () => { + ctx.nodes = await Promise.all( + Array.from({ length: NODE_COUNT }).map(async () => { + const node = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + await startHarper(node, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'warn' }, + replication: { securePort: node.harper.hostname + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + return node.harper; + }) + ); + + // Mesh: every node adds every other node so we get full replication + // across all peers, matching the production fully-connected topology. + const tokens = await Promise.all( + ctx.nodes.map((n) => sendOperation(n, { operation: 'create_authentication_tokens', authorization: n.admin })) + ); + for (let i = 0; i < NODE_COUNT; i++) { + for (let j = 0; j < NODE_COUNT; j++) { + if (i === j) continue; + await sendOperation(ctx.nodes[i], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[j].hostname, + authorization: 'Bearer ' + tokens[j].operation_token, + }); + } + } + // Wait for every node to see every other peer connected. + for (const n of ctx.nodes) await waitForAllConnected(n, { timeoutMs: 90_000 }); + + // Deploy the prerender workload component to one node; replicated:true + // installs it on all peers, then restart=true makes it active everywhere. + const payload = await targz(join(import.meta.dirname, 'fixture-prerender-workload')); + await sendOperation(ctx.nodes[0], { + operation: 'deploy_component', + project: 'prerender-workload', + payload, + replicated: true, + restart: true, + }); + // Restart settle. + await delay(40_000); + // Make sure cluster is healthy again post-restart. + for (const n of ctx.nodes) await waitForAllConnected(n, { timeoutMs: 90_000 }); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + }); + + test('cluster survives sustained traffic + rolling SIGKILLs without OOM, leaks, or blob orphans', async () => { + const startedAt = Date.now(); + const endAt = startedAt + TOTAL_MINUTES * 60_000; + + // Per-node metric samplers run for the whole soak. We sample every + // 2 seconds — fine-grained enough to catch a memory burst preceding an + // OOM but not so fine-grained that operations API noise dominates. + const samplers = ctx.nodes.map((n) => sampleMetrics(n, { intervalMs: 2000 })); + + // Traffic driver — `concurrent` keeps a pool of in-flight requests. + // Cache-miss responses each create a record + blob on the receiving + // node which then replicates to the other three. + let nextSeq = 0; + let trafficStopped = false; + const concurrencyTarget = Math.max(1, Math.floor(TRAFFIC_RPS / 4)); + const drivers = ctx.nodes.map((n) => { + const driver = concurrent(async () => { + if (trafficStopped) return; + const idSeq = nextSeq++ % URL_KEYSPACE; + const id = prerenderId(idSeq); + try { + await fetchWithRetry(n.httpURL + '/Prerender/' + encodeURIComponent(id), { retries: 1 }); + } catch { + // Transient HTTP errors are expected when we kill a node; + // the soak's assertions live in the post-run log scan. + } + }, concurrencyTarget); + return driver; + }); + // Fire-and-forget feed loop per driver — pace ourselves with a small + // inter-request delay so we don't peg the loopback before Harper. + const feeders = drivers.map(async (driver) => { + while (!trafficStopped) { + await driver.execute(); + await delay(1000 / Math.max(1, TRAFFIC_RPS / NODE_COUNT)); + } + await driver.finish(); + }); + + // Rolling-restart loop. + const cycleEvents = []; + let cycle = 0; + while (Date.now() < endAt) { + cycle++; + const remainingMs = endAt - Date.now(); + const cycleDurationMs = Math.min(CYCLE_SECONDS * 1000, remainingMs); + if (cycleDurationMs < 30_000) break; // not enough time for a full cycle, wind down + const settleMs = Math.floor(cycleDurationMs * 0.55); + const recoverMs = cycleDurationMs - settleMs; + + console.log( + `[soak] cycle ${cycle}/${Math.ceil((TOTAL_MINUTES * 60) / CYCLE_SECONDS)} ` + + `t=${Math.round((Date.now() - startedAt) / 1000)}s ` + + `settling ${Math.round(settleMs / 1000)}s before kill` + ); + await delay(settleMs); + + // Pick the next victim round-robin so each node gets cycled. + const victimIdx = (cycle - 1) % NODE_COUNT; + const victim = ctx.nodes[victimIdx]; + const before = await clusterSnapshot(victim).catch(() => null); + console.log(`[soak] killing node ${victimIdx} (${victim.hostname})`); + const killAt = Date.now(); + await killHarper({ harper: victim }); + + // Restart — preserves dataRootDir and hostname so it rejoins the + // existing cluster state. Update ctx.nodes[victimIdx] with the new + // handle so subsequent same-node cycles (when round-robin wraps) + // kill the actually-running process, not a stale reference. + const restartCtx = { + name: ctx.name, + harper: { dataRootDir: victim.dataRootDir, hostname: victim.hostname }, + }; + await startHarper(restartCtx, { + config: { + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'warn' }, + replication: { securePort: victim.hostname + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }, + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + ctx.nodes[victimIdx] = restartCtx.harper; + const restartedAt = Date.now(); + cycleEvents.push({ cycle, victimIdx, downMs: restartedAt - killAt, beforeSnap: before }); + + // Wait the rest of the cycle for recovery + traffic. + await delay(Math.max(0, recoverMs - (Date.now() - restartedAt))); + + // Light health probe: every cycle, at least one peer should see + // the resurrected victim as connected within 30s of restart. + const sawConnected = await Promise.any( + ctx.nodes + .filter((_, idx) => idx !== victimIdx) + .map(async (peer) => { + const deadline = Date.now() + 30_000; + while (Date.now() < deadline) { + const snap = await clusterSnapshot(peer).catch(() => null); + if ( + snap?.peers.some( + (p) => (p.url ?? '').includes(victim.hostname) && Object.values(p.dbs).every((d) => d.connected) + ) + ) + return true; + await delay(500); + } + return false; + }) + ).catch(() => false); + if (!sawConnected) { + console.warn(`[soak] cycle ${cycle}: no peer saw victim reconnect within 30s (continuing)`); + } + } + + // Stop traffic; let in-flight requests drain. Bound the drain so a + // stuck fetch on a still-recovering node can't hold the test. + console.log('[soak] stopping traffic'); + trafficStopped = true; + await Promise.race([ + Promise.all(feeders), + delay(20_000).then(() => console.log('[soak] feeder drain timed out at 20s — continuing')), + ]); + + console.log('[soak] stopping sampling'); + const allSamples = samplers.map((s) => s.stop()); + + // Allow steady-state convergence: stop writes, give replication time + // to drain on all nodes, then compare record counts. + console.log('[soak] convergence wait 15s'); + await delay(15_000); + + // Reference count = max observed across all nodes (whichever node has + // the highest count is the leader to catch up to). + const counts = await Promise.all( + ctx.nodes.map((n) => + trySendOperation(n, { operation: 'describe_table', table: 'Prerender' }).then((r) => r?.record_count ?? -1) + ) + ); + const target = Math.max(...counts); + console.log(`[soak] post-settle record_count per node: ${counts.join(', ')} — target ${target}`); + + // Allow up to 90s extra convergence after stopping writes — laggards + // may still be replaying buffered events. + for (let i = 0; i < ctx.nodes.length; i++) { + if (counts[i] < target) { + try { + const reached = await waitForRecordCount(ctx.nodes[i], 'Prerender', target, { timeoutMs: 90_000 }); + console.log(`[soak] node ${i} caught up to ${reached}`); + } catch (err) { + console.log(`[soak] node ${i} did not catch up: ${err.message}`); + } + } + } + const finalCounts = await Promise.all( + ctx.nodes.map((n) => + trySendOperation(n, { operation: 'describe_table', table: 'Prerender' }).then((r) => r?.record_count ?? -1) + ) + ); + console.log(`[soak] final record_count per node: ${finalCounts.join(', ')}`); + + // === Assertions === + + // (1) No OOM markers in any node's log. + for (let i = 0; i < ctx.nodes.length; i++) { + const log = await readLog(ctx.nodes[i]); + ok( + !log.includes('ERR_WORKER_OUT_OF_MEMORY'), + `node ${i} (${ctx.nodes[i].hostname}) hit ERR_WORKER_OUT_OF_MEMORY during soak` + ); + } + + // (2) No uncaughtException anywhere. + for (let i = 0; i < ctx.nodes.length; i++) { + const log = await readLog(ctx.nodes[i]); + const uncaught = log.match(/\[error\]: uncaughtException/g) ?? []; + ok( + uncaught.length === 0, + `node ${i} logged ${uncaught.length} uncaughtException(s); first match: ${uncaught[0]}` + ); + } + + // (3) No "Error sending blob ... ENOENT" — the blob-orphan signature. + for (let i = 0; i < ctx.nodes.length; i++) { + const log = await readLog(ctx.nodes[i]); + const orphans = log.match(/\[error\] \[replication\]: Error sending blob.*ENOENT/g) ?? []; + ok( + orphans.length === 0, + `node ${i} logged ${orphans.length} blob-orphan ENOENT(s) during soak; sample:\n ${orphans[0]}` + ); + } + + // (4) Convergence: every node has the same Prerender record_count. + // Allow up to 1% drift to absorb in-flight writes from the moment + // traffic was stopped — the goal is "no permanent divergence", + // not "exact at the instant we sampled". + const minCount = Math.min(...finalCounts); + const maxCount = Math.max(...finalCounts); + const drift = maxCount > 0 ? (maxCount - minCount) / maxCount : 0; + ok( + drift < 0.01, + `record counts diverged > 1% across nodes: ${JSON.stringify(finalCounts)} (drift ${(drift * 100).toFixed(2)}%)` + ); + + // (5) Per-node peak RSS under the limit. + for (let i = 0; i < ctx.nodes.length; i++) { + const summary = summariseSamples(allSamples[i]); + console.log( + `[soak] node ${i}: peakRss=${mb(summary.peakRss)} avgRss=${mb(summary.avgRss)} ` + + `peakWorkerFootprint=${mb(summary.peakThreadFootprint)} samples=${summary.sampleCount}` + ); + ok( + summary.peakRss > 0 && summary.peakRss < PEAK_RSS_LIMIT_BYTES, + `node ${i} peak RSS ${mb(summary.peakRss)} exceeded limit ${mb(PEAK_RSS_LIMIT_BYTES)}` + ); + } + + console.log( + `[soak] completed ${cycle} kill cycles over ${Math.round((Date.now() - startedAt) / 1000)}s; ` + + `final convergent record_count=${finalCounts[0]}` + ); + }); + }); +} diff --git a/integrationTests/stress/stressShared.mjs b/integrationTests/stress/stressShared.mjs new file mode 100644 index 000000000..73ebab72d --- /dev/null +++ b/integrationTests/stress/stressShared.mjs @@ -0,0 +1,251 @@ +/** + * Shared helpers for long-running stress tests in integrationTests/stress/. + * + * These tests are *opt-in* via `HARPER_RUN_STRESS_TESTS=1`. Each test file + * checks this flag and refuses to register a suite when it's missing, so a + * normal `npm run test:integration` doesn't accidentally fire a 30-minute + * soak. Set the flag to actually run them. + */ + +import { equal } from 'node:assert'; +import { readFile } from 'node:fs/promises'; +import { join } from 'node:path'; +import { setTimeout as delay } from 'node:timers/promises'; + +export function stressEnabled() { + return process.env.HARPER_RUN_STRESS_TESTS === '1'; +} + +/** + * Send an operations-API request and assert HTTP 200. + * Mirrors clusterShared.sendOperation; duplicated here to keep stress tests + * independent of the cluster test surface. + */ +export async function sendOperation(node, operation) { + const response = await fetch(node.operationsAPIURL, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(operation), + }); + const data = await response.json(); + equal(response.status, 200, JSON.stringify(data)); + return data; +} + +/** + * Like sendOperation but returns `null` on any failure (used during restart + * windows when the operations API is briefly unreachable). + */ +export async function trySendOperation(node, operation) { + try { + const response = await fetch(node.operationsAPIURL, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(operation), + }); + if (!response.ok) return null; + return await response.json(); + } catch { + return null; + } +} + +/** + * Like the cluster-test fetchWithRetry but with a default per-attempt timeout + * so a stalled connection (e.g. to a node mid-kill) can't hang the test. + */ +export function fetchWithRetry(url, options) { + let retries = options?.retries ?? 20; + const perAttemptTimeoutMs = options?.timeoutMs ?? 5000; + const fetchOpts = { ...options, signal: AbortSignal.timeout(perAttemptTimeoutMs) }; + delete fetchOpts.retries; + delete fetchOpts.timeoutMs; + let response = fetch(url, fetchOpts); + if (retries > 0) { + response = response.catch(() => { + const nextOpts = { ...options, retries: retries - 1 }; + return delay(500).then(() => fetchWithRetry(url, nextOpts)); + }); + } + return response; +} + +export function concurrent(task, concurrency = 20) { + const tasks = Array.from({ length: concurrency }); + let i = 0; + return { + async execute() { + i = (i + 1) % concurrency; + await tasks[i]; + tasks[i] = task(); + }, + finish() { + return Promise.all(tasks); + }, + }; +} + +/** + * Read a node's `hdb.log`. Checks `ctx.harper.logDir` first (set when + * `HARPER_INTEGRATION_TEST_LOG_DIR` is in the env), falls back to + * `{dataRootDir}/log/hdb.log`. Returns '' if neither exists. + */ +export async function readLog(node) { + const candidates = []; + if (node.logDir) candidates.push(join(node.logDir, 'hdb.log')); + if (node.dataRootDir) candidates.push(join(node.dataRootDir, 'log', 'hdb.log')); + for (const path of candidates) { + try { + return await readFile(path, 'utf8'); + } catch (err) { + if (err.code !== 'ENOENT') throw err; + } + } + return ''; +} + +/** + * Capture a structured cluster_status snapshot — uniform shape so callers + * can diff before/after windows without re-parsing nested objects. + */ +export async function clusterSnapshot(node) { + const status = await sendOperation(node, { operation: 'cluster_status' }); + const peers = []; + for (const conn of status.connections ?? []) { + const peer = { + url: conn.url, + name: conn.name, + subscriptions: conn.subscriptions, + dbs: {}, + }; + for (const s of conn.database_sockets ?? []) { + peer.dbs[s.database] = { + connected: s.connected, + lastReceivedVersion: s.lastReceivedVersion ?? null, + lastCommitConfirmed: s.lastCommitConfirmed ?? null, + backPressurePercent: s.backPressurePercent ?? 0, + }; + } + peers.push(peer); + } + return { node_name: status.node_name, peers }; +} + +/** + * Wait until every database socket on `node`'s `cluster_status` reports + * connected. Returns the final snapshot or throws on timeout. + */ +export async function waitForAllConnected(node, opts = {}) { + const deadline = Date.now() + (opts.timeoutMs ?? 60000); + let last; + while (Date.now() < deadline) { + last = await clusterSnapshot(node).catch(() => null); + if (last && last.peers.length > 0 && last.peers.every((p) => Object.values(p.dbs).every((d) => d.connected))) { + return last; + } + await delay(500); + } + throw new Error(`waitForAllConnected timed out; final snapshot: ${JSON.stringify(last)}`); +} + +/** + * Poll record counts on a single table until `node` matches `referenceCount`. + * Returns the final count if it caught up, or throws on timeout. + */ +export async function waitForRecordCount(node, table, referenceCount, opts = {}) { + const deadline = Date.now() + (opts.timeoutMs ?? 120000); + let last = -1; + while (Date.now() < deadline) { + const resp = await trySendOperation(node, { operation: 'describe_table', table }); + if (resp?.record_count !== undefined) { + last = resp.record_count; + if (last >= referenceCount) return last; + } + await delay(opts.pollMs ?? 500); + } + throw new Error(`waitForRecordCount(${table}) timed out at ${last}, want ${referenceCount}`); +} + +/** + * Sample structured metrics from `system_information` at fixed intervals + * and return all samples on stop. Captures memory + thread heap stats and + * the *unique-PID set* per thread role, which is how we detect worker + * restarts (a new pid in the same role means the previous worker died). + */ +export function sampleMetrics(node, opts = {}) { + const interval = opts.intervalMs ?? 1000; + const samples = []; + let stopped = false; + let timer; + const tick = async () => { + if (stopped) return; + const info = await trySendOperation(node, { + operation: 'system_information', + attributes: ['memory', 'threads', 'metrics'], + }); + if (info) { + samples.push({ + t: Date.now(), + rss: info.memory?.rss ?? 0, + threads: (info.threads ?? []).map((th) => ({ + threadId: th.threadId ?? 0, + name: th.name ?? '', + heapUsed: th.heapUsed ?? 0, + externalMemory: th.externalMemory ?? 0, + arrayBuffers: th.arrayBuffers ?? 0, + })), + }); + } + timer = setTimeout(tick, interval); + }; + timer = setTimeout(tick, interval); + return { + samples, + stop() { + stopped = true; + clearTimeout(timer); + return samples; + }, + }; +} + +/** + * Summarise a samples array (from sampleMetrics) into peak/avg figures. + */ +export function summariseSamples(samples) { + if (samples.length === 0) return { peakRss: 0, avgRss: 0, peakThreadFootprint: 0, sampleCount: 0 }; + let peakRss = 0; + let sumRss = 0; + let peakThreadFootprint = 0; + for (const s of samples) { + if (s.rss > peakRss) peakRss = s.rss; + sumRss += s.rss; + for (const t of s.threads) { + const f = (t.heapUsed || 0) + (t.externalMemory || 0) + (t.arrayBuffers || 0); + if (f > peakThreadFootprint) peakThreadFootprint = f; + } + } + return { + peakRss, + avgRss: Math.floor(sumRss / samples.length), + peakThreadFootprint, + sampleCount: samples.length, + }; +} + +const MB = 1024 * 1024; +export function mb(bytes) { + return (bytes / MB).toFixed(0) + ' MB'; +} + +/** + * Generate a deterministic-but-varied prerender-style record id like + * "https://example.com/path/|". Mimics the Norton URL+device + * tuple pattern from the wtk prerender table without depending on a real + * URL list. + */ +const DEVICES = ['mobile', 'desktop', 'tablet']; +export function prerenderId(seq) { + const dev = DEVICES[seq % DEVICES.length]; + return `https://example.com/path/${seq}|${dev}`; +} diff --git a/integrationTests/stress/workerExitCascade.test.mjs b/integrationTests/stress/workerExitCascade.test.mjs new file mode 100644 index 000000000..78358a6bd --- /dev/null +++ b/integrationTests/stress/workerExitCascade.test.mjs @@ -0,0 +1,261 @@ +/** + * Worker-exit cascade regression test for PR #147's third fix + * (`WORKER_EXIT_REASSIGN_STAGGER_MS`). + * + * Background: when a worker died, harper-pro's subscriptionManager + * reassigned every `onDatabase` subscription that worker held to other + * workers — all in the same event-loop tick. With the receive backpressure + * fix in place, the *immediate* OOM hazard was contained, but a fresh + * worker still got slammed with several catch-up connections at once. The + * stagger fix spaces these reassignments by 100ms via a rolling + * `nextWorkerExitReassignAt` timestamp. + * + * The existing receiveBacklogMemory test runs with `THREADS_COUNT=1`; only + * one worker, no reassignment possible. This test deliberately runs with + * 4 worker threads, kills exactly one mid-load, and then inspects the log + * for the spacing of the post-death "Setting up subscription with leader" + * lines. + * + * Setup: + * - Node A: leader, continuously writing + * - Node B: target — 4 worker threads, has the SuicideWorker component + * installed so the test driver can kill a worker via HTTP + * + * Sequence: + * 1. Start both nodes, connect, deploy components. + * 2. Begin a sustained write workload on A targeting multiple databases + * (so each one becomes a separate subscription on B → multiple + * reassignments on worker death). + * 3. After the workload steadies, hit /SuicideWorker on B. The worker + * receiving the request exits with code 137. + * 4. Watch B's `system_information.threads` over the next 30s and B's + * log for "Setting up subscription with leader" lines. + * + * Assertions: + * - Exactly one new worker PID appears in B's thread list after the + * kill (no cascade — only the worker we explicitly killed died). + * - Subscription-reassignment log lines in the post-kill window are + * spaced at intervals ≥ 80 ms (the configured stagger is 100 ms; + * allow a 20 ms slop for log-timestamp resolution). + * - B continues committing replication after the kill (its + * `lastReceivedVersion` for A's connection still advances). + * + * To run locally: + * HARPER_RUN_STRESS_TESTS=1 \ + * npm run test:integration -- integrationTests/stress/workerExitCascade.test.mjs + */ + +import { suite, test, before, after } from 'node:test'; +import { ok, equal } from 'node:assert'; +import { setTimeout as delay } from 'node:timers/promises'; +import { join } from 'node:path'; +import { + startHarper, + teardownHarper, + getNextAvailableLoopbackAddress, + setupHarperWithFixture, +} from '@harperfast/integration-testing'; +import { + stressEnabled, + sendOperation, + fetchWithRetry, + readLog, + clusterSnapshot, + waitForAllConnected, +} from './stressShared.mjs'; + +process.env.HARPER_INTEGRATION_TEST_INSTALL_SCRIPT = join( + import.meta.dirname ?? module.path, + '..', + '..', + 'dist', + 'bin', + 'harper.js' +); + +if (!stressEnabled()) { + suite('Worker exit cascade (skipped)', () => { + test('skipped — set HARPER_RUN_STRESS_TESTS=1 to enable', { skip: true }, () => {}); + }); +} else { + const THREADS_PER_NODE = 4; + const DB_COUNT = 6; // 6 dbs × 1 table each → 6 subscriptions to reassign on worker death + + suite('Worker exit reassignment is staggered, not stampede', { timeout: 240_000 }, (ctx) => { + before(async () => { + const nodeA = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + const nodeB = { name: ctx.name, harper: { hostname: await getNextAvailableLoopbackAddress() } }; + const cfg = (host) => ({ + analytics: { aggregatePeriod: -1 }, + logging: { colors: false, console: true, level: 'debug' }, + replication: { securePort: host + ':9933' }, + threads: { count: THREADS_PER_NODE }, + }); + await startHarper(nodeA, { config: cfg(nodeA.harper.hostname), env: { HARPER_NO_FLUSH_ON_EXIT: true } }); + // B is initialized via setupHarperWithFixture so the SuicideWorker + // component lives at {dataRoot}/components/fixture-suicide-worker + // before Harper boots and scans the components dir. + await setupHarperWithFixture(nodeB, join(import.meta.dirname, 'fixture-suicide-worker'), { + config: cfg(nodeB.harper.hostname), + env: { HARPER_NO_FLUSH_ON_EXIT: true }, + }); + ctx.nodes = [nodeA.harper, nodeB.harper]; + + // Mesh: B adds A. + const tokenResp = await sendOperation(ctx.nodes[0], { + operation: 'create_authentication_tokens', + authorization: ctx.nodes[0].admin, + }); + await sendOperation(ctx.nodes[1], { + operation: 'add_node', + rejectUnauthorized: false, + hostname: ctx.nodes[0].hostname, + authorization: 'Bearer ' + tokenResp.operation_token, + }); + + // Create several databases so multiple subscriptions form on B (each + // db is a separate WS in replicationConnection). This makes the + // stagger observable — with one db there's nothing to space out. + for (let i = 0; i < DB_COUNT; i++) { + for (const n of ctx.nodes) { + await sendOperation(n, { + operation: 'create_table', + database: `stress_db${i}`, + table: 'load', + primary_key: 'id', + attributes: [ + { name: 'id', type: 'String' }, + { name: 'payload', type: 'String' }, + ], + }); + } + } + await waitForAllConnected(ctx.nodes[1], { timeoutMs: 60_000 }); + }); + + after(async () => { + if (!ctx.nodes) return; + await Promise.all(ctx.nodes.map((n) => teardownHarper({ harper: n }).catch(() => null))); + }); + + test('killing a worker mid-load reassigns subscriptions with ≥80ms stagger and no cascade', async () => { + const [A, B] = ctx.nodes; + + // Begin a sustained write workload — small upserts across all dbs so + // replication is actually flowing when we kill the worker. + let stopWriting = false; + const writers = []; + for (let dbI = 0; dbI < DB_COUNT; dbI++) { + writers.push( + (async () => { + let n = 0; + while (!stopWriting) { + try { + await sendOperation(A, { + operation: 'upsert', + database: `stress_db${dbI}`, + table: 'load', + records: [{ id: `r${n++}`, payload: 'x'.repeat(64) }], + }); + } catch { + // Transient hiccups OK during reassignment. + } + await delay(25); + } + })() + ); + } + + // Let the cluster reach a steady catch-up state before we perturb it. + await delay(8000); + + // Snapshot B's threads BEFORE the kill so we can detect new PIDs. + const beforeInfo = await sendOperation(B, { + operation: 'system_information', + attributes: ['threads'], + }); + const beforePIDs = new Set((beforeInfo.threads ?? []).map((t) => `${t.name}:${t.threadId}`)); + console.log(`[cascade] before-kill threads: ${[...beforePIDs].join(', ')}`); + + // Fire the SuicideWorker endpoint on B. Note: not retrying — we want + // exactly one worker to die. + const killAt = Date.now(); + const resp = await fetchWithRetry(B.httpURL + '/SuicideWorker', { retries: 3 }); + const suicide = await resp.json().catch(() => null); + console.log(`[cascade] suicide response: ${JSON.stringify(suicide)}`); + ok(suicide?.threadId, 'SuicideWorker endpoint did not return a threadId'); + + // Watch for 30 seconds. + await delay(30_000); + + // Stop writing and let things settle so we don't race the assertion. + stopWriting = true; + await Promise.all(writers); + + // Inspect B's threads now. + const afterInfo = await sendOperation(B, { + operation: 'system_information', + attributes: ['threads'], + }); + const afterPIDs = new Set((afterInfo.threads ?? []).map((t) => `${t.name}:${t.threadId}`)); + console.log(`[cascade] after-kill threads: ${[...afterPIDs].join(', ')}`); + + // Look at B's log for reassignment markers in the post-kill window. + const log = await readLog(B); + const killISO = new Date(killAt).toISOString(); + // Match lines like: + // 2026-05-19T05:00:00.123Z [main/0] [warn]: Setting up subscription with leader hostX + const reassignRe = /^(\d{4}-\d{2}-\d{2}T[\d:.]+Z).*Setting up subscription with leader/gm; + const reassignTimestamps = []; + for (const m of log.matchAll(reassignRe)) { + if (m[1] >= killISO) reassignTimestamps.push(new Date(m[1]).getTime()); + } + reassignTimestamps.sort((a, b) => a - b); + console.log( + `[cascade] reassignment timestamps after kill (count=${reassignTimestamps.length}):`, + reassignTimestamps.map((t) => t - killAt).join('ms, ') + 'ms' + ); + + // === Assertions === + + // (1) At least one reassignment happened — sanity, otherwise the test + // proves nothing. + ok( + reassignTimestamps.length >= 1, + `expected at least one reassignment log line after kill; got ${reassignTimestamps.length}` + ); + + // (2) Pairwise gaps between consecutive reassignments should be + // ≥80 ms. The configured stagger is 100 ms; we allow slack for + // timestamp truncation and event-loop scheduling. + if (reassignTimestamps.length >= 2) { + const gaps = reassignTimestamps.slice(1).map((t, i) => t - reassignTimestamps[i]); + const tooSmall = gaps.filter((g) => g < 80); + ok( + tooSmall.length <= 1, // at most one near-zero gap (the bookkeeping for the very first reassignment after death) + `expected reassignments to be staggered ≥80ms apart; got gaps ${gaps.join('ms, ')}ms` + ); + } + + // (3) Only one *new* worker PID should appear post-kill. If multiple + // workers died from the cascade, we'd see two or more new PIDs. + const newPIDs = [...afterPIDs].filter((p) => !beforePIDs.has(p)); + const goneePIDs = [...beforePIDs].filter((p) => !afterPIDs.has(p)); + console.log(`[cascade] new PIDs after kill:`, newPIDs); + console.log(`[cascade] gone PIDs after kill:`, goneePIDs); + ok(newPIDs.length <= 1, `expected ≤1 new worker after kill, saw ${newPIDs.length}: ${newPIDs.join(',')}`); + ok(goneePIDs.length <= 1, `expected ≤1 worker to have died, saw ${goneePIDs.length}: ${goneePIDs.join(',')}`); + + // (4) Replication still advancing on B. + const finalSnap = await clusterSnapshot(B); + const allStillConnected = finalSnap.peers.every((p) => Object.values(p.dbs).every((d) => d.connected)); + ok(allStillConnected, `B not fully reconnected post-kill: ${JSON.stringify(finalSnap)}`); + + // (5) Crisp invariant: NO ERR_WORKER_OUT_OF_MEMORY or uncaughtException + // in B's log — even one means the cascade fix regressed. + ok(!log.includes('ERR_WORKER_OUT_OF_MEMORY'), 'B logged ERR_WORKER_OUT_OF_MEMORY'); + const uncaught = log.match(/\[error\]: uncaughtException/g) ?? []; + equal(uncaught.length, 0, `B logged ${uncaught.length} uncaughtException(s)`); + }); + }); +}