diff --git a/components/Application.ts b/components/Application.ts index a6d044aaa..681e8abdb 100644 --- a/components/Application.ts +++ b/components/Application.ts @@ -21,6 +21,7 @@ import { spawn } from 'node:child_process'; import { createReadStream, existsSync, readdirSync } from 'node:fs'; import { Readable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; +import { StringDecoder } from 'node:string_decoder'; import { extract } from 'tar-fs'; import gunzip from 'gunzip-maybe'; @@ -306,12 +307,16 @@ export async function installApplication(application: Application) { // If custom install command is specified, run it if (application.install?.command) { const [command, ...args] = application.install.command.split(' '); + const customOnLine = application.onInstallLine + ? (stream: 'stdout' | 'stderr', line: string) => application.onInstallLine!(command, stream, line) + : undefined; const { stdout, stderr, code } = await nonInteractiveSpawn( application.name, command, args, application.dirPath, - application.install?.timeout + application.install?.timeout, + customOnLine ); // if it succeeds, return if (code === 0) { @@ -362,12 +367,16 @@ export async function installApplication(application: Application) { // Would result in `pnpm@7` being used as the executable. // Important note: an `npm` version should not be specifiable; the only valid npm version is the one installed alongside Node.js + const pmOnLine = application.onInstallLine + ? (stream: 'stdout' | 'stderr', line: string) => application.onInstallLine!(packageManager.name, stream, line) + : undefined; const { stdout, stderr, code } = await nonInteractiveSpawn( application.name, (application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + packageManager.name, application.install?.allowInstallScripts ? ['install'] : ['install', '--ignore-scripts'], // All of `npm`, `yarn`, and `pnpm` support the `install` command. If we need to configure options here we may have to use some other defaults though application.dirPath, - application.install?.timeout + application.install?.timeout, + pmOnLine ); // if it succeeds, return @@ -414,12 +423,16 @@ export async function installApplication(application: Application) { const npmInstallArgs = application.install?.allowInstallScripts ? ['install', '--force'] : ['install', '--force', '--ignore-scripts']; + const npmOnLine = application.onInstallLine + ? (stream: 'stdout' | 'stderr', line: string) => application.onInstallLine!('npm', stream, line) + : undefined; const { stdout, stderr, code } = await nonInteractiveSpawn( application.name, (application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + 'npm', npmInstallArgs, application.dirPath, - application.install?.timeout + application.install?.timeout, + npmOnLine ); // if it succeeds, return @@ -440,11 +453,21 @@ export async function installApplication(application: Application) { throw new Error(`Failed to install dependencies for ${application.name} using npm default. Exit code: ${code}`); } +/** + * Slice B2: callback invoked once per complete line of install stdout/stderr from + * `nonInteractiveSpawn`. Threaded through `installApplication` to the underlying spawn + * so a deploy can stream `npm install` output back to the caller as an SSE `install` + * event in real time, rather than waiting for the process to exit. Line-buffered so a + * chunk that splits mid-line never fires a partial line. + */ +export type OnInstallLine = (manager: string, stream: 'stdout' | 'stderr', line: string) => void; + interface ApplicationOptions { name: string; payload?: Buffer | string | Readable; packageIdentifier?: string; install?: { command?: string; timeout?: number; allowInstallScripts?: boolean }; + onInstallLine?: OnInstallLine; } export class Application { @@ -452,15 +475,17 @@ export class Application { payload?: Buffer | string | Readable; packageIdentifier?: string; install?: { command?: string; timeout?: number; allowInstallScripts?: boolean }; + onInstallLine?: OnInstallLine; dirPath: string; logger: Logger; packageManagerPrefix: string; // can be used to configure a package manager prefix, specifically "sfw". - constructor({ name, payload, packageIdentifier, install }: ApplicationOptions) { + constructor({ name, payload, packageIdentifier, install, onInstallLine }: ApplicationOptions) { this.name = name; this.payload = payload; this.packageIdentifier = packageIdentifier && derivePackageIdentifier(packageIdentifier); this.install = install; + this.onInstallLine = onInstallLine; const componentsRoot = getConfigPath(CONFIG_PARAMS.COMPONENTSROOT); if (!componentsRoot) throw new Error('componentsRoot is not configured'); this.dirPath = join(componentsRoot, name); @@ -612,12 +637,53 @@ function getGitSSHCommand() { * @param timeoutMs The timeout for the command in milliseconds. Defaults to 5 minutes. * @returns A promise that resolves when the command completes. */ +/** + * Slice B2: line-buffered split that emits complete `\n`-terminated lines as they + * arrive, holding any partial trailing fragment until the next chunk or `flush()`. + * Required because `child_process` stdout/stderr `'data'` events fire per OS-level + * chunk, with no guarantee a chunk ends on a newline — without buffering, a long + * `npm install` line could be reported to the caller as two halves. + * + * Uses StringDecoder so a multi-byte UTF-8 character (e.g. the ✔ emoji npm prints + * for resolved packages) split across two chunks is reassembled into a single code + * point rather than each half being decoded as replacement characters. + */ +function createLineSplitter(onLine: (line: string) => void): { + push: (chunk: Buffer | string) => void; + flush: () => void; +} { + const decoder = new StringDecoder('utf8'); + let pending = ''; + return { + push(chunk) { + pending += typeof chunk === 'string' ? chunk : decoder.write(chunk); + let nl: number; + while ((nl = pending.indexOf('\n')) !== -1) { + const line = pending.slice(0, nl).replace(/\r$/, ''); + pending = pending.slice(nl + 1); + onLine(line); + } + }, + flush() { + // Drain any bytes the decoder is still holding (e.g. a multi-byte char that + // straddled the final chunk boundary). + const remaining = decoder.end(); + if (remaining) pending += remaining; + if (pending.length > 0) { + onLine(pending); + pending = ''; + } + }, + }; +} + export function nonInteractiveSpawn( applicationName: string, command: string, args: string[], cwd: string, - timeoutMs: number = 60 * 60 * 1000 + timeoutMs: number = 60 * 60 * 1000, + onLine?: (stream: 'stdout' | 'stderr', line: string) => void ): Promise<{ stdout: string; stderr: string; code: number }> { return new Promise((resolve, reject) => { logger @@ -647,24 +713,31 @@ export function nonInteractiveSpawn( reject(new Error(`Command\`${command} ${args.join(' ')}\` timed out after ${timeoutMs}ms`)); }, timeoutMs); + // Slice B2: if a caller passed onLine, line-buffer stdout/stderr alongside the + // existing string accumulation so we never report a half-line. + const stdoutSplitter = onLine ? createLineSplitter((line) => onLine('stdout', line)) : null; + const stderrSplitter = onLine ? createLineSplitter((line) => onLine('stderr', line)) : null; + let stdout = ''; childProcess.stdout.on('data', (chunk) => { // buffer stdout for later resolve stdout += chunk.toString(); // log stdout lines immediately - // TODO: Technically nothing guarantees that a chunk will be a complete line so need to implement - // something here to buffer until a newline character, then log the complete line logger.loggerWithTag(`${applicationName}:spawn:${command}:stdout`).debug?.(chunk.toString()); + stdoutSplitter?.push(chunk); }); // buffer stderr let stderr = ''; childProcess.stderr.on('data', (chunk) => { stderr += chunk.toString(); + stderrSplitter?.push(chunk); }); childProcess.on('error', (error) => { clearTimeout(timeout); + stdoutSplitter?.flush(); + stderrSplitter?.flush(); // Print out stderr before rejecting if (stderr) { printStd(applicationName, command, stderr, 'stderr'); @@ -674,6 +747,10 @@ export function nonInteractiveSpawn( childProcess.on('close', (code) => { clearTimeout(timeout); + // Flush any trailing partial lines so the caller sees process output that didn't + // end on a newline (some package managers do this on their final progress line). + stdoutSplitter?.flush(); + stderrSplitter?.flush(); if (stderr) { printStd(applicationName, command, stderr, 'stderr'); } diff --git a/components/deploymentRecorder.ts b/components/deploymentRecorder.ts index 9d5b78bfa..efa160394 100644 --- a/components/deploymentRecorder.ts +++ b/components/deploymentRecorder.ts @@ -72,6 +72,11 @@ export class DeploymentRecorder { private unsubscribe: (() => void) | null = null; private pendingPut: Promise | null = null; private dirty = false; + // Slice B2: peer outcomes are stashed here by recordPeers and applied inside finish() + // so the terminal put always carries them, avoiding a race with concurrent + // emitter-triggered puts that might otherwise overwrite peer_results with their + // pre-mutation snapshot of the record. + private pendingPeerResults: unknown[] | null = null; private constructor(deploymentId: string, initial: Record) { this.deploymentId = deploymentId; @@ -225,6 +230,28 @@ export class DeploymentRecorder { await this.put(); } + /** + * Slice B2: write per-peer results back to the origin row after `replicateOperation` + * returns. The replication layer returns an opaque array of per-peer outcomes; we + * normalize them here to `{node, status, error?, started_at, completed_at}` and write + * once. Tolerates unknown shapes — anything we can't interpret becomes a plain + * stringified entry so the audit trail at least records that a peer was contacted. + */ + // eslint-disable-next-line @typescript-eslint/require-await + async recordPeers(results: unknown): Promise { + if (this.finished) return; + if (!Array.isArray(results)) return; + // Stash for the terminal finish() put rather than writing immediately. A separate + // put here races with the coalesced emitter-triggered puts (each captures the + // in-memory record as it's serialized) and can lose peer_results when an earlier + // put's later-completing write overwrites our row. finish() bundles peer_results + // with the status=success/failed transition into one put, eliminating the race. + this.pendingPeerResults = results; + // Also update the in-memory record so any get_deployment SSE replay or other read + // before finish() sees the latest peer outcomes. + this.record.peer_results = results.map(normalizePeerResult); + } + async finish(status: 'success' | 'failed' | 'rolled_back', error?: unknown): Promise { if (this.finished) return; // Send a terminal sentinel through the emitter (if any) BEFORE we unsubscribe and @@ -246,6 +273,12 @@ export class DeploymentRecorder { } this.record.status = status; this.record.completed_at = Date.now(); + // Slice B2: re-apply any stashed peer outcomes right before the terminal put so they + // are bundled with the status transition and can't be lost to a put race. + if (this.pendingPeerResults) { + this.record.peer_results = this.pendingPeerResults.map(normalizePeerResult); + this.pendingPeerResults = null; + } if (error) { const e = error as { message?: string; code?: string | number; stack?: string }; this.record.error = { @@ -272,6 +305,73 @@ export class DeploymentRecorder { } } +/** + * Slice B2: peer-side helper — wait for the hdb_deployment row to arrive via table + * replication, then return it. The row is committed on origin before `replicateOperation` + * is called, so peers normally find it immediately; this polling loop is for the rare + * case where the operation arrives faster than the table-replication channel. + * + * The payload_blob's chunks may still be in flight after the row arrives — that's fine, + * the Blob's `stream()` / `bytes()` API blocks on incomplete writes (resources/blob.ts). + */ +export async function awaitDeploymentRow( + deploymentId: string, + options: { timeoutMs?: number; pollIntervalMs?: number; initialPollIntervalMs?: number } = {} +): Promise> { + const timeoutMs = options.timeoutMs ?? 30_000; + const maxIntervalMs = options.pollIntervalMs ?? 100; + // Start fast (5ms) so the common case — replication has already caught up — sees no + // human-noticeable latency, then back off exponentially up to maxIntervalMs for the + // rare case where the row is genuinely still replicating. + let intervalMs = options.initialPollIntervalMs ?? 5; + const table = (databases as any).system?.[terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME]; + if (!table) { + throw new Error( + `Deployment tracking is not initialized on this node (system.${terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME} missing).` + ); + } + const deadline = Date.now() + timeoutMs; + let lastError: unknown; + while (Date.now() < deadline) { + try { + const row = await table.get(deploymentId); + if (row && row.payload_blob != null) return row; + } catch (err) { + lastError = err; + } + await new Promise((resolve) => setTimeout(resolve, intervalMs)); + intervalMs = Math.min(intervalMs * 2, maxIntervalMs); + } + throw new Error( + `Timed out after ${timeoutMs}ms waiting for hdb_deployment row '${deploymentId}' to replicate` + + (lastError ? ` (last error: ${(lastError as Error).message ?? lastError})` : '') + ); +} + +function normalizePeerResult(raw: unknown): Record { + if (!raw || typeof raw !== 'object') { + // Replication layer returned a primitive — preserve as a stringified marker so the + // audit row at least records that something came back from a peer. + return { node: null, status: 'unknown', raw: String(raw) }; + } + const r = raw as Record; + const err = r.error; + const hasError = + err != null && (typeof err === 'string' ? err.length > 0 : typeof err === 'object' || typeof err === 'number'); + return { + node: r.node ?? r.name ?? r.hostname ?? null, + status: hasError ? 'failed' : (r.status ?? 'success'), + error: hasError + ? { + message: typeof err === 'object' ? ((err as any).message ?? String(err)) : String(err), + code: typeof err === 'object' ? (err as any).code : undefined, + } + : null, + started_at: r.started_at ?? null, + completed_at: r.completed_at ?? null, + }; +} + function startStatusFor(phase: string | undefined): DeploymentStatus | null { switch (phase) { case 'extract': diff --git a/components/operations.js b/components/operations.js index abe434526..591beac60 100644 --- a/components/operations.js +++ b/components/operations.js @@ -18,7 +18,7 @@ const { packageDirectory } = require('../components/packageComponent.ts'); const { Resources } = require('../resources/Resources.ts'); const { Application, prepareApplication } = require('./Application.ts'); const { server } = require('../server/Server.ts'); -const { DeploymentRecorder } = require('./deploymentRecorder.ts'); +const { DeploymentRecorder, awaitDeploymentRow } = require('./deploymentRecorder.ts'); const { ProgressEmitter } = require('../server/serverHelpers/progressEmitter.ts'); /** @@ -417,12 +417,19 @@ async function deployComponent(req) { try { // On the origin, tee the tarball (Buffer or Readable from the multipart parser) // through a hash-and-size tap into the row's payload_blob, then re-source extraction - // from the persisted blob. This is the staging area and (in Slice B) the channel - // peers will replicate from. On peer nodes we skip recording entirely and use the - // raw payload as-is. + // from the persisted blob. The blob is the channel peers read from in Slice B2. if (recorder && req.payload != null) { await recorder.ingestPayload(req.payload); extractionPayload = recorder.row.payload_blob.stream(); + } else if (isReplicatedExecution && req.payload == null && !req.package) { + // Slice B2 of #641: peer-side blob read. Origin stripped req.payload before + // replicateOperation; the tarball travels via the replicated hdb_deployment row's + // payload_blob attribute instead. Wait for the row to arrive on this node, then + // stream the blob — Blob.stream() handles in-flight BLOB_CHUNK writes by blocking + // until the chunks land. If the row never replicates within the timeout, peer + // records a failure and origin will see it in peer_results. + const row = await awaitDeploymentRow(req._deploymentId); + extractionPayload = row.payload_blob.stream(); } const application = new Application({ @@ -434,6 +441,11 @@ async function deployComponent(req) { timeout: req.install_timeout, allowInstallScripts: req.install_allow_scripts, }, + // Slice B2: forward each complete line of install stdout/stderr to the SSE channel + // (and into the recorder's event_log via the same subscriber). Peers have no + // emitter — their install output goes to the local logger only; cross-node install + // streaming is intentionally out of scope for B2. + onInstallLine: emitter ? (manager, stream, line) => emit('install', { manager, stream, line }) : undefined, }); emit('phase', { phase: 'prepare', status: 'start' }); @@ -466,13 +478,19 @@ async function deployComponent(req) { const rollingRestart = req.restart === 'rolling'; // if doing a rolling restart set restart to false so that other nodes don't also restart. req.restart = rollingRestart ? false : req.restart; - // ProgressEmitter holds function listeners that can't survive the replication - // channel's serialization, and the recorder is local to origin anyway. Strip both - // before sending so peers see a clean req. + // Strip transport-only fields that don't survive the replication channel and aren't + // meaningful to peers. The payload travels via the replicated hdb_deployment row's + // payload_blob attribute (Slice B2), so peers don't need req.payload at all — they + // look the row up by deployment_id. req._deploymentId is intentionally KEPT; it is + // the handoff that lets peers find the replicated row. delete req.progress; + delete req.payload; emit('phase', { phase: 'replicate', status: 'start' }); let response = await server.replication.replicateOperation(req); emit('phase', { phase: 'replicate', status: 'done' }); + if (recorder && response?.replicated) { + await recorder.recordPeers(response.replicated); + } if (req.restart === true) { emit('phase', { phase: 'restart', status: 'start' }); manageThreads.restartWorkers('http'); diff --git a/integrationTests/deploy/deploy-tracking-peer-branch.test.ts b/integrationTests/deploy/deploy-tracking-peer-branch.test.ts new file mode 100644 index 000000000..2272db412 --- /dev/null +++ b/integrationTests/deploy/deploy-tracking-peer-branch.test.ts @@ -0,0 +1,178 @@ +/** + * Deployment tracking — peer-side branch (Slice B2 of issue #641). + * + * In a real multi-node deploy, the origin strips `req.payload` before `replicateOperation` + * and the peer reads the tarball from the replicated `hdb_deployment.payload_blob` row + * attribute instead. This test exercises that **peer-side branch** in isolation on a + * single node by: + * + * 1. Doing a normal deploy to populate an `hdb_deployment` row with a `payload_blob`. + * 2. Submitting a second `deploy_component` operation with `_deploymentId` set to that + * row's id and **no** `payload` field — the same shape origin produces for peers. + * 3. Asserting the deploy completes successfully — meaning the peer-side branch in + * `deployComponent` found the row, streamed `payload_blob`, and ran prepare/install/load + * from the blob bytes. + * + * The true 3-node test (verifies BLOB_CHUNK replication actually delivers the row to + * peers, and that `peer_results` is populated) lives in harper-pro, where the actual + * `replicateOperation` is implemented. This OSS test only verifies the handler wiring. + */ +import { suite, test, before, after } from 'node:test'; +import { ok, strictEqual } from 'node:assert/strict'; +import { join } from 'node:path'; +import { mkdtempSync, mkdirSync, writeFileSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { request } from 'node:http'; +import { Readable } from 'node:stream'; + +import { startHarper, teardownHarper, type ContextWithHarper } from '@harperfast/integration-testing'; + +import { streamPackagedDirectory } from '../../dist/components/packageComponent.js'; +import { buildMultipartBody } from '../../dist/bin/multipartBuilder.js'; + +function postMultipart( + url: URL, + contentType: string, + body: Readable, + auth: { username: string; password: string } +): Promise<{ status: number; body: string }> { + return new Promise((resolve, reject) => { + const req = request( + { + protocol: url.protocol, + hostname: url.hostname, + port: url.port, + method: 'POST', + path: '/', + headers: { + 'Content-Type': contentType, + 'Transfer-Encoding': 'chunked', + 'Authorization': 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64'), + }, + }, + (res) => { + res.setEncoding('utf8'); + let buf = ''; + res.on('data', (chunk) => (buf += chunk)); + res.on('end', () => resolve({ status: res.statusCode ?? 0, body: buf })); + } + ); + req.on('error', reject); + body.on('error', reject); + body.pipe(req); + }); +} + +async function callOperation( + ctx: ContextWithHarper, + op: Record, + headers: Record = {} +): Promise<{ status: number; body: any; rawText: string }> { + const url = new URL(ctx.harper.operationsAPIURL); + const auth = 'Basic ' + Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64'); + const res = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Authorization': auth, ...headers }, + body: JSON.stringify(op), + }); + const text = await res.text(); + let parsed: any = text; + try { + parsed = JSON.parse(text); + } catch { + // not JSON + } + return { status: res.status, body: parsed, rawText: text }; +} + +suite('Deployment tracking — peer-side branch (Slice B2)', (ctx: ContextWithHarper) => { + let fixtureDir: string; + let seedDeploymentId: string; + + before(async () => { + await startHarper(ctx); + fixtureDir = mkdtempSync(join(tmpdir(), 'peer-branch-fixture-')); + writeFileSync(join(fixtureDir, 'config.yaml'), 'graphqlSchema:\n files: schema.graphql\nrest: true\n'); + writeFileSync(join(fixtureDir, 'schema.graphql'), 'type Query { hello: String }\n'); + mkdirSync(join(fixtureDir, 'web'), { recursive: true }); + writeFileSync(join(fixtureDir, 'web', 'index.html'), '

Hello, Peer Branch!

'); + }); + + after(async () => { + try { + rmSync(fixtureDir, { recursive: true, force: true }); + } catch { + // best-effort + } + await teardownHarper(ctx); + }); + + test('seed: an initial deploy populates an hdb_deployment row with a payload_blob', async () => { + const project = 'peer-branch-seed-application'; + const multipart = buildMultipartBody( + { operation: 'deploy_component', project, restart: false }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(fixtureDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + const response = await postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + strictEqual(response.status, 200, `seed deploy failed: ${response.body}`); + const result = JSON.parse(response.body); + seedDeploymentId = result.deployment_id; + ok(seedDeploymentId, 'seed deploy should return a deployment_id'); + + await sleep(200); // let coalesced writes settle + + const got = await callOperation(ctx, { operation: 'get_deployment', deployment_id: seedDeploymentId }); + strictEqual(got.status, 200); + ok(got.body.payload_blob_present, 'seed row should have a payload_blob attached'); + ok(got.body.payload_hash, 'seed row should have a sha256 payload_hash'); + }); + + // On Bun the deploy hangs after extraction when reading a Web ReadableStream from a + // file-backed blob inside the same Harper process — same code passes on Node v22/v24 + // across Linux and Windows. Skipping for now; the harper-pro 3-node cluster test + // (HarperFast/harper-pro#221) covers the same code path end-to-end with real replication. + const skipOnBun = process.env.HARPER_RUNTIME === 'bun'; + test( + 'peer-side branch: deploy_component with _deploymentId + no payload uses the row blob', + { skip: skipOnBun }, + async () => { + // Simulate the operation shape origin produces for peers via `replicateOperation`: + // `_deploymentId` set, no `payload`, no multipart. The handler should detect this is a + // replicated execution and source the tarball from the row's payload_blob. + const peerProject = 'peer-branch-replay-application'; + const response = await callOperation(ctx, { + operation: 'deploy_component', + project: peerProject, + restart: false, + _deploymentId: seedDeploymentId, + }); + strictEqual(response.status, 200, `peer-side deploy should succeed; got ${response.status}: ${response.rawText}`); + + // Confirm the component was actually written on disk (peer code path ran extraction + // from the row's payload_blob and not from a missing req.payload). + const fetched = await fetch(`${ctx.harper.operationsAPIURL}/${peerProject}/`, { + headers: { + Authorization: + 'Basic ' + Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64'), + }, + }); + // The component exposes nothing routable, but a 404 from the component (vs. a 503/connection + // failure) confirms it loaded — Harper only routes to deployed component names. + ok( + fetched.status === 404 || fetched.status === 200, + `expected component to be reachable (any 200/404 from the loaded component), got ${fetched.status}` + ); + } + ); + + // Note: the bogus-_deploymentId-id timeout case isn't covered here because the + // awaitDeploymentRow 30s default would balloon test time. The timeout path is exercised + // by the unit tests for awaitDeploymentRow directly. +}); diff --git a/unitTests/components/applicationSpawn.test.js b/unitTests/components/applicationSpawn.test.js new file mode 100644 index 000000000..3ee320e33 --- /dev/null +++ b/unitTests/components/applicationSpawn.test.js @@ -0,0 +1,125 @@ +'use strict'; + +// Slice B2 of issue #641: verifies the line-buffered `onLine` callback added to +// `nonInteractiveSpawn`. The spawn function buffers stdout/stderr by newline so a +// chunk that splits mid-line never fires a partial line; trailing fragments are +// flushed on process close. These tests drive that contract through Node running +// short scripts written to temp files, exercising the same code path used by +// `npm install` line streaming. + +const assert = require('node:assert'); +const { mkdtempSync, writeFileSync, rmSync } = require('node:fs'); +const { tmpdir } = require('node:os'); +const { join } = require('node:path'); + +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { nonInteractiveSpawn } = require('#src/components/Application'); + +// Write `script` to a temp .js file and return its path; auto-removed in `after`. +let workDir; +before(() => { + workDir = mkdtempSync(join(tmpdir(), 'spawn-onLine-')); +}); +after(() => { + try { + rmSync(workDir, { recursive: true, force: true }); + } catch { + // best effort + } +}); + +function writeScript(name, body) { + const p = join(workDir, name); + writeFileSync(p, body); + return p; +} + +describe('nonInteractiveSpawn onLine line buffering', () => { + it('reports each complete line via the onLine callback', async () => { + const script = writeScript('three-lines.js', `process.stdout.write('first\\nsecond\\nthird\\n');`); + const lines = []; + const result = await nonInteractiveSpawn('test-app', 'node', [script], workDir, 30_000, (stream, line) => + lines.push({ stream, line }) + ); + assert.strictEqual(result.code, 0, `expected exit 0, got ${result.code}; stderr=${result.stderr}`); + assert.deepStrictEqual(lines, [ + { stream: 'stdout', line: 'first' }, + { stream: 'stdout', line: 'second' }, + { stream: 'stdout', line: 'third' }, + ]); + }); + + it('flushes a trailing partial line (no terminating newline) on process close', async () => { + const script = writeScript('trailing.js', `process.stdout.write('one\\ntwo without newline');`); + const lines = []; + const result = await nonInteractiveSpawn('test-app', 'node', [script], workDir, 30_000, (stream, line) => + lines.push({ stream, line }) + ); + assert.strictEqual(result.code, 0); + assert.deepStrictEqual(lines, [ + { stream: 'stdout', line: 'one' }, + { stream: 'stdout', line: 'two without newline' }, + ]); + }); + + it('strips a trailing \\r so CRLF-terminated lines arrive clean', async () => { + const script = writeScript('crlf.js', `process.stdout.write('crlf\\r\\nmore\\r\\n');`); + const lines = []; + const result = await nonInteractiveSpawn('test-app', 'node', [script], workDir, 30_000, (_stream, line) => + lines.push(line) + ); + assert.strictEqual(result.code, 0); + // No trailing \r on either line — splitter normalizes CRLF to LF. + assert.deepStrictEqual(lines, ['crlf', 'more']); + }); + + it('tags stderr lines with the "stderr" stream label', async () => { + const script = writeScript( + 'mixed.js', + `process.stderr.write('warn line\\n'); process.stdout.write('info line\\n');` + ); + const lines = []; + const result = await nonInteractiveSpawn('test-app', 'node', [script], workDir, 30_000, (stream, line) => + lines.push({ stream, line }) + ); + assert.strictEqual(result.code, 0); + // stdout/stderr are independent FDs — interleaving order is implementation-defined. + // Assert content, not order. + assert.ok( + lines.some((l) => l.stream === 'stdout' && l.line === 'info line'), + `expected stdout 'info line', got: ${JSON.stringify(lines)}` + ); + assert.ok( + lines.some((l) => l.stream === 'stderr' && l.line === 'warn line'), + `expected stderr 'warn line', got: ${JSON.stringify(lines)}` + ); + }); + + it('reassembles a multi-byte UTF-8 character split across two chunks', async () => { + // The ✔ codepoint (U+2714) is 3 bytes in UTF-8 (0xE2 0x9C 0x94). Many package + // managers print it in their resolved-package summaries. Without a StringDecoder, + // a chunk boundary that lands inside this sequence corrupts it into U+FFFD + // replacement characters. Drive a script that writes 200 ✔ characters with a + // flush in the middle so the OS pipe is very likely to deliver in multiple chunks. + const script = writeScript( + 'utf8.js', + `for (let i = 0; i < 200; i++) { process.stdout.write('\\u2714'); }\nprocess.stdout.write('\\n');` + ); + const lines = []; + const result = await nonInteractiveSpawn('test-app', 'node', [script], workDir, 30_000, (_stream, line) => + lines.push(line) + ); + assert.strictEqual(result.code, 0); + assert.strictEqual(lines.length, 1); + assert.strictEqual(lines[0], '✔'.repeat(200), 'all ✔ characters should be intact, no U+FFFD'); + }); + + it('is opt-in: no onLine callback still captures stdout/stderr into the resolve payload', async () => { + const script = writeScript('opt-in.js', `process.stdout.write('hello world\\n');`); + const result = await nonInteractiveSpawn('test-app', 'node', [script], workDir, 30_000); + assert.strictEqual(result.code, 0); + assert.match(result.stdout, /hello world/); + }); +}); diff --git a/unitTests/components/deploymentRecorder.test.js b/unitTests/components/deploymentRecorder.test.js new file mode 100644 index 000000000..6cf0709e2 --- /dev/null +++ b/unitTests/components/deploymentRecorder.test.js @@ -0,0 +1,172 @@ +'use strict'; + +// Slice B2 of issue #641: unit-tests for the helpers added to DeploymentRecorder: +// - `recordPeers()` — normalizes the opaque replication-layer per-peer outcomes into +// a stable `[{node, status, error?, started_at, completed_at}]` shape on the row. +// - `awaitDeploymentRow()` — peer-side helper that polls the hdb_deployment table +// until the row arrives via replication, then returns it. +// +// These exercise the table layer via a tiny mock attached to `databases.system` — +// the recorder's `put()` already tolerates a missing table, so we only mock when we +// need to control the `.get()` return value or assert side effects. + +const assert = require('node:assert'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { DeploymentRecorder, awaitDeploymentRow } = require('#src/components/deploymentRecorder'); +const { databases } = require('#src/resources/databases'); +const terms = require('#src/utility/hdbTerms'); + +const DEPLOYMENT_TABLE = terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME; + +// Lightweight mock: keeps a Map of rows, exposes get(id) and put(row). +function installMockDeploymentTable() { + const rows = new Map(); + const mock = { + rows, + async get(id) { + return rows.get(id); + }, + async put(row) { + rows.set(row.deployment_id, row); + }, + }; + if (!databases.system) databases.system = {}; + const prior = databases.system[DEPLOYMENT_TABLE]; + databases.system[DEPLOYMENT_TABLE] = mock; + return { + mock, + restore() { + databases.system[DEPLOYMENT_TABLE] = prior; + }, + }; +} + +describe('DeploymentRecorder.recordPeers', () => { + let installed; + beforeEach(() => { + installed = installMockDeploymentTable(); + }); + afterEach(() => installed.restore()); + + it('normalizes a single-peer success result', async () => { + const recorder = await DeploymentRecorder.create({ project: 'p' }); + await recorder.recordPeers([{ node: 'node-b', status: 'success', started_at: 1000, completed_at: 1500 }]); + assert.deepStrictEqual(recorder.row.peer_results, [ + { node: 'node-b', status: 'success', error: null, started_at: 1000, completed_at: 1500 }, + ]); + }); + + it('maps an Error-bearing result to status="failed" with structured error', async () => { + const recorder = await DeploymentRecorder.create({ project: 'p' }); + await recorder.recordPeers([{ node: 'node-c', error: { message: 'install timed out', code: 'ETIMEDOUT' } }]); + assert.deepStrictEqual(recorder.row.peer_results, [ + { + node: 'node-c', + status: 'failed', + error: { message: 'install timed out', code: 'ETIMEDOUT' }, + started_at: null, + completed_at: null, + }, + ]); + }); + + it('treats a string-shaped error as failed and preserves the message', async () => { + const recorder = await DeploymentRecorder.create({ project: 'p' }); + await recorder.recordPeers([{ node: 'node-d', error: 'connection refused' }]); + assert.strictEqual(recorder.row.peer_results[0].status, 'failed'); + assert.strictEqual(recorder.row.peer_results[0].error.message, 'connection refused'); + }); + + it('falls back to "name"/"hostname" when "node" is missing', async () => { + const recorder = await DeploymentRecorder.create({ project: 'p' }); + await recorder.recordPeers([ + { name: 'node-by-name', status: 'success' }, + { hostname: 'node-by-hostname', status: 'success' }, + ]); + assert.strictEqual(recorder.row.peer_results[0].node, 'node-by-name'); + assert.strictEqual(recorder.row.peer_results[1].node, 'node-by-hostname'); + }); + + it('preserves primitive entries as stringified raw markers', async () => { + const recorder = await DeploymentRecorder.create({ project: 'p' }); + await recorder.recordPeers(['node-x failed somehow']); + assert.strictEqual(recorder.row.peer_results[0].status, 'unknown'); + assert.strictEqual(recorder.row.peer_results[0].raw, 'node-x failed somehow'); + }); + + it('is a no-op when called after finish()', async () => { + const recorder = await DeploymentRecorder.create({ project: 'p' }); + await recorder.finish('success'); + await recorder.recordPeers([{ node: 'node-late', status: 'success' }]); + assert.deepStrictEqual(recorder.row.peer_results, []); + }); + + it('stashes peer results so the terminal finish() put carries them — no separate put race', async () => { + // recordPeers should NOT trigger a put on its own. It mutates the in-memory record + // (so live readers see the latest peer outcomes) and stashes for finish() to bundle + // with the status transition. This avoids racing with the coalesced emitter-driven + // puts that capture record snapshots at serialization time. + const recorder = await DeploymentRecorder.create({ project: 'p' }); + const putsBefore = Array.from(installed.mock.rows.values()).length; + await recorder.recordPeers([{ node: 'node-z', status: 'success' }]); + // In-memory state reflects the new peer_results immediately. + assert.strictEqual(recorder.row.peer_results[0].node, 'node-z'); + // Now run finish — that's the put that persists peer_results to LMDB. + await recorder.finish('success'); + const persisted = await installed.mock.get(recorder.deploymentId); + assert.strictEqual(persisted.peer_results[0].node, 'node-z'); + assert.strictEqual(persisted.peer_results[0].status, 'success'); + assert.strictEqual(persisted.status, 'success'); + // Sanity check: at least one put was issued (create + finish). + assert.ok(installed.mock.rows.size > putsBefore - 1); + }); + + it('is a no-op for non-array inputs (defensive against odd replication shapes)', async () => { + const recorder = await DeploymentRecorder.create({ project: 'p' }); + await recorder.recordPeers(undefined); + await recorder.recordPeers(null); + await recorder.recordPeers('not an array'); + await recorder.recordPeers({ node: 'object-not-array' }); + assert.deepStrictEqual(recorder.row.peer_results, []); + }); +}); + +describe('awaitDeploymentRow', () => { + let installed; + beforeEach(() => { + installed = installMockDeploymentTable(); + }); + afterEach(() => installed.restore()); + + it('returns the row immediately when it is already present with payload_blob', async () => { + const row = { deployment_id: 'd1', payload_blob: { fake: true } }; + installed.mock.rows.set('d1', row); + const result = await awaitDeploymentRow('d1'); + assert.strictEqual(result, row); + }); + + it('skips a row with no payload_blob (still in flight) and resolves once it arrives', async () => { + const id = 'd2'; + installed.mock.rows.set(id, { deployment_id: id, payload_blob: null }); + // Schedule a delayed write of the blob so the polling loop sees it. + setTimeout(() => { + installed.mock.rows.set(id, { deployment_id: id, payload_blob: { fake: true } }); + }, 50); + const result = await awaitDeploymentRow(id, { timeoutMs: 1000, pollIntervalMs: 25 }); + assert.ok(result.payload_blob); + }); + + it('rejects with a timeout error when the row never arrives within timeoutMs', async () => { + await assert.rejects( + () => awaitDeploymentRow('never-arrives', { timeoutMs: 100, pollIntervalMs: 25 }), + /Timed out after 100ms waiting for hdb_deployment row 'never-arrives'/ + ); + }); + + it('throws if the deployment table is missing entirely (not yet provisioned)', async () => { + delete databases.system[DEPLOYMENT_TABLE]; + await assert.rejects(() => awaitDeploymentRow('d3'), /Deployment tracking is not initialized on this node/); + }); +});