From 167f5b0e65cc12197244cf32a4548becfa32039e Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 14 May 2026 09:15:24 -0600 Subject: [PATCH 1/5] feat(deploy): stage streamed payloads to a temp file for replication When a deploy_component is replicated to peers, the origin needs to keep a copy of the payload so it can re-stream it to each peer. The payload Readable is consumed once by local extraction; without staging, peer replication would have nothing to send. deployComponent now stages the payload to a temp file before extraction when there are nodes to replicate to and the payload is a Readable (non-package, streaming deploys). Local extraction re-sources from the staged file, which keeps backpressure behavior identical to the non-replicated case. The temp file is cleaned up in a finally block after replication completes (success, failure, or skipped). Local-only deploys keep their zero-disk-copy property; package-identifier deploys are unaffected. This is the core-side staging needed for harper-pro's direct-HTTPS deploy relay (slice 3b, addressing HarperFast/harper#524). Co-Authored-By: Claude Opus 4.7 (1M context) --- components/operations.js | 30 ++++++++- components/payloadStaging.ts | 41 +++++++++++++ unitTests/components/payloadStaging.test.js | 67 +++++++++++++++++++++ 3 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 components/payloadStaging.ts create mode 100644 unitTests/components/payloadStaging.test.js diff --git a/components/operations.js b/components/operations.js index ac8d76dcc..fcfc487c9 100644 --- a/components/operations.js +++ b/components/operations.js @@ -17,7 +17,10 @@ const manageThreads = require('../server/threads/manageThreads.js'); const { packageDirectory } = require('../components/packageComponent.ts'); const { Resources } = require('../resources/Resources.ts'); const { Application, prepareApplication } = require('./Application.ts'); +const { stagePayloadToTempFile } = require('./payloadStaging.ts'); const { server } = require('../server/Server.ts'); +const { Readable } = require('node:stream'); +const { createReadStream } = require('node:fs'); /** * Read the settings.js file and return the @@ -389,6 +392,21 @@ async function deployComponent(req) { await configUtils.addConfig(req.project, applicationConfig); } + // Stage streamed payloads to a temp file when replication is needed. The payload + // Readable is consumed once by local extraction; without staging, replicas would have + // nothing to relay. Skipped when there are no peers (the only-local-deploy case keeps + // its zero-disk-copy property) or when the deploy is package-identifier-based. + let stagedPayload; + const needsReplication = req.replicated !== false && (server.nodes?.length ?? 0) > 0; + if (req.payload instanceof Readable && needsReplication) { + stagedPayload = await stagePayloadToTempFile(req.payload, req.project); + // Re-source the local extraction from the staged file. The Application sees a regular + // fs Readable rather than the original chunked HTTP body, which keeps backpressure + // behavior of extract identical to the non-replicated case. + req.payload = createReadStream(stagedPayload.path); + req._stagedPayloadPath = stagedPayload.path; + } + const application = new Application({ name: req.project, payload: req.payload, @@ -405,6 +423,8 @@ async function deployComponent(req) { await prepareApplication(application); } catch (err) { progress?.emit('phase', { phase: 'extract_or_install', status: 'error', message: err?.message ?? String(err) }); + // Clean up the staged payload on early failure so we don't leak disk. + stagedPayload?.cleanup().catch(() => {}); throw err; } progress?.emit('phase', { phase: 'install', status: 'done' }); @@ -439,7 +459,15 @@ async function deployComponent(req) { // if doing a rolling restart set restart to false so that other nodes don't also restart. req.restart = rollingRestart ? false : req.restart; progress?.emit('phase', { phase: 'replicate', status: 'start' }); - let response = await server.replication.replicateOperation(req); + let response; + try { + response = await server.replication.replicateOperation(req); + } finally { + // Whether replication succeeded, failed, or didn't run, the staged payload has served + // its purpose. Best-effort cleanup; if the rm fails (e.g. file already gone on a retry + // path) we don't surface that to the deploy caller. + await stagedPayload?.cleanup().catch(() => {}); + } progress?.emit('phase', { phase: 'replicate', status: 'done' }); if (req.restart === true) { progress?.emit('phase', { phase: 'restart', status: 'start' }); diff --git a/components/payloadStaging.ts b/components/payloadStaging.ts new file mode 100644 index 000000000..ef59491ba --- /dev/null +++ b/components/payloadStaging.ts @@ -0,0 +1,41 @@ +import { createWriteStream } from 'node:fs'; +import { mkdtemp, rm } from 'node:fs/promises'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { pipeline } from 'node:stream/promises'; +import type { Readable } from 'node:stream'; + +/** + * Buffer a streamed deploy_component payload to a temp tar.gz file on disk. + * + * Used by replicated deploys: the origin needs to keep a copy of the streamed payload so + * it can re-stream it to each peer over the HTTPS relay (see harper-pro's deployRelay). + * Without this, the payload Readable would be consumed once by local extraction and gone + * by the time replication runs. + * + * Trade-off: we write the full payload to disk before extraction reads from it (instead of + * tee-ing in-flight) so the local-deploy path stays unchanged — extractApplication still + * gets a regular createReadStream over a complete file. Two passes over the data, but no + * concurrent-tee complexity, and disk speed isn't the bottleneck on deploy. + * + * Returns the staged file's path and a cleanup function. The cleanup deletes the temp + * directory; safe to call multiple times (rm with force). + */ +export async function stagePayloadToTempFile( + source: Readable, + projectName: string +): Promise<{ path: string; cleanup: () => Promise }> { + const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`)); + const path = join(dir, 'payload.tar.gz'); + await pipeline(source, createWriteStream(path)); + const cleanup = async () => { + await rm(dir, { recursive: true, force: true }); + }; + return { path, cleanup }; +} + +function sanitize(name: string): string { + // keep alphanumerics, dashes, underscores; replace everything else so a malicious or + // quirky project name (slashes, dots, control chars) can't escape the tmpdir. + return name.replace(/[^a-zA-Z0-9._-]/g, '_').slice(0, 64); +} diff --git a/unitTests/components/payloadStaging.test.js b/unitTests/components/payloadStaging.test.js new file mode 100644 index 000000000..68b9dec43 --- /dev/null +++ b/unitTests/components/payloadStaging.test.js @@ -0,0 +1,67 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable } = require('node:stream'); +const { readFile, stat } = require('node:fs/promises'); +const path = require('node:path'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { stagePayloadToTempFile } = require('#src/components/payloadStaging'); + +describe('stagePayloadToTempFile', () => { + it('writes a streamed payload to a temp file with the expected contents', async () => { + const payload = Buffer.from('abcdefghij'.repeat(10000), 'utf8'); // 100 KB + const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from(payload), 'demo'); + try { + const written = await readFile(tmpPath); + assert.strictEqual(written.length, payload.length); + assert.deepStrictEqual(written, payload); + assert.match(tmpPath, /harper-deploy-demo-/, 'temp dir is named after the project'); + assert.strictEqual(path.basename(tmpPath), 'payload.tar.gz'); + } finally { + await cleanup(); + } + }); + + it('cleanup() removes the staged file and its parent temp dir', async () => { + const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from('hello'), 'cleanup-test'); + await cleanup(); + await assert.rejects(stat(tmpPath), /ENOENT/, 'staged file must be gone'); + await assert.rejects(stat(path.dirname(tmpPath)), /ENOENT/, 'staged dir must be gone'); + }); + + it('cleanup() is safe to call twice (force: true)', async () => { + const { cleanup } = await stagePayloadToTempFile(Readable.from('hello'), 'double-cleanup'); + await cleanup(); + await cleanup(); // must not throw + }); + + it('sanitizes path-traversal characters in the project name', async () => { + const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from('x'), '../../evil/name'); + try { + // Path separators must be replaced so the temp dir lives in a single mkdtemp slot + // directly under tmpdir, not navigated upstream. `..` segments alone don't traverse + // because there's no `/` between them after sanitization. + const os = require('node:os'); + assert.strictEqual( + path.dirname(path.dirname(tmpPath)), + path.resolve(os.tmpdir()), + 'staged dir must live directly under tmpdir' + ); + assert.doesNotMatch(path.basename(path.dirname(tmpPath)), /\//); + assert.match(path.basename(path.dirname(tmpPath)), /harper-deploy-.+_evil_name-/); + } finally { + await cleanup(); + } + }); + + it('propagates source-stream errors through pipeline', async () => { + const source = new Readable({ + read() { + this.destroy(new Error('source boom')); + }, + }); + await assert.rejects(stagePayloadToTempFile(source, 'erroring'), /source boom/); + }); +}); From fc8530ea11e41885d7b5dff0880a2cd8a1e5c3fe Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 19 May 2026 23:02:04 -0600 Subject: [PATCH 2/5] fix: drain SSE response stream when server returns non-SSE on SSE path When useSse=true, httpRequest uses streamResponse mode and returns a raw IncomingMessage. The previous fallback branch read response.body which is undefined in that mode (e.g. on a 401 auth failure). Drain the stream chunks manually so error bodies are captured correctly. Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 79c235a1f..dc277b5a3 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -274,12 +274,22 @@ async function cliOperations(req: any, skipResponseLog = false) { } responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; } else { + // When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode), + // so .body is undefined. Drain the stream to get the text (e.g. a 401 error body). + let bodyText: string; + if (useSse) { + const chunks: Buffer[] = []; + for await (const chunk of response as AsyncIterable) chunks.push(Buffer.from(chunk)); + bodyText = Buffer.concat(chunks).toString('utf8'); + } else { + bodyText = response.body; + } try { - responseData = JSON.parse(response.body); + responseData = JSON.parse(bodyText); } catch { responseData = { status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), - body: response.body, + body: bodyText, }; } } From 1fdcf09c8ca99e27a3786042278a9e7bc337e7a9 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 03:15:13 -0600 Subject: [PATCH 3/5] fix(sse): StringDecoder for split UTF-8, cleanup on client disconnect, error stringify - parseSSE: use StringDecoder so multi-byte chars split across chunk boundaries decode correctly instead of corrupting (e.g. box-drawing chars, emojis) - createSSEResponseStream: gate writes behind an active flag; listen to stream close/end to unsubscribe the emitter immediately on client disconnect so the subscription doesn't linger for the full operation lifetime - cliOperations SSE error: JSON.stringify fallback when sseError has no .message so the error renders as JSON rather than [object Object] Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 4 ++- bin/sseConsumer.ts | 5 +++- server/serverHelpers/progressEmitter.ts | 35 ++++++++++++++++++------- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index dc277b5a3..21a04132c 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -269,7 +269,9 @@ async function cliOperations(req: any, skipResponseLog = false) { } } if (sseError) { - console.error(`error: ${sseError.message ?? sseError}`); + const errMsg = + sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + console.error(`error: ${errMsg}`); process.exit(1); } responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; diff --git a/bin/sseConsumer.ts b/bin/sseConsumer.ts index f279013dd..b89485f15 100644 --- a/bin/sseConsumer.ts +++ b/bin/sseConsumer.ts @@ -1,3 +1,4 @@ +import { StringDecoder } from 'node:string_decoder'; import type { Readable } from 'node:stream'; export interface SSEMessage { @@ -15,9 +16,10 @@ export interface SSEMessage { * Readable does not guarantee chunks align with SSE record boundaries. */ export async function* parseSSE(stream: Readable): AsyncGenerator { + const decoder = new StringDecoder('utf8'); let buffer = ''; for await (const chunk of stream) { - buffer += chunk.toString('utf8'); + buffer += typeof chunk === 'string' ? chunk : decoder.write(chunk); while (true) { const recordEnd = buffer.indexOf('\n\n'); const crlfEnd = buffer.indexOf('\r\n\r\n'); @@ -37,6 +39,7 @@ export async function* parseSSE(stream: Readable): AsyncGenerator { if (msg) yield msg; } } + buffer += decoder.end(); // Any trailing record without a terminating blank line is treated as a final message, // matching the looser behavior browsers exhibit on connection close. if (buffer.trim()) { diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts index cf390f995..af2cc8bd9 100644 --- a/server/serverHelpers/progressEmitter.ts +++ b/server/serverHelpers/progressEmitter.ts @@ -48,25 +48,40 @@ export class ProgressEmitter { export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise): Readable { const stream = new PassThrough(); + let active = true; const unsubscribe = emitter.subscribe((event) => { - writeSSE(stream, event); + if (active) writeSSE(stream, event); }); + const cleanup = () => { + if (active) { + active = false; + unsubscribe(); + } + }; + + // If the client disconnects (Ctrl-C, network drop) stop writing to the stream and + // release the emitter subscription so it doesn't accumulate for the operation lifetime. + stream.on('close', cleanup); + stream.on('end', cleanup); + operation() .then((result) => { - writeSSE(stream, { event: 'done', data: { result } }); + if (active) writeSSE(stream, { event: 'done', data: { result } }); }) .catch((err) => { - writeSSE(stream, { - event: 'error', - data: { - message: err?.message ?? String(err), - code: err?.statusCode ?? err?.code, - }, - }); + if (active) { + writeSSE(stream, { + event: 'error', + data: { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + }, + }); + } }) .finally(() => { - unsubscribe(); + cleanup(); stream.end(); }); From 39a59712f4779d01a56ee276777fbaf148e19d02 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 03:17:40 -0600 Subject: [PATCH 4/5] style: prettier format cliOperations errMsg line Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 21a04132c..0aee1422f 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -269,8 +269,7 @@ async function cliOperations(req: any, skipResponseLog = false) { } } if (sseError) { - const errMsg = - sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); console.error(`error: ${errMsg}`); process.exit(1); } From e190a48382df83573db88ef131b1ff3f39957cc6 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 04:32:47 -0600 Subject: [PATCH 5/5] fix(sse): split multi-line data into per-line data: fields per spec Per the SSE spec, if a data value contains newlines each line must be emitted as a separate data: field. A single data: field with embedded newlines is not valid. Co-Authored-By: Claude Opus 4.7 (1M context) --- server/serverHelpers/progressEmitter.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts index af2cc8bd9..f383957ca 100644 --- a/server/serverHelpers/progressEmitter.ts +++ b/server/serverHelpers/progressEmitter.ts @@ -90,5 +90,9 @@ export function createSSEResponseStream(emitter: ProgressEmitter, operation: () function writeSSE(stream: PassThrough, event: ProgressEvent): void { const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data); - stream.write(`event: ${event.event}\ndata: ${data}\n\n`); + stream.write(`event: ${event.event}\n`); + for (const line of data.split(/\r?\n/)) { + stream.write(`data: ${line}\n`); + } + stream.write('\n'); }