Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions bin/cliOperations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,28 @@ async function cliOperations(req: any, skipResponseLog = false) {
}
}
if (sseError) {
console.error(`error: ${sseError.message ?? sseError}`);
const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError);
console.error(`error: ${errMsg}`);
process.exit(1);
}
responseData = finalResult ?? { message: 'Deploy completed (no result payload).' };
} else {
// When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode),
// so .body is undefined. Drain the stream to get the text (e.g. a 401 error body).
let bodyText: string;
if (useSse) {
const chunks: Buffer[] = [];
for await (const chunk of response as AsyncIterable<Buffer>) chunks.push(Buffer.from(chunk));
bodyText = Buffer.concat(chunks).toString('utf8');
} else {
bodyText = response.body;
}
try {
responseData = JSON.parse(response.body);
responseData = JSON.parse(bodyText);
} catch {
responseData = {
status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'),
body: response.body,
body: bodyText,
};
}
}
Expand Down
5 changes: 4 additions & 1 deletion bin/sseConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { StringDecoder } from 'node:string_decoder';
import type { Readable } from 'node:stream';

export interface SSEMessage {
Expand All @@ -15,9 +16,10 @@ export interface SSEMessage {
* Readable does not guarantee chunks align with SSE record boundaries.
*/
export async function* parseSSE(stream: Readable): AsyncGenerator<SSEMessage> {
const decoder = new StringDecoder('utf8');
let buffer = '';
for await (const chunk of stream) {
buffer += chunk.toString('utf8');
buffer += typeof chunk === 'string' ? chunk : decoder.write(chunk);
while (true) {
const recordEnd = buffer.indexOf('\n\n');
const crlfEnd = buffer.indexOf('\r\n\r\n');
Expand All @@ -37,6 +39,7 @@ export async function* parseSSE(stream: Readable): AsyncGenerator<SSEMessage> {
if (msg) yield msg;
}
}
buffer += decoder.end();
// Any trailing record without a terminating blank line is treated as a final message,
// matching the looser behavior browsers exhibit on connection close.
if (buffer.trim()) {
Expand Down
30 changes: 29 additions & 1 deletion components/operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ const manageThreads = require('../server/threads/manageThreads.js');
const { packageDirectory } = require('../components/packageComponent.ts');
const { Resources } = require('../resources/Resources.ts');
const { Application, prepareApplication } = require('./Application.ts');
const { stagePayloadToTempFile } = require('./payloadStaging.ts');
const { server } = require('../server/Server.ts');
const { Readable } = require('node:stream');
const { createReadStream } = require('node:fs');

/**
* Read the settings.js file and return the
Expand Down Expand Up @@ -389,6 +392,21 @@ async function deployComponent(req) {
await configUtils.addConfig(req.project, applicationConfig);
}

// Stage streamed payloads to a temp file when replication is needed. The payload
// Readable is consumed once by local extraction; without staging, replicas would have
// nothing to relay. Skipped when there are no peers (the only-local-deploy case keeps
// its zero-disk-copy property) or when the deploy is package-identifier-based.
let stagedPayload;
const needsReplication = req.replicated !== false && (server.nodes?.length ?? 0) > 0;
if (req.payload instanceof Readable && needsReplication) {
stagedPayload = await stagePayloadToTempFile(req.payload, req.project);
// Re-source the local extraction from the staged file. The Application sees a regular
// fs Readable rather than the original chunked HTTP body, which keeps backpressure
// behavior of extract identical to the non-replicated case.
req.payload = createReadStream(stagedPayload.path);
req._stagedPayloadPath = stagedPayload.path;
}

const application = new Application({
name: req.project,
payload: req.payload,
Expand All @@ -405,6 +423,8 @@ async function deployComponent(req) {
await prepareApplication(application);
} catch (err) {
progress?.emit('phase', { phase: 'extract_or_install', status: 'error', message: err?.message ?? String(err) });
// Clean up the staged payload on early failure so we don't leak disk.
stagedPayload?.cleanup().catch(() => {});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catches prepareApplication failures, but there's a gap between this catch and the replicateOperation finally: the load-phase throw lastError path (the if (lastError) throw lastError block that runs when the pseudo-resource load fails, after line ~452) is not covered by either cleanup handler. If a multi-node streaming deploy reaches the load phase and the component fails to load, stagedPayload leaks.

Suggested change
stagedPayload?.cleanup().catch(() => {});
stagedPayload?.cleanup().catch(() => {});

Same one-liner before throw lastError in the load-error branch (or restructure with a single outer try/finally wrapping both phases).

throw err;
}
progress?.emit('phase', { phase: 'install', status: 'done' });
Expand Down Expand Up @@ -444,7 +464,15 @@ async function deployComponent(req) {
// plain `{listeners:[]}` object would still take the `if (progress)` branch and then
// throw `TypeError: progress.emit is not a function`. Strip it before fan-out.
delete req.progress;
let response = await server.replication.replicateOperation(req);
let response;
try {
response = await server.replication.replicateOperation(req);
} finally {
// Whether replication succeeded, failed, or didn't run, the staged payload has served
// its purpose. Best-effort cleanup; if the rm fails (e.g. file already gone on a retry
// path) we don't surface that to the deploy caller.
await stagedPayload?.cleanup().catch(() => {});
}
progress?.emit('phase', { phase: 'replicate', status: 'done' });
if (req.restart === true) {
progress?.emit('phase', { phase: 'restart', status: 'start' });
Expand Down
41 changes: 41 additions & 0 deletions components/payloadStaging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { createWriteStream } from 'node:fs';
import { mkdtemp, rm } from 'node:fs/promises';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { pipeline } from 'node:stream/promises';
import type { Readable } from 'node:stream';

/**
* Buffer a streamed deploy_component payload to a temp tar.gz file on disk.
*
* Used by replicated deploys: the origin needs to keep a copy of the streamed payload so
* it can re-stream it to each peer over the HTTPS relay (see harper-pro's deployRelay).
* Without this, the payload Readable would be consumed once by local extraction and gone
* by the time replication runs.
*
* Trade-off: we write the full payload to disk before extraction reads from it (instead of
* tee-ing in-flight) so the local-deploy path stays unchanged — extractApplication still
* gets a regular createReadStream over a complete file. Two passes over the data, but no
* concurrent-tee complexity, and disk speed isn't the bottleneck on deploy.
*
* Returns the staged file's path and a cleanup function. The cleanup deletes the temp
* directory; safe to call multiple times (rm with force).
*/
export async function stagePayloadToTempFile(
source: Readable,
projectName: string
): Promise<{ path: string; cleanup: () => Promise<void> }> {
const dir = await mkdtemp(join(tmpdir(), `harper-deploy-${sanitize(projectName)}-`));
const path = join(dir, 'payload.tar.gz');
await pipeline(source, createWriteStream(path));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temp dir leaks on pipeline failure (still open from prior run)

mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory. The error-propagation test asserts the rejection propagates but does not assert the temp dir is removed.

Suggested change
await pipeline(source, createWriteStream(path));
const path = join(dir, 'payload.tar.gz');
try {
await pipeline(source, createWriteStream(path));
} catch (err) {
await rm(dir, { recursive: true, force: true }).catch(() => {});
throw err;
}

Also add a test assertion: after stagePayloadToTempFile rejects, stat(path.dirname(...)) should throw ENOENT.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temp dir leaks on pipeline failure (carry from prior run)

mkdtemp already created dir before pipeline runs. If pipeline rejects, the caller never receives a cleanup handle — the directory is orphaned. The error-propagation test asserts the rejection propagates but doesn't assert the temp dir is removed afterward.

Suggested change
await pipeline(source, createWriteStream(path));
try {
await pipeline(source, createWriteStream(path));
} catch (err) {
await rm(dir, { recursive: true, force: true }).catch(() => {});
throw err;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temp dir leaks on pipeline failure (carry from prior run — still open)

mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory.

Suggested change
await pipeline(source, createWriteStream(path));
const path = join(dir, 'payload.tar.gz');
try {
await pipeline(source, createWriteStream(path));
} catch (err) {
await rm(dir, { recursive: true, force: true }).catch(() => {});
throw err;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temp dir leaks on pipeline failure (carry from prior run — still open)

mkdtemp already created dir before pipeline runs. If pipeline rejects (source stream error), the function throws without returning a cleanup handle — the caller has no way to delete the orphaned directory.

Suggested change
await pipeline(source, createWriteStream(path));
const path = join(dir, 'payload.tar.gz');
try {
await pipeline(source, createWriteStream(path));
} catch (err) {
await rm(dir, { recursive: true, force: true }).catch(() => {});
throw err;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still leaks dir on pipeline failure. mkdtemp already ran at line 28; if pipeline throws here the function exits without returning a cleanup handle, so the caller can never delete the orphaned temp directory.

Suggested change
await pipeline(source, createWriteStream(path));
try {
await pipeline(source, createWriteStream(path));
} catch (err) {
await rm(dir, { recursive: true, force: true }).catch(() => {});
throw err;
}

const cleanup = async () => {
await rm(dir, { recursive: true, force: true });
};
return { path, cleanup };
}

function sanitize(name: string): string {
// keep alphanumerics, dashes, underscores; replace everything else so a malicious or
// quirky project name (slashes, dots, control chars) can't escape the tmpdir.
return name.replace(/[^a-zA-Z0-9._-]/g, '_').slice(0, 64);
}
41 changes: 30 additions & 11 deletions server/serverHelpers/progressEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,40 @@ export class ProgressEmitter {
export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise<unknown>): Readable {
const stream = new PassThrough();

let active = true;
const unsubscribe = emitter.subscribe((event) => {
writeSSE(stream, event);
if (active) writeSSE(stream, event);
});

const cleanup = () => {
if (active) {
active = false;
unsubscribe();
}
};

// If the client disconnects (Ctrl-C, network drop) stop writing to the stream and
// release the emitter subscription so it doesn't accumulate for the operation lifetime.
stream.on('close', cleanup);
stream.on('end', cleanup);

operation()
.then((result) => {
writeSSE(stream, { event: 'done', data: { result } });
if (active) writeSSE(stream, { event: 'done', data: { result } });
})
.catch((err) => {
writeSSE(stream, {
event: 'error',
data: {
message: err?.message ?? String(err),
code: err?.statusCode ?? err?.code,
},
});
if (active) {
writeSSE(stream, {
event: 'error',
data: {
message: err?.message ?? String(err),
code: err?.statusCode ?? err?.code,
},
});
}
})
.finally(() => {
unsubscribe();
cleanup();
stream.end();
});

Expand All @@ -75,5 +90,9 @@ export function createSSEResponseStream(emitter: ProgressEmitter, operation: ()

function writeSSE(stream: PassThrough, event: ProgressEvent): void {
const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data);
stream.write(`event: ${event.event}\ndata: ${data}\n\n`);
stream.write(`event: ${event.event}\n`);
for (const line of data.split(/\r?\n/)) {
stream.write(`data: ${line}\n`);
}
stream.write('\n');
}
67 changes: 67 additions & 0 deletions unitTests/components/payloadStaging.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict';

const assert = require('node:assert');
const { Readable } = require('node:stream');
const { readFile, stat } = require('node:fs/promises');
const path = require('node:path');
const testUtils = require('../testUtils.js');
testUtils.preTestPrep();

const { stagePayloadToTempFile } = require('#src/components/payloadStaging');

describe('stagePayloadToTempFile', () => {
it('writes a streamed payload to a temp file with the expected contents', async () => {
const payload = Buffer.from('abcdefghij'.repeat(10000), 'utf8'); // 100 KB
const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from(payload), 'demo');
try {
const written = await readFile(tmpPath);
assert.strictEqual(written.length, payload.length);
assert.deepStrictEqual(written, payload);
assert.match(tmpPath, /harper-deploy-demo-/, 'temp dir is named after the project');
assert.strictEqual(path.basename(tmpPath), 'payload.tar.gz');
} finally {
await cleanup();
}
});

it('cleanup() removes the staged file and its parent temp dir', async () => {
const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from('hello'), 'cleanup-test');
await cleanup();
await assert.rejects(stat(tmpPath), /ENOENT/, 'staged file must be gone');
await assert.rejects(stat(path.dirname(tmpPath)), /ENOENT/, 'staged dir must be gone');
});

it('cleanup() is safe to call twice (force: true)', async () => {
const { cleanup } = await stagePayloadToTempFile(Readable.from('hello'), 'double-cleanup');
await cleanup();
await cleanup(); // must not throw
});

it('sanitizes path-traversal characters in the project name', async () => {
const { path: tmpPath, cleanup } = await stagePayloadToTempFile(Readable.from('x'), '../../evil/name');
try {
// Path separators must be replaced so the temp dir lives in a single mkdtemp slot
// directly under tmpdir, not navigated upstream. `..` segments alone don't traverse
// because there's no `/` between them after sanitization.
const os = require('node:os');
assert.strictEqual(
path.dirname(path.dirname(tmpPath)),
path.resolve(os.tmpdir()),
'staged dir must live directly under tmpdir'
);
assert.doesNotMatch(path.basename(path.dirname(tmpPath)), /\//);
assert.match(path.basename(path.dirname(tmpPath)), /harper-deploy-.+_evil_name-/);
} finally {
await cleanup();
}
});

it('propagates source-stream errors through pipeline', async () => {
const source = new Readable({
read() {
this.destroy(new Error('source boom'));
},
});
await assert.rejects(stagePayloadToTempFile(source, 'erroring'), /source boom/);
});
});
Loading