diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 79c235a1f..0aee1422f 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -269,17 +269,28 @@ async function cliOperations(req: any, skipResponseLog = false) { } } if (sseError) { - console.error(`error: ${sseError.message ?? sseError}`); + const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + console.error(`error: ${errMsg}`); process.exit(1); } responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; } else { + // When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode), + // so .body is undefined. Drain the stream to get the text (e.g. a 401 error body). + let bodyText: string; + if (useSse) { + const chunks: Buffer[] = []; + for await (const chunk of response as AsyncIterable) chunks.push(Buffer.from(chunk)); + bodyText = Buffer.concat(chunks).toString('utf8'); + } else { + bodyText = response.body; + } try { - responseData = JSON.parse(response.body); + responseData = JSON.parse(bodyText); } catch { responseData = { status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), - body: response.body, + body: bodyText, }; } } diff --git a/bin/sseConsumer.ts b/bin/sseConsumer.ts index f279013dd..b89485f15 100644 --- a/bin/sseConsumer.ts +++ b/bin/sseConsumer.ts @@ -1,3 +1,4 @@ +import { StringDecoder } from 'node:string_decoder'; import type { Readable } from 'node:stream'; export interface SSEMessage { @@ -15,9 +16,10 @@ export interface SSEMessage { * Readable does not guarantee chunks align with SSE record boundaries. */ export async function* parseSSE(stream: Readable): AsyncGenerator { + const decoder = new StringDecoder('utf8'); let buffer = ''; for await (const chunk of stream) { - buffer += chunk.toString('utf8'); + buffer += typeof chunk === 'string' ? 
chunk : decoder.write(chunk); while (true) { const recordEnd = buffer.indexOf('\n\n'); const crlfEnd = buffer.indexOf('\r\n\r\n'); @@ -37,6 +39,7 @@ export async function* parseSSE(stream: Readable): AsyncGenerator { if (msg) yield msg; } } + buffer += decoder.end(); // Any trailing record without a terminating blank line is treated as a final message, // matching the looser behavior browsers exhibit on connection close. if (buffer.trim()) { diff --git a/components/operations.js b/components/operations.js index 088f0e8e9..c7fc338b9 100644 --- a/components/operations.js +++ b/components/operations.js @@ -17,7 +17,10 @@ const manageThreads = require('../server/threads/manageThreads.js'); const { packageDirectory } = require('../components/packageComponent.ts'); const { Resources } = require('../resources/Resources.ts'); const { Application, prepareApplication } = require('./Application.ts'); +const { stagePayloadToTempFile } = require('./payloadStaging.ts'); const { server } = require('../server/Server.ts'); +const { Readable } = require('node:stream'); +const { createReadStream } = require('node:fs'); /** * Read the settings.js file and return the @@ -389,6 +392,21 @@ async function deployComponent(req) { await configUtils.addConfig(req.project, applicationConfig); } + // Stage streamed payloads to a temp file when replication is needed. The payload + // Readable is consumed once by local extraction; without staging, replicas would have + // nothing to relay. Skipped when there are no peers (the only-local-deploy case keeps + // its zero-disk-copy property) or when the deploy is package-identifier-based. + let stagedPayload; + const needsReplication = req.replicated !== false && (server.nodes?.length ?? 0) > 0; + if (req.payload instanceof Readable && needsReplication) { + stagedPayload = await stagePayloadToTempFile(req.payload, req.project); + // Re-source the local extraction from the staged file. 
The Application sees a regular + // fs Readable rather than the original chunked HTTP body, which keeps backpressure + // behavior of extract identical to the non-replicated case. + req.payload = createReadStream(stagedPayload.path); + req._stagedPayloadPath = stagedPayload.path; + } + const application = new Application({ name: req.project, payload: req.payload, @@ -405,6 +423,8 @@ async function deployComponent(req) { await prepareApplication(application); } catch (err) { progress?.emit('phase', { phase: 'extract_or_install', status: 'error', message: err?.message ?? String(err) }); + // Clean up the staged payload on early failure so we don't leak disk. + stagedPayload?.cleanup().catch(() => {}); throw err; } progress?.emit('phase', { phase: 'install', status: 'done' }); @@ -444,7 +464,15 @@ async function deployComponent(req) { // plain `{listeners:[]}` object would still take the `if (progress)` branch and then // throw `TypeError: progress.emit is not a function`. Strip it before fan-out. delete req.progress; - let response = await server.replication.replicateOperation(req); + let response; + try { + response = await server.replication.replicateOperation(req); + } finally { + // Whether replication succeeded, failed, or didn't run, the staged payload has served + // its purpose. Best-effort cleanup; if the rm fails (e.g. file already gone on a retry + // path) we don't surface that to the deploy caller. 
+ await stagedPayload?.cleanup().catch(() => {}); + } progress?.emit('phase', { phase: 'replicate', status: 'done' }); if (req.restart === true) { progress?.emit('phase', { phase: 'restart', status: 'start' }); diff --git a/components/payloadStaging.ts b/components/payloadStaging.ts new file mode 100644 index 000000000..ef59491ba --- /dev/null +++ b/components/payloadStaging.ts @@ -0,0 +1,51 @@ +import { createWriteStream } from 'node:fs'; +import { mkdtemp, rm } from 'node:fs/promises'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { pipeline } from 'node:stream/promises'; +import type { Readable } from 'node:stream'; + +/** + * Buffer a streamed deploy_component payload to a temp tar.gz file on disk. + * + * Used by replicated deploys: the origin needs to keep a copy of the streamed payload so + * it can re-stream it to each peer over the HTTPS relay (see harper-pro's deployRelay). + * Without this, the payload Readable would be consumed once by local extraction and gone + * by the time replication runs. + * + * Trade-off: we write the full payload to disk before extraction reads from it (instead of + * tee-ing in-flight) so the local-deploy path stays unchanged — extractApplication still + * gets a regular createReadStream over a complete file. Two passes over the data, but no + * concurrent-tee complexity, and disk speed isn't the bottleneck on deploy. + * + * Returns the staged file's path and a cleanup function. The cleanup deletes the temp + * directory; safe to call multiple times (rm with force). + * + * If writing the payload fails (source stream error or write failure) the temp directory is + * removed before the error is rethrown, since the caller never receives a cleanup handle for it. 
+ */ +export async function stagePayloadToTempFile( + source: Readable, + projectName: string +): Promise<{ path: string; cleanup: () => Promise<void> }> { + const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`)); + const path = join(dir, 'payload.tar.gz'); + const cleanup = async () => { + await rm(dir, { recursive: true, force: true }); + }; + try { + await pipeline(source, createWriteStream(path)); + } catch (err) { + // The caller only gets `cleanup` on success, so remove the temp dir here (best-effort) + // rather than leaking it; the original pipeline error is what gets surfaced. + await cleanup().catch(() => {}); + throw err; + } + return { path, cleanup }; +} + +function sanitize(name: string): string { + // keep alphanumerics, dots, dashes, underscores; replace everything else so a malicious or + // quirky project name (slashes, control chars) can't introduce a path separator and escape tmpdir. + return name.replace(/[^a-zA-Z0-9._-]/g, '_').slice(0, 64); +} diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts index cf390f995..f383957ca 100644 --- a/server/serverHelpers/progressEmitter.ts +++ b/server/serverHelpers/progressEmitter.ts @@ -48,25 +48,40 @@ export class ProgressEmitter { export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise): Readable { const stream = new PassThrough(); + let active = true; const unsubscribe = emitter.subscribe((event) => { - writeSSE(stream, event); + if (active) writeSSE(stream, event); }); + const cleanup = () => { + if (active) { + active = false; + unsubscribe(); + } + }; + + // If the client disconnects (Ctrl-C, network drop) stop writing to the stream and + // release the emitter subscription so it doesn't accumulate for the operation lifetime. 
+ stream.on('close', cleanup); + stream.on('end', cleanup); + operation() .then((result) => { - writeSSE(stream, { event: 'done', data: { result } }); + if (active) writeSSE(stream, { event: 'done', data: { result } }); }) .catch((err) => { - writeSSE(stream, { - event: 'error', - data: { - message: err?.message ?? String(err), - code: err?.statusCode ?? 
err?.code, - }, - }); + if (active) { + writeSSE(stream, { + event: 'error', + data: { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + }, + }); + } }) .finally(() => { - unsubscribe(); + cleanup(); stream.end(); }); @@ -75,5 +90,13 @@ export function createSSEResponseStream(emitter: ProgressEmitter, operation: () function writeSSE(stream: PassThrough, event: ProgressEvent): void { - const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data); - stream.write(`event: ${event.event}\ndata: ${data}\n\n`); + // JSON.stringify yields undefined for undefined/function/symbol payloads; fall back to the + // text the previous template-literal form emitted so the split below never throws. + const data = typeof event.data === 'string' ? event.data : (JSON.stringify(event.data) ?? 'undefined'); + stream.write(`event: ${event.event}\n`); + // SSE treats CRLF, LF and a bare CR as line terminators, so split on all three; a stray CR + // left inside a data line would end the field early on a spec-compliant client. + for (const line of data.split(/\r\n|\r|\n/)) { + stream.write(`data: ${line}\n`); + } + stream.write('\n'); } diff --git a/unitTests/components/payloadStaging.test.js b/unitTests/components/payloadStaging.test.js new file mode 100644 index 000000000..68b9dec43 --- /dev/null +++ b/unitTests/components/payloadStaging.test.js @@ -0,0 +1,67 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable } = require('node:stream'); +const { readFile, stat } = require('node:fs/promises'); +const path = require('node:path'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { stagePayloadToTempFile } = require('#src/components/payloadStaging'); + +describe('stagePayloadToTempFile', () => { + it('writes a streamed payload to a temp file with the expected contents', async () => { + const payload = Buffer.from('abcdefghij'.repeat(10000), 'utf8'); // 100 KB + const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from(payload), 'demo'); + try { + const written = await readFile(tmpPath); + assert.strictEqual(written.length, payload.length); + assert.deepStrictEqual(written, payload); + assert.match(tmpPath, /harper-deploy-demo-/, 'temp dir is named after the project'); + assert.strictEqual(path.basename(tmpPath), 'payload.tar.gz'); + } finally { + await cleanup(); + } + }); + + it('cleanup() removes the staged file and its 
parent temp dir', async () => { + const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from('hello'), 'cleanup-test'); + await cleanup(); + await assert.rejects(stat(tmpPath), /ENOENT/, 'staged file must be gone'); + await assert.rejects(stat(path.dirname(tmpPath)), /ENOENT/, 'staged dir must be gone'); + }); + + it('cleanup() is safe to call twice (force: true)', async () => { + const { cleanup } = await stagePayloadToTempFile(Readable.from('hello'), 'double-cleanup'); + await cleanup(); + await cleanup(); // must not throw + }); + + it('sanitizes path-traversal characters in the project name', async () => { + const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from('x'), '../../evil/name'); + try { + // Path separators must be replaced so the temp dir lives in a single mkdtemp slot + // directly under tmpdir, not navigated upstream. `..` segments alone don't traverse + // because there's no `/` between them after sanitization. + const os = require('node:os'); + assert.strictEqual( + path.dirname(path.dirname(tmpPath)), + path.resolve(os.tmpdir()), + 'staged dir must live directly under tmpdir' + ); + assert.doesNotMatch(path.basename(path.dirname(tmpPath)), /\//); + assert.match(path.basename(path.dirname(tmpPath)), /harper-deploy-.+_evil_name-/); + } finally { + await cleanup(); + } + }); + + it('propagates source-stream errors through pipeline', async () => { + const source = new Readable({ + read() { + this.destroy(new Error('source boom')); + }, + }); + await assert.rejects(stagePayloadToTempFile(source, 'erroring'), /source boom/); + }); +});