From bf19a2a0e878119a5220310c52506ad162692ddf Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 13 May 2026 22:15:47 -0600 Subject: [PATCH 1/7] feat(deploy): live SSE progress for deploy_component MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the CLI sends `Accept: text/event-stream` on `deploy_component`, the operations API now returns Server-Sent Events instead of a single buffered response. A ProgressEmitter is attached to the operation request and the handler emits `phase` events at the extract → install → load → replicate → restart boundaries; the stream terminates with a `done` event (carrying the operation result) or an `error` event. The CLI parses the stream live, rendering each phase as it happens so multi-minute deploys no longer look hung. Non-SSE callers see no behavior change — the emitter is undefined on that path and every emission is optional-chained. Builds on #530. First slice of HarperFast/harper#526. Follow-ups: streaming live npm install stdout/stderr as `install` events, and re-emitting per-peer SSE events once the direct-HTTPS replication relay lands in #524 follow-up. Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.js | 57 +++++++- bin/sseConsumer.ts | 123 ++++++++++++++++++ components/Application.ts | 13 +- components/operations.js | 26 +++- server/serverHelpers/progressEmitter.ts | 79 +++++++++++ server/serverHelpers/serverHandlers.js | 22 ++++ unitTests/bin/sseConsumer.test.js | 95 ++++++++++++++ .../serverHelpers/progressEmitter.test.js | 120 +++++++++++++++++ utility/common_utils.js | 16 +++ 9 files changed, 541 insertions(+), 10 deletions(-) create mode 100644 bin/sseConsumer.ts create mode 100644 server/serverHelpers/progressEmitter.ts create mode 100644 unitTests/bin/sseConsumer.test.js create mode 100644 unitTests/server/serverHelpers/progressEmitter.test.js diff --git a/bin/cliOperations.js b/bin/cliOperations.js index f71c4e44f..9477d4d3d 100644 --- a/bin/cliOperations.js +++ b/bin/cliOperations.js @@ -9,11 +9,19 @@ const fs = require('fs-extra'); const YAML = require('yaml'); const { streamPackagedDirectory } = require('../components/packageComponent.ts'); const { buildMultipartBody } = require('./multipartBuilder.ts'); +const { parseSSE, renderDeployProgress } = require('./sseConsumer.ts'); const { getHdbPid } = require('../utility/processManagement/processManagement.js'); const { initConfig, getConfigPath } = require('../config/configUtils.js'); const OP_ALIASES = { deploy: 'deploy_component', package: 'package_component' }; +// Operations whose responses should be consumed as text/event-stream so live phase events +// (extract, install, load, replicate, restart) render as they happen instead of after the +// whole deploy completes. Add an operation here only after wiring its server-side +// SSE_PROGRESS_OPERATIONS entry — otherwise the server returns the buffered JSON path and +// the SSE parser sees no events. +const SSE_OPERATIONS = new Set(['deploy_component']); + // Properties on `req` that the CLI itself uses for transport/UX, not the operations API. // They never get serialized into the request body. const TRANSPORT_ONLY_FIELDS = new Set([ @@ -129,6 +137,11 @@ async function cliOperations(req) { if (target?.username) { options.headers.Authorization = `Basic ${Buffer.from(`${target.username}:${target.password}`).toString('base64')}`; } + const useSse = SSE_OPERATIONS.has(req.operation); + if (useSse) { + options.headers.Accept = 'text/event-stream'; + options.streamResponse = true; + } let body; if (req._multipart) { const packageStream = req._packageStream; @@ -154,13 +167,43 @@ async function cliOperations(req) { let response = await httpRequest(options, body); let responseData; - try { - responseData = JSON.parse(response.body); - } catch { - responseData = { - status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), - body: response.body, - }; + if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) { + // Consume SSE: render phase events live, capture the final result from the `done` + // event (or the error message from the `error` event). The HTTP status stays 200 + // until end-of-stream; failures are signaled in-band. + const state = {}; + let finalResult; + let sseError; + for await (const message of parseSSE(response)) { + renderDeployProgress(message, state, process.stderr); + if (message.event === 'done') { + try { + finalResult = JSON.parse(message.data)?.result; + } catch { + finalResult = message.data; + } + } else if (message.event === 'error') { + try { + sseError = JSON.parse(message.data); + } catch { + sseError = { message: message.data }; + } + } + } + if (sseError) { + console.error(`error: ${sseError.message ?? sseError}`); + process.exit(1); + } + responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; + } else { + try { + responseData = JSON.parse(response.body); + } catch { + responseData = { + status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), + body: response.body, + }; + } } let responseLog; diff --git a/bin/sseConsumer.ts b/bin/sseConsumer.ts new file mode 100644 index 000000000..f279013dd --- /dev/null +++ b/bin/sseConsumer.ts @@ -0,0 +1,123 @@ +import type { Readable } from 'node:stream'; + +export interface SSEMessage { + event: string; + data: string; + id?: string; + retry?: number; +} + +/** + * Parse a Readable carrying Server-Sent Events into structured messages. + * + * Yields one `SSEMessage` per blank-line-terminated record. Handles split data: lines, + * CRLF or LF line endings, and arbitrary chunk boundaries — the underlying Node http + * Readable does not guarantee chunks align with SSE record boundaries. + */ +export async function* parseSSE(stream: Readable): AsyncGenerator { + let buffer = ''; + for await (const chunk of stream) { + buffer += chunk.toString('utf8'); + while (true) { + const recordEnd = buffer.indexOf('\n\n'); + const crlfEnd = buffer.indexOf('\r\n\r\n'); + let endIdx = -1; + let delimLen = 0; + if (recordEnd !== -1 && (crlfEnd === -1 || recordEnd < crlfEnd)) { + endIdx = recordEnd; + delimLen = 2; + } else if (crlfEnd !== -1) { + endIdx = crlfEnd; + delimLen = 4; + } + if (endIdx === -1) break; + const record = buffer.slice(0, endIdx); + buffer = buffer.slice(endIdx + delimLen); + const msg = parseRecord(record); + if (msg) yield msg; + } + } + // Any trailing record without a terminating blank line is treated as a final message, + // matching the looser behavior browsers exhibit on connection close. + if (buffer.trim()) { + const msg = parseRecord(buffer); + if (msg) yield msg; + } +} + +function parseRecord(record: string): SSEMessage | null { + const lines = record.split(/\r?\n/); + let event = 'message'; + let id: string | undefined; + let retry: number | undefined; + const dataLines: string[] = []; + for (const line of lines) { + if (line === '' || line.startsWith(':')) continue; + const colon = line.indexOf(':'); + const field = colon === -1 ? line : line.slice(0, colon); + // Per spec, a leading space after the colon is stripped. + let value = colon === -1 ? '' : line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + switch (field) { + case 'event': + event = value; + break; + case 'data': + dataLines.push(value); + break; + case 'id': + id = value; + break; + case 'retry': { + const n = Number(value); + if (Number.isFinite(n)) retry = n; + break; + } + } + } + if (dataLines.length === 0 && event === 'message') return null; + return { event, data: dataLines.join('\n'), id, retry }; +} + +interface RenderState { + currentPhase?: string; +} + +/** + * Render SSE deploy events as terse, line-oriented progress to stderr (so stdout stays + * reserved for the final JSON/YAML response document). Phase transitions print once. + */ +export function renderDeployProgress(message: SSEMessage, state: RenderState, output: NodeJS.WritableStream): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': { + const p = parsed as { phase?: string; status?: string; rolling?: boolean }; + const label = p.phase ?? '?'; + if (p.status === 'start') { + if (state.currentPhase !== label) { + output.write(`${label}…\n`); + state.currentPhase = label; + } + } else if (p.status === 'done') { + output.write(`${label} done\n`); + } else if (p.status === 'error') { + const msg = (parsed as { message?: string }).message ?? 'failed'; + output.write(`${label} ERROR: ${msg}\n`); + } + break; + } + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up the final result via the SSE iterator; nothing to render here. + break; + } +} diff --git a/components/Application.ts b/components/Application.ts index 92f9dc0bf..4134b8b79 100644 --- a/components/Application.ts +++ b/components/Application.ts @@ -431,6 +431,10 @@ export class Application { dirPath: string; logger: Logger; packageManagerPrefix: string; // can be used to configure a package manager prefix, specifically "sfw". + // Optional progress emitter for SSE-style reporting. Set by the operations API when the + // caller requested `Accept: text/event-stream`. Undefined for the historical + // single-response code path; phase-event emissions are all optional-chained off this. + progress?: { emit(event: string, data: unknown): void }; constructor({ name, payload, packageIdentifier, install }: ApplicationOptions) { this.name = name; @@ -473,7 +477,14 @@ export function derivePackageIdentifier(packageIdentifier: string) { * @returns A promise that resolves when all preparation steps complete. */ export function prepareApplication(application: Application) { - return extractApplication(application).then(() => installApplication(application)); + return extractApplication(application).then(() => { + // extractApplication finished; the next phase is install. We emit the boundary here so + // the SSE consumer sees `extract done → install start` in order even though Application + // itself isn't aware of which phase comes next. + application.progress?.emit('phase', { phase: 'extract', status: 'done' }); + application.progress?.emit('phase', { phase: 'install', status: 'start' }); + return installApplication(application); + }); } /** diff --git a/components/operations.js b/components/operations.js index 80fa3b384..ac8d76dcc 100644 --- a/components/operations.js +++ b/components/operations.js @@ -348,6 +348,11 @@ async function packageComponent(req) { * @returns {Promise} */ async function deployComponent(req) { + // `req.progress` is a ProgressEmitter set by handlePostRequest when the client sends + // `Accept: text/event-stream`. Non-SSE callers leave it undefined; the optional-chained + // calls below are no-ops in that case, keeping the historical single-response path intact. + const progress = req.progress; + if (req.project) { req.project = path.parse(req.project).name; } else if (req.package) { @@ -393,12 +398,21 @@ async function deployComponent(req) { timeout: req.install_timeout, }, }); + if (progress) application.progress = progress; - await prepareApplication(application); + progress?.emit('phase', { phase: 'extract', status: 'start' }); + try { + await prepareApplication(application); + } catch (err) { + progress?.emit('phase', { phase: 'extract_or_install', status: 'error', message: err?.message ?? String(err) }); + throw err; + } + progress?.emit('phase', { phase: 'install', status: 'done' }); // now we attempt to actually load the component in case there is // an error we can immediately detect and report, but app code should not run on the main thread if (!isMainThread && !process.env.HARPER_SAFE_MODE) { + progress?.emit('phase', { phase: 'load', status: 'start' }); const pseudoResources = new Resources(); pseudoResources.isWorker = true; @@ -415,16 +429,24 @@ async function deployComponent(req) { req.project ); - if (lastError) throw lastError; + if (lastError) { + progress?.emit('phase', { phase: 'load', status: 'error', message: lastError?.message ?? String(lastError) }); + throw lastError; + } + progress?.emit('phase', { phase: 'load', status: 'done' }); } const rollingRestart = req.restart === 'rolling'; // if doing a rolling restart set restart to false so that other nodes don't also restart. req.restart = rollingRestart ? false : req.restart; + progress?.emit('phase', { phase: 'replicate', status: 'start' }); let response = await server.replication.replicateOperation(req); + progress?.emit('phase', { phase: 'replicate', status: 'done' }); if (req.restart === true) { + progress?.emit('phase', { phase: 'restart', status: 'start' }); manageThreads.restartWorkers('http'); response.message = `Successfully deployed: ${application.name}, restarting Harper`; } else if (rollingRestart) { + progress?.emit('phase', { phase: 'restart', status: 'start', rolling: true }); const serverUtilities = require('../server/serverHelpers/serverUtilities.ts'); const jobResponse = await serverUtilities.executeJob({ operation: 'restart_service', diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts new file mode 100644 index 000000000..cf390f995 --- /dev/null +++ b/server/serverHelpers/progressEmitter.ts @@ -0,0 +1,79 @@ +import { PassThrough, Readable } from 'node:stream'; + +export interface ProgressEvent { + event: string; + data: unknown; +} + +export type ProgressListener = (event: ProgressEvent) => void; + +/** + * Lightweight pub-sub used to report phase/install/replicate events from a long-running + * operation back to the HTTP layer. We deliberately don't use Node's EventEmitter here: + * we only need broadcast semantics for a small set of event types, and we want the + * `emit(event, data)` shape that matches the SSE wire format directly. + */ +export class ProgressEmitter { + private listeners: ProgressListener[] = []; + + emit(event: string, data: unknown): void { + // Snapshot before iteration so a listener that unsubscribes itself during dispatch + // doesn't shift indexes underneath us. + const snapshot = this.listeners.slice(); + for (const listener of snapshot) { + try { + listener({ event, data }); + } catch { + // A buggy listener must never break the operation. Swallow and continue. + } + } + } + + subscribe(listener: ProgressListener): () => void { + this.listeners.push(listener); + return () => { + const i = this.listeners.indexOf(listener); + if (i !== -1) this.listeners.splice(i, 1); + }; + } +} + +/** + * Wrap a long-running operation so its progress events stream back as Server-Sent Events. + * + * The returned Readable emits one SSE message per `emitter.emit(...)` call, then a final + * `done` (or `error`) event with the operation's result, then ends. The caller is + * expected to set Content-Type: text/event-stream on the response. + */ +export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise): Readable { + const stream = new PassThrough(); + + const unsubscribe = emitter.subscribe((event) => { + writeSSE(stream, event); + }); + + operation() + .then((result) => { + writeSSE(stream, { event: 'done', data: { result } }); + }) + .catch((err) => { + writeSSE(stream, { + event: 'error', + data: { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + }, + }); + }) + .finally(() => { + unsubscribe(); + stream.end(); + }); + + return stream; +} + +function writeSSE(stream: PassThrough, event: ProgressEvent): void { + const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data); + stream.write(`event: ${event.event}\ndata: ${data}\n\n`); +} diff --git a/server/serverHelpers/serverHandlers.js b/server/serverHelpers/serverHandlers.js index 686b28d05..8fb03d928 100644 --- a/server/serverHelpers/serverHandlers.js +++ b/server/serverHelpers/serverHandlers.js @@ -15,6 +15,13 @@ const pAuthorize = util.promisify(auth.authorize); const serverUtilities = require('./serverUtilities.ts'); const { applyImpersonation } = require('../../security/impersonation.ts'); const { createGzip, constants } = require('zlib'); +const { ProgressEmitter, createSSEResponseStream } = require('./progressEmitter.ts'); + +// Operations that support `Accept: text/event-stream` for live progress reporting. +// Adding an operation here means its handler may read `req.body.progress` (a ProgressEmitter) +// and emit phase/install/replicate events; non-SSE clients still get the historical +// single-response behavior because the emitter is undefined on that path. +const SSE_PROGRESS_OPERATIONS = new Set([terms.OPERATIONS_ENUM.DEPLOY_COMPONENT]); const NO_AUTH_OPERATIONS = [ terms.OPERATIONS_ENUM.CREATE_AUTHENTICATION_TOKENS, @@ -119,6 +126,21 @@ async function handlePostRequest(req, res, _bypassAuth = false) { if (req.body.bypass_auth) delete req.body.bypass_auth; operation_function = serverUtilities.chooseOperation(req.body); + + // SSE progress branch — when the client asks for `text/event-stream` on an operation + // that supports it, run the operation in the background and stream events back as they + // happen. The progress emitter is attached to req.body so the operation handler can + // emit without changing its return signature; non-SSE callers leave `progress` + // undefined and the handler stays on its synchronous result path. + if (req.headers.accept === 'text/event-stream' && SSE_PROGRESS_OPERATIONS.has(req.body.operation)) { + const emitter = new ProgressEmitter(); + req.body.progress = emitter; + res.header('Content-Type', 'text/event-stream'); + res.header('Cache-Control', 'no-cache'); + res.header('X-Accel-Buffering', 'no'); // disable proxy buffering so events flush in real time + return createSSEResponseStream(emitter, () => serverUtilities.processLocalTransaction(req, operation_function)); + } + let result = await serverUtilities.processLocalTransaction(req, operation_function); if (result instanceof Readable && result.headers) { for (let [name, value] of result.headers) { diff --git a/unitTests/bin/sseConsumer.test.js b/unitTests/bin/sseConsumer.test.js new file mode 100644 index 000000000..2bc1c25a2 --- /dev/null +++ b/unitTests/bin/sseConsumer.test.js @@ -0,0 +1,95 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable, PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { parseSSE, renderDeployProgress } = require('#src/bin/sseConsumer'); + +async function collectMessages(stream) { + const out = []; + for await (const msg of parseSSE(stream)) out.push(msg); + return out; +} + +describe('parseSSE', () => { + it('parses a single record', async () => { + const stream = Readable.from(['event: phase\ndata: {"phase":"extract"}\n\n']); + const msgs = await collectMessages(stream); + assert.deepStrictEqual(msgs, [{ event: 'phase', data: '{"phase":"extract"}', id: undefined, retry: undefined }]); + }); + + it('joins multi-line data fields with \\n per the SSE spec', async () => { + const stream = Readable.from(['event: install\ndata: line1\ndata: line2\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].data, 'line1\nline2'); + }); + + it('handles records split across multiple chunks', async () => { + const stream = new PassThrough(); + const collector = collectMessages(stream); + stream.write('event: phase\n'); + stream.write('data: {"phase":"extract","sta'); + stream.write('tus":"start"}\n\n'); + stream.write('event: phase\ndata: {"phase":"extract","status":"done"}\n\n'); + stream.end(); + const msgs = await collector; + assert.strictEqual(msgs.length, 2); + assert.strictEqual(JSON.parse(msgs[0].data).status, 'start'); + assert.strictEqual(JSON.parse(msgs[1].data).status, 'done'); + }); + + it('handles CRLF line endings', async () => { + const stream = Readable.from(['event: phase\r\ndata: {"x":1}\r\n\r\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(msgs[0].data), { x: 1 }); + }); + + it('strips a single leading space after the colon, per spec', async () => { + const stream = Readable.from(['data: hello\n\n']); + const msgs = await collectMessages(stream); + // only ONE space is stripped, the second remains + assert.strictEqual(msgs[0].data, ' hello'); + }); + + it('ignores comment lines (leading colon)', async () => { + const stream = Readable.from([': heartbeat\nevent: ping\ndata: ok\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'ping'); + assert.strictEqual(msgs[0].data, 'ok'); + }); +}); + +describe('renderDeployProgress', () => { + it('prints a phase line on start and another on done', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }, state, out); + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'done' }) }, state, out); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('does not repeat the same phase-start line', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + const msg = { event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }; + renderDeployProgress(msg, state, out); + renderDeployProgress(msg, state, out); + assert.strictEqual(lines.length, 1); + }); + + it('prints an error line with the message', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + renderDeployProgress( + { event: 'error', data: JSON.stringify({ message: 'npm install failed', code: 500 }) }, + {}, + out + ); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); +}); diff --git a/unitTests/server/serverHelpers/progressEmitter.test.js b/unitTests/server/serverHelpers/progressEmitter.test.js new file mode 100644 index 000000000..5111d556f --- /dev/null +++ b/unitTests/server/serverHelpers/progressEmitter.test.js @@ -0,0 +1,120 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable } = require('node:stream'); +const testUtils = require('../../testUtils.js'); +testUtils.preTestPrep(); + +const { ProgressEmitter, createSSEResponseStream } = require('#src/server/serverHelpers/progressEmitter'); + +function collect(stream) { + return new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (c) => chunks.push(c)); + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + stream.on('error', reject); + }); +} + +function parseSSEBlocks(text) { + return text + .split('\n\n') + .filter((block) => block.trim().length > 0) + .map((block) => { + const out = {}; + for (const line of block.split('\n')) { + const colon = line.indexOf(':'); + if (colon === -1) continue; + const field = line.slice(0, colon); + let value = line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + out[field] = value; + } + return out; + }); +} + +describe('ProgressEmitter', () => { + it('delivers events to every subscriber', () => { + const emitter = new ProgressEmitter(); + const a = []; + const b = []; + emitter.subscribe((e) => a.push(e)); + emitter.subscribe((e) => b.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.deepStrictEqual(a, [ + { event: 'phase', data: { phase: 'extract', status: 'start' } }, + { event: 'phase', data: { phase: 'extract', status: 'done' } }, + ]); + assert.deepStrictEqual(b, a); + }); + + it('unsubscribe stops further delivery', () => { + const emitter = new ProgressEmitter(); + const received = []; + const unsubscribe = emitter.subscribe((e) => received.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + unsubscribe(); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.strictEqual(received.length, 1); + }); + + it('swallows listener exceptions so operations are never broken by a buggy subscriber', () => { + const emitter = new ProgressEmitter(); + emitter.subscribe(() => { + throw new Error('listener boom'); + }); + const ok = []; + emitter.subscribe((e) => ok.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + assert.strictEqual(ok.length, 1); + }); +}); + +describe('createSSEResponseStream', () => { + it('streams emitter events then a terminating `done` event with the operation result', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + await new Promise((r) => setImmediate(r)); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + return { message: 'Successfully deployed: demo' }; + }); + const text = await collect(stream); + const events = parseSSEBlocks(text); + assert.strictEqual(events.length, 3); + assert.strictEqual(events[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[0].data), { phase: 'extract', status: 'start' }); + assert.strictEqual(events[1].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[1].data), { phase: 'extract', status: 'done' }); + assert.strictEqual(events[2].event, 'done'); + assert.deepStrictEqual(JSON.parse(events[2].data), { result: { message: 'Successfully deployed: demo' } }); + }); + + it('streams an `error` event when the operation rejects', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + const err = new Error('boom'); + err.statusCode = 500; + throw err; + }); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events[events.length - 1].event, 'error'); + const errData = JSON.parse(events[events.length - 1].data); + assert.strictEqual(errData.message, 'boom'); + assert.strictEqual(errData.code, 500); + }); + + it('still closes the stream cleanly when the operation emits nothing', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => ({ ok: true })); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events.length, 1); + assert.strictEqual(events[0].event, 'done'); + }); +}); + +// keep Readable import live for any future tests that need stream sources +void Readable; diff --git a/utility/common_utils.js b/utility/common_utils.js index ca7f2f4f6..1478f8f20 100644 --- a/utility/common_utils.js +++ b/utility/common_utils.js @@ -806,8 +806,24 @@ function httpRequest(options, data) { let client; if (options.protocol === 'http:') client = http; else client = https; + // `streamResponse` opts into a non-buffered response shape: the promise resolves as soon + // as headers arrive, with `response` itself as a Readable. Used by the CLI for SSE + // (text/event-stream) deploys so progress events render live instead of after the deploy + // finishes. Strip the flag from `options` before handing it to http.request so it doesn't + // end up as an unknown header/option. + const streamResponse = options.streamResponse === true; + if (streamResponse) { + options = { ...options }; + delete options.streamResponse; + } return new Promise((resolve, reject) => { const req = client.request(options, (response) => { + if (streamResponse) { + // Hand the raw stream to the caller; do not setEncoding so binary-safe consumers + // (or SSE parsers that prefer Buffers) still work. + resolve(response); + return; + } response.setEncoding('utf8'); response.body = ''; response.on('data', (chunk) => { From 1f696107f9394da8adb7a801ec3c41badd6cfbe6 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 19 May 2026 22:40:00 -0600 Subject: [PATCH 2/7] fix(deploy): strip req.progress before replicating; guard handler's headers access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs flagged by review: 1. Blocker — `req.progress` (a ProgressEmitter) leaked into `replicateOperation(req)`, which serialises the request and forwards it to peer nodes. Functions don't survive that serialisation, so peers received `{ progress: { listeners: [] } }` — a truthy plain object whose `emit()` method throws `TypeError`. Every SSE deploy on a cluster would fail on every peer node. Fix: `delete req.progress` before the replicateOperation call. The local alias `const progress = req.progress` keeps the origin's emitter intact for the post-replicate restart phase events. 2. Test break — the SSE branch in `handlePostRequest` read `req.headers.accept` unguarded. Existing unit tests dispatch through that path with synthetic req shapes that don't set headers; production Fastify requests always do. Optional-chained the access. Co-Authored-By: Claude Opus 4.7 (1M context) --- components/operations.js | 5 +++++ server/serverHelpers/serverHandlers.js | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/components/operations.js b/components/operations.js index 8f28b7e86..088f0e8e9 100644 --- a/components/operations.js +++ b/components/operations.js @@ -439,6 +439,11 @@ async function deployComponent(req) { // if doing a rolling restart set restart to false so that other nodes don't also restart. req.restart = rollingRestart ? false : req.restart; progress?.emit('phase', { phase: 'replicate', status: 'start' }); + // ProgressEmitter is local to the origin: its listeners are functions that can't survive + // the replication-channel serialization, and a peer node receiving `req.progress` as a + // plain `{listeners:[]}` object would still take the `if (progress)` branch and then + // throw `TypeError: progress.emit is not a function`. Strip it before fan-out. + delete req.progress; let response = await server.replication.replicateOperation(req); progress?.emit('phase', { phase: 'replicate', status: 'done' }); if (req.restart === true) { diff --git a/server/serverHelpers/serverHandlers.js b/server/serverHelpers/serverHandlers.js index 7134584d8..ebed8d589 100644 --- a/server/serverHelpers/serverHandlers.js +++ b/server/serverHelpers/serverHandlers.js @@ -132,7 +132,9 @@ async function handlePostRequest(req, res, _bypassAuth = false) { // happen. The progress emitter is attached to req.body so the operation handler can // emit without changing its return signature; non-SSE callers leave `progress` // undefined and the handler stays on its synchronous result path. - if (req.headers.accept === 'text/event-stream' && SSE_PROGRESS_OPERATIONS.has(req.body.operation)) { + // Optional-chained `req.headers` because unit tests dispatch through this path with + // synthetic req shapes that don't set headers; production Fastify requests always do. + if (req.headers?.accept === 'text/event-stream' && SSE_PROGRESS_OPERATIONS.has(req.body.operation)) { const emitter = new ProgressEmitter(); req.body.progress = emitter; res.header('Content-Type', 'text/event-stream'); From bd61561da219af0443f9dc7fc6ebb5c3f6a25d24 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 19 May 2026 22:53:51 -0600 Subject: [PATCH 3/7] feat(deploy): real upload progress bar + live npm install output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two visibility improvements requested mid-review for #531: 1. **Upload progress bar.** The CLI now computes the uncompressed source- tree total (`getPackagedDirectorySize`, mirrors the same skip_node_modules/skip_symlinks predicates as `streamPackagedDirectory`) and wraps the multipart body in a counting Transform that drives a `cli-progress` SingleBar. In a non-TTY environment (CI logs, redirected output) the renderer falls back to periodic `Uploaded X / ~Y (Z%)` lines logged on each 10% step, so logs stay grep-able. Percentage is against uncompressed source bytes, not wire bytes — the actual upload is gzipped and finishes slightly before 100%, an acceptable trade-off versus walking the tree twice. 2. **Live `npm install` stdout/stderr.** `nonInteractiveSpawn` accepts an optional `onLine` callback; `installApplication` plumbs it through all three install code paths (custom command, devEngines packageManager, npm fallback). Lines are buffered until a newline so a chunk that splits mid-line doesn't fire a half-event; trailing partial lines are flushed on close. The deploy progress emitter fires `install { manager, stream, line }` SSE events; the CLI renderer prefixes each line with `|` (stdout) or `!` (stderr) and shows the manager name once. Install-done summarises with a log-line count so the user knows how much they just saw. Was deferred as a follow-up in the original #531 PR description; brought forward at reviewer request. Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 34 +++- bin/deployRenderer.ts | 187 ++++++++++++++++++ components/Application.ts | 58 ++++-- components/packageComponent.ts | 51 +++++ unitTests/bin/deployRenderer.test.js | 108 ++++++++++ .../components/packageDirectorySize.test.js | 83 ++++++++ 6 files changed, 501 insertions(+), 20 deletions(-) create mode 100644 bin/deployRenderer.ts create mode 100644 unitTests/bin/deployRenderer.test.js create mode 100644 unitTests/components/packageDirectorySize.test.js diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index feb3811ba..79c235a1f 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -9,9 +9,10 @@ import { httpRequest } from '../utility/common_utils.ts'; import * as path from 'path'; import * as fs from 'fs-extra'; import * as YAML from 'yaml'; -import { streamPackagedDirectory } from '../components/packageComponent.ts'; +import { streamPackagedDirectory, getPackagedDirectorySize } from '../components/packageComponent.ts'; import { buildMultipartBody } from './multipartBuilder.ts'; -import { parseSSE, renderDeployProgress } from './sseConsumer.ts'; +import { parseSSE } from './sseConsumer.ts'; +import { DeployRenderer } from './deployRenderer.ts'; import { getHdbPid } from '../utility/processManagement/processManagement.js'; import { initConfig, getConfigPath } from '../config/configUtils.js'; @@ -45,13 +46,20 @@ const PREPARE_OPERATION: any = { const projectPath = process.cwd(); if (!req.project) req.project = path.basename(projectPath); + const pkgOptions = { + skip_node_modules: req.skip_node_modules !== false, + skip_symlinks: req.skip_symlinks === true, + }; + // Compute the uncompressed source-tree total up front so the upload bar has a + // meaningful 100% target. Done before streaming begins; for very large trees this + // adds a one-time directory walk that's still much cheaper than the deploy itself. + // Best-effort: getPackagedDirectorySize swallows per-entry stat errors and returns + // whatever it could measure, so a permission glitch can't block the deploy. + req._uploadTotal = await getPackagedDirectorySize(projectPath, pkgOptions); // Stream the tar+gzip directly to the server as the file part of a multipart body. // This bypasses the Node Buffer 2 GB cap that the previous CBOR-encoded path was // subject to, so large components can deploy without materializing in memory. - req._packageStream = streamPackagedDirectory(projectPath, { - skip_node_modules: req.skip_node_modules !== false, - skip_symlinks: req.skip_symlinks === true, - }); + req._packageStream = streamPackagedDirectory(projectPath, pkgOptions); req._multipart = true; }, }; @@ -205,6 +213,9 @@ async function cliOperations(req: any, skipResponseLog = false) { options.streamResponse = true; } let body; + // One renderer owns the upload bar and the SSE event rendering for a multipart deploy. + // Created here so the upload-stream tap and the SSE consumer below see the same instance. + const renderer = req._multipart ? new DeployRenderer({ uploadTotal: req._uploadTotal }) : null; if (req._multipart) { const packageStream = req._packageStream; const fields = {}; @@ -222,22 +233,27 @@ async function cliOperations(req: any, skipResponseLog = false) { // Use chunked transfer-encoding: we don't know the total size up front because the // payload is streamed from `tar.pack` and never fully buffered. options.headers['Transfer-Encoding'] = 'chunked'; - body = multipart.stream; + // Tap the body so bytes flowing into the HTTP request advance the upload bar. + // The renderer's Transform is identity — chunks pass through unmodified. + body = renderer ? renderer.tapUploadStream(multipart.stream) : multipart.stream; } else { body = req; } let response: any = await httpRequest(options, body); + // Upload is done by the time we get the response; tear the bar down before any SSE + // rendering so the bar and event lines don't fight for the same terminal row. + renderer?.endUpload(); + let responseData; if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) { // Consume SSE: render phase events live, capture the final result from the `done` // event (or the error message from the `error` event). The HTTP status stays 200 // until end-of-stream; failures are signaled in-band. - const state = {}; let finalResult; let sseError; for await (const message of parseSSE(response)) { - renderDeployProgress(message, state, process.stderr); + renderer?.renderEvent(message); if (message.event === 'done') { try { finalResult = JSON.parse(message.data)?.result; diff --git a/bin/deployRenderer.ts b/bin/deployRenderer.ts new file mode 100644 index 000000000..cdaf5ca59 --- /dev/null +++ b/bin/deployRenderer.ts @@ -0,0 +1,187 @@ +import { Transform } from 'node:stream'; +// cli-progress is already a runtime dep of harper (see package.json); using its +// SingleBar to render the upload phase here doesn't add a new dependency. +import cliProgress from 'cli-progress'; +import type { SSEMessage } from './sseConsumer.ts'; + +interface RendererOptions { + uploadTotal?: number; + output?: NodeJS.WritableStream; +} + +interface UploadState { + bar: cliProgress.SingleBar | null; + sent: number; + textLastLogged: number; + finished: boolean; +} + +interface PhaseState { + current?: string; + installManager?: string; + installLineCount: number; +} + +/** + * Deploy-time renderer that owns the progress display across two phases: + * + * 1. Local upload — driven by `tapUploadStream`, which wraps the multipart body so we + * can update a `cli-progress` bar against the precomputed uncompressed source-tree + * total. In a non-TTY environment (CI logs, redirected output) we fall back to + * periodic text lines so logs stay grep-able. + * + * 2. Server-side phases — driven by `renderEvent`, called for each SSE message the + * CLI receives from the operations API. Phase events print one-liners; live + * `install` events (npm/pnpm/yarn stdout) are throttled to one line under a + * "[install]" header so a noisy `npm install` doesn't drown the terminal. + * + * Designed so the two phases hand off cleanly: `endUpload()` tears the bar down (so + * it doesn't compete with subsequent prints) before any SSE events render. + */ +export class DeployRenderer { + private upload: UploadState = { bar: null, sent: 0, textLastLogged: 0, finished: false }; + private phase: PhaseState = { installLineCount: 0 }; + private output: NodeJS.WritableStream; + private isTty: boolean; + private uploadTotal: number; + + constructor(options: RendererOptions = {}) { + this.output = options.output ?? process.stderr; + // Only render a bar when stderr is a real terminal. CI runners, log redirection, + // and pipes look identical from Node's perspective: !isTTY. + this.isTty = Boolean((this.output as NodeJS.WriteStream).isTTY); + this.uploadTotal = options.uploadTotal ?? 0; + } + + /** + * Wrap an outbound stream so each byte flowing through it advances the upload bar. + * The Transform is identity — chunks pass through unmodified. + */ + tapUploadStream(stream: T): NodeJS.ReadableStream { + this.upload.bar = this.isTty + ? new cliProgress.SingleBar( + { + format: 'Uploading [{bar}] {percentage}% | {value}/{total} bytes', + barCompleteChar: '█', + barIncompleteChar: '░', + hideCursor: true, + stream: this.output, + etaBuffer: 50, + }, + cliProgress.Presets.shades_classic + ) + : null; + this.upload.bar?.start(this.uploadTotal || 1, 0); + + const counter = new Transform({ + transform: (chunk, _enc, cb) => { + this.upload.sent += chunk.length; + this.tickUpload(); + cb(null, chunk); + }, + flush: (cb) => { + this.endUpload(); + cb(); + }, + }); + stream.on('error', (err) => counter.destroy(err)); + stream.pipe(counter); + return counter; + } + + endUpload(): void { + if (this.upload.finished) return; + this.upload.finished = true; + if (this.upload.bar) { + // Snap to total so the bar shows 100% even when our uncompressed-total estimate + // is slightly off (gzip output is usually smaller than the source tree). + if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal); + this.upload.bar.stop(); + this.upload.bar = null; + } else { + this.output.write(`Upload complete (${formatBytes(this.upload.sent)})\n`); + } + } + + private tickUpload(): void { + if (this.upload.bar) { + this.upload.bar.update(this.upload.sent); + return; + } + // Non-TTY: log a line every 10% of the total (or every 5MB if total unknown). + const step = this.uploadTotal > 0 ? this.uploadTotal / 10 : 5 * 1024 * 1024; + if (this.upload.sent - this.upload.textLastLogged >= step) { + this.upload.textLastLogged = this.upload.sent; + const pct = this.uploadTotal > 0 ? Math.min(100, Math.floor((this.upload.sent / this.uploadTotal) * 100)) : null; + this.output.write( + pct !== null + ? `Uploaded ${formatBytes(this.upload.sent)} / ~${formatBytes(this.uploadTotal)} (${pct}%)\n` + : `Uploaded ${formatBytes(this.upload.sent)}\n` + ); + } + } + + renderEvent(message: SSEMessage): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': + this.renderPhase(parsed as { phase?: string; status?: string; message?: string }); + break; + case 'install': + this.renderInstall(parsed as { manager?: string; stream?: string; line?: string }); + break; + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + this.output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up final result via the SSE iterator; nothing to render here. + break; + } + } + + private renderPhase(data: { phase?: string; status?: string; message?: string }): void { + const label = data.phase ?? '?'; + if (data.status === 'start') { + if (this.phase.current !== label) { + this.output.write(`${label}…\n`); + this.phase.current = label; + this.phase.installLineCount = 0; + } + } else if (data.status === 'done') { + if (label === 'install' && this.phase.installLineCount > 0) { + this.output.write(`install done (${this.phase.installLineCount} log lines)\n`); + } else { + this.output.write(`${label} done\n`); + } + } else if (data.status === 'error') { + this.output.write(`${label} ERROR: ${data.message ?? 'failed'}\n`); + } + } + + private renderInstall(data: { manager?: string; stream?: string; line?: string }): void { + const line = (data.line ?? '').trimEnd(); + if (!line) return; + if (data.manager && data.manager !== this.phase.installManager) { + this.phase.installManager = data.manager; + this.output.write(`install: using ${data.manager}\n`); + } + this.phase.installLineCount++; + // Prefix with stream so users can distinguish stderr noise from stdout warnings. + const tag = data.stream === 'stderr' ? '!' : '|'; + this.output.write(` ${tag} ${line}\n`); + } +} + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KiB`; + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GiB`; +} diff --git a/components/Application.ts b/components/Application.ts index 7087c9376..9ac2f7c0e 100644 --- a/components/Application.ts +++ b/components/Application.ts @@ -282,6 +282,16 @@ export async function installApplication(application: Application) { // If node_modules doesn't exist, we need to install dependencies } + // Build a per-spawn `onLine` callback that forwards each complete stdout/stderr line as + // an `install` SSE event to the deploy progress emitter. The `manager` field is filled + // in lazily by each spawn site so the CLI can show "install: using npm/pnpm/yarn/". + // Skipped entirely when there's no emitter (non-SSE callers). + const progress = application.progress; + const installEmitter = (manager: string) => + progress + ? (stream: 'stdout' | 'stderr', line: string) => progress.emit('install', { manager, stream, line }) + : undefined; + // If custom install command is specified, run it if (application.install?.command) { const [command, ...args] = application.install.command.split(' '); @@ -290,7 +300,8 @@ export async function installApplication(application: Application) { command, args, application.dirPath, - application.install?.timeout + application.install?.timeout, + installEmitter(command) ); // if it succeeds, return if (code === 0) { @@ -346,7 +357,8 @@ export async function installApplication(application: Application) { (application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + packageManager.name, application.install?.allowInstallScripts ? ['install'] : ['install', '--ignore-scripts'], // All of `npm`, `yarn`, and `pnpm` support the `install` command. If we need to configure options here we may have to use some other defaults though application.dirPath, - application.install?.timeout + application.install?.timeout, + installEmitter(packageManager.name) ); // if it succeeds, return @@ -398,7 +410,8 @@ export async function installApplication(application: Application) { (application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + 'npm', npmInstallArgs, application.dirPath, - application.install?.timeout + application.install?.timeout, + installEmitter('npm') ); // if it succeeds, return @@ -607,7 +620,8 @@ export function nonInteractiveSpawn( command: string, args: string[], cwd: string, - timeoutMs: number = 60 * 60 * 1000 + timeoutMs: number = 60 * 60 * 1000, + onLine?: (stream: 'stdout' | 'stderr', line: string) => void ): Promise<{ stdout: string; stderr: string; code: number }> { return new Promise((resolve, reject) => { logger @@ -637,20 +651,36 @@ export function nonInteractiveSpawn( reject(new Error(`Command\`${command} ${args.join(' ')}\` timed out after ${timeoutMs}ms`)); }, timeoutMs); + // `onLine` forwards each complete line of stdout/stderr to the caller as it arrives, + // used by the deploy progress emitter to stream live install output back to the CLI. + // We buffer per stream until a newline so a chunk that splits mid-line doesn't fire + // a half-line; trailing partial lines are flushed on close. + let stdoutLineBuf = ''; + let stderrLineBuf = ''; + function flushLines(buf: string, stream: 'stdout' | 'stderr'): string { + if (!onLine) return ''; + let idx; + while ((idx = buf.indexOf('\n')) !== -1) { + onLine(stream, buf.slice(0, idx)); + buf = buf.slice(idx + 1); + } + return buf; + } + let stdout = ''; childProcess.stdout.on('data', (chunk) => { - // buffer stdout for later resolve - stdout += chunk.toString(); - // log stdout lines immediately - // TODO: Technically nothing guarantees that a chunk will be a complete line so need to implement - // something here to buffer until a newline character, then log the complete line - logger.loggerWithTag(`${applicationName}:spawn:${command}:stdout`).debug?.(chunk.toString()); + const text = chunk.toString(); + stdout += text; + logger.loggerWithTag(`${applicationName}:spawn:${command}:stdout`).debug?.(text); + if (onLine) stdoutLineBuf = flushLines(stdoutLineBuf + text, 'stdout'); }); // buffer stderr let stderr = ''; childProcess.stderr.on('data', (chunk) => { - stderr += chunk.toString(); + const text = chunk.toString(); + stderr += text; + if (onLine) stderrLineBuf = flushLines(stderrLineBuf + text, 'stderr'); }); childProcess.on('error', (error) => { @@ -667,6 +697,12 @@ export function nonInteractiveSpawn( if (stderr) { printStd(applicationName, command, stderr, 'stderr'); } + // Flush any partial line buffered after the last newline (npm sometimes ends without + // a trailing \n, especially on error paths). + if (onLine) { + if (stdoutLineBuf) onLine('stdout', stdoutLineBuf); + if (stderrLineBuf) onLine('stderr', stderrLineBuf); + } logger.loggerWithTag(`${applicationName}:spawn:${command}`).debug?.(`Process exited with code ${code}`); resolve({ stdout, diff --git a/components/packageComponent.ts b/components/packageComponent.ts index 6075e8432..f1cf3a648 100644 --- a/components/packageComponent.ts +++ b/components/packageComponent.ts @@ -1,5 +1,6 @@ import { join } from 'path'; import { Readable } from 'node:stream'; +import { readdir, stat } from 'node:fs/promises'; import tar from 'tar-fs'; import { createGzip } from 'node:zlib'; @@ -36,6 +37,56 @@ export function streamPackagedDirectory(directory: string, options: PackageOptio return packStream.pipe(gzip); } +/** + * Compute the total uncompressed size in bytes of files that `streamPackagedDirectory` + * would include for the same options. Used by the CLI to drive an upload progress bar: + * we count bytes flowing through the tar source (pre-gzip), so a percentage against this + * total represents "how much of the source tree has been read", which is what users want + * to see during a long deploy. The on-the-wire (gzipped) size will be smaller, so the + * bar may finish slightly before the request actually completes — acceptable trade-off + * versus walking twice or buffering. + * + * Symlinks are stat'd (not lstat'd) when `skip_symlinks` is false, matching tar-fs's + * `dereference: !skip_symlinks` behavior. Errors on individual entries are swallowed so + * a transient stat failure doesn't block the deploy — total is best-effort. + */ +export async function getPackagedDirectorySize( + directory: string, + options: PackageOptions = DEFAULT_OPTIONS +): Promise { + const skipNodeModules = options.skip_node_modules === true; + const dereference = options.skip_symlinks !== true; + const statFn = dereference ? stat : (await import('node:fs/promises')).lstat; + let total = 0; + async function walk(current: string, rel: string): Promise { + let entries; + try { + entries = await readdir(current, { withFileTypes: true }); + } catch { + return; + } + for (const entry of entries) { + const relPath = rel ? join(rel, entry.name) : entry.name; + if (skipNodeModules && (relPath.includes('node_modules') || relPath.includes(join('cache', 'webpack')))) { + continue; + } + const full = join(current, entry.name); + if (entry.isDirectory()) { + await walk(full, relPath); + } else if (entry.isFile() || (dereference && entry.isSymbolicLink())) { + try { + const s = await statFn(full); + if (s.isFile()) total += s.size; + } catch { + /* skip unreadable entries — don't block the deploy on a best-effort total */ + } + } + } + } + await walk(directory, ''); + return total; +} + /** * Package a directory into a tar+gzip buffer. Retained for callers that need * an in-memory payload (small deploys, tests). For large directories prefer diff --git a/unitTests/bin/deployRenderer.test.js b/unitTests/bin/deployRenderer.test.js new file mode 100644 index 000000000..9ab450b95 --- /dev/null +++ b/unitTests/bin/deployRenderer.test.js @@ -0,0 +1,108 @@ +'use strict'; + +const assert = require('node:assert'); +const { PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { DeployRenderer } = require('#src/bin/deployRenderer'); + +function makeOutput() { + const lines = []; + const stream = { write: (s) => lines.push(s), isTTY: false }; + return { stream, lines }; +} + +describe('DeployRenderer', () => { + describe('upload progress (non-TTY)', () => { + it('advances bytes as data flows through the tap and logs at 10% steps', async () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 1_000_000, output: stream }); + const source = new PassThrough(); + const tap = renderer.tapUploadStream(source); + // Drain into a sink so backpressure doesn't pause the tap. + const sink = new PassThrough(); + tap.pipe(sink); + sink.on('data', () => {}); + // Write 5×200_000 byte chunks → 1MB total, crossing each 10% threshold. + for (let i = 0; i < 5; i++) source.write(Buffer.alloc(200_000)); + source.end(); + await new Promise((resolve) => sink.on('end', resolve)); + renderer.endUpload(); + // Expect at least one per-10% progress line plus the final "Upload complete" line. + const progressLines = lines.filter((l) => /Uploaded /.test(l)); + assert.ok( + progressLines.length >= 4, + `expected multiple progress lines, got ${progressLines.length}: ${progressLines.join('|')}` + ); + assert.ok( + lines.some((l) => l.startsWith('Upload complete')), + 'should log Upload complete on endUpload' + ); + }); + + it('endUpload is idempotent', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 100, output: stream }); + renderer.tapUploadStream(new PassThrough()); + renderer.endUpload(); + renderer.endUpload(); + const completeLines = lines.filter((l) => l.startsWith('Upload complete')); + assert.strictEqual(completeLines.length, 1); + }); + }); + + describe('renderEvent', () => { + function sseMessage(event, data) { + return { event, data: JSON.stringify(data) }; + } + + it('prints a phase line on start, then on done (no duplicate start)', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'done' })); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('forwards install stdout/stderr lines and headers the manager name once', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'start' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'added 42 packages' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'audited 100 packages' })); + renderer.renderEvent( + sseMessage('install', { manager: 'npm', stream: 'stderr', line: 'npm warn deprecated foo' }) + ); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'done' })); + assert.ok(lines.includes('install: using npm\n'), 'expected manager header'); + assert.strictEqual( + lines.filter((l) => l === 'install: using npm\n').length, + 1, + 'manager header logged only once' + ); + assert.ok(lines.includes(' | added 42 packages\n')); + assert.ok(lines.includes(' | audited 100 packages\n')); + assert.ok(lines.includes(' ! npm warn deprecated foo\n'), 'stderr should be tagged with !'); + assert.ok( + lines.some((l) => /install done \(3 log lines\)/.test(l)), + 'install-done should count emitted lines' + ); + }); + + it('renders an error event with code suffix', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('error', { message: 'npm install failed', code: 500 })); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); + + it('renders a phase-error with the message', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'error', message: 'exit code 1' })); + assert.match(lines[0], /install ERROR: exit code 1/); + }); + }); +}); diff --git a/unitTests/components/packageDirectorySize.test.js b/unitTests/components/packageDirectorySize.test.js new file mode 100644 index 000000000..b51e22aac --- /dev/null +++ b/unitTests/components/packageDirectorySize.test.js @@ -0,0 +1,83 @@ +'use strict'; + +const assert = require('node:assert'); +const path = require('node:path'); +const fs = require('node:fs/promises'); +const os = require('node:os'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { getPackagedDirectorySize } = require('#src/components/packageComponent'); + +async function makeFixture(files) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'pkg-size-')); + for (const [rel, content] of Object.entries(files)) { + const full = path.join(dir, rel); + await fs.mkdir(path.dirname(full), { recursive: true }); + await fs.writeFile(full, content); + } + return dir; +} + +describe('getPackagedDirectorySize', () => { + it('returns the sum of file sizes for a simple tree', async () => { + const dir = await makeFixture({ + 'a.txt': 'A'.repeat(100), + 'b/c.txt': 'C'.repeat(200), + 'b/d/e.txt': 'E'.repeat(50), + }); + try { + assert.strictEqual(await getPackagedDirectorySize(dir), 350); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('excludes node_modules when skip_node_modules is true', async () => { + const dir = await makeFixture({ + 'src/app.js': 'a'.repeat(123), + 'node_modules/lib/index.js': 'x'.repeat(1_000_000), + }); + try { + const withModules = await getPackagedDirectorySize(dir); + const withoutModules = await getPackagedDirectorySize(dir, { skip_node_modules: true }); + assert.strictEqual(withModules, 123 + 1_000_000); + assert.strictEqual(withoutModules, 123); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('also excludes webpack cache when skip_node_modules is true', async () => { + const dir = await makeFixture({ + 'src/a.js': 'a'.repeat(50), + '.cache/webpack/big.bin': 'x'.repeat(500_000), + }); + try { + assert.strictEqual(await getPackagedDirectorySize(dir, { skip_node_modules: true }), 50); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('returns 0 for an empty directory', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'pkg-size-empty-')); + try { + assert.strictEqual(await getPackagedDirectorySize(dir), 0); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('best-effort: continues past unreadable entries instead of throwing', async () => { + const dir = await makeFixture({ 'a.txt': 'A'.repeat(10) }); + try { + // Read from a deliberately-nonexistent sub-path — getPackagedDirectorySize swallows + // per-entry errors so a transient stat failure can't block a deploy. + assert.strictEqual(await getPackagedDirectorySize(path.join(dir, 'missing-subdir')), 0); + assert.strictEqual(await getPackagedDirectorySize(dir), 10); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); +}); From a320af51435ade4fe90b51fa9f6ad7748c601e8b Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 19 May 2026 23:02:04 -0600 Subject: [PATCH 4/7] fix: drain SSE response stream when server returns non-SSE on SSE path When useSse=true, httpRequest uses streamResponse mode and returns a raw IncomingMessage. The previous fallback branch read response.body which is undefined in that mode (e.g. on a 401 auth failure). Drain the stream chunks manually so error bodies are captured correctly. Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 79c235a1f..dc277b5a3 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -274,12 +274,22 @@ async function cliOperations(req: any, skipResponseLog = false) { } responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; } else { + // When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode), + // so .body is undefined. Drain the stream to get the text (e.g. a 401 error body). + let bodyText: string; + if (useSse) { + const chunks: Buffer[] = []; + for await (const chunk of response as AsyncIterable) chunks.push(Buffer.from(chunk)); + bodyText = Buffer.concat(chunks).toString('utf8'); + } else { + bodyText = response.body; + } try { - responseData = JSON.parse(response.body); + responseData = JSON.parse(bodyText); } catch { responseData = { status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), - body: response.body, + body: bodyText, }; } } From 31c409831dd86e1bb0090dfadee01cf9dd495d08 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 03:15:13 -0600 Subject: [PATCH 5/7] fix(sse): StringDecoder for split UTF-8, cleanup on client disconnect, error stringify - parseSSE: use StringDecoder so multi-byte chars split across chunk boundaries decode correctly instead of corrupting (e.g. box-drawing chars, emojis) - createSSEResponseStream: gate writes behind an active flag; listen to stream close/end to unsubscribe the emitter immediately on client disconnect so the subscription doesn't linger for the full operation lifetime - cliOperations SSE error: JSON.stringify fallback when sseError has no .message so the error renders as JSON rather than [object Object] Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 4 ++- bin/sseConsumer.ts | 5 +++- server/serverHelpers/progressEmitter.ts | 35 ++++++++++++++++++------- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index dc277b5a3..21a04132c 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -269,7 +269,9 @@ async function cliOperations(req: any, skipResponseLog = false) { } } if (sseError) { - console.error(`error: ${sseError.message ?? sseError}`); + const errMsg = + sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + console.error(`error: ${errMsg}`); process.exit(1); } responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; diff --git a/bin/sseConsumer.ts b/bin/sseConsumer.ts index f279013dd..b89485f15 100644 --- a/bin/sseConsumer.ts +++ b/bin/sseConsumer.ts @@ -1,3 +1,4 @@ +import { StringDecoder } from 'node:string_decoder'; import type { Readable } from 'node:stream'; export interface SSEMessage { @@ -15,9 +16,10 @@ export interface SSEMessage { * Readable does not guarantee chunks align with SSE record boundaries. */ export async function* parseSSE(stream: Readable): AsyncGenerator { + const decoder = new StringDecoder('utf8'); let buffer = ''; for await (const chunk of stream) { - buffer += chunk.toString('utf8'); + buffer += typeof chunk === 'string' ? chunk : decoder.write(chunk); while (true) { const recordEnd = buffer.indexOf('\n\n'); const crlfEnd = buffer.indexOf('\r\n\r\n'); @@ -37,6 +39,7 @@ export async function* parseSSE(stream: Readable): AsyncGenerator { if (msg) yield msg; } } + buffer += decoder.end(); // Any trailing record without a terminating blank line is treated as a final message, // matching the looser behavior browsers exhibit on connection close. if (buffer.trim()) { diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts index cf390f995..af2cc8bd9 100644 --- a/server/serverHelpers/progressEmitter.ts +++ b/server/serverHelpers/progressEmitter.ts @@ -48,25 +48,40 @@ export class ProgressEmitter { export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise): Readable { const stream = new PassThrough(); + let active = true; const unsubscribe = emitter.subscribe((event) => { - writeSSE(stream, event); + if (active) writeSSE(stream, event); }); + const cleanup = () => { + if (active) { + active = false; + unsubscribe(); + } + }; + + // If the client disconnects (Ctrl-C, network drop) stop writing to the stream and + // release the emitter subscription so it doesn't accumulate for the operation lifetime. + stream.on('close', cleanup); + stream.on('end', cleanup); + operation() .then((result) => { - writeSSE(stream, { event: 'done', data: { result } }); + if (active) writeSSE(stream, { event: 'done', data: { result } }); }) .catch((err) => { - writeSSE(stream, { - event: 'error', - data: { - message: err?.message ?? String(err), - code: err?.statusCode ?? err?.code, - }, - }); + if (active) { + writeSSE(stream, { + event: 'error', + data: { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + }, + }); + } }) .finally(() => { - unsubscribe(); + cleanup(); stream.end(); }); From d9740e86d2378eeb485fbf1b76105ff15a6273b9 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 03:17:40 -0600 Subject: [PATCH 6/7] style: prettier format cliOperations errMsg line Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 21a04132c..0aee1422f 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -269,8 +269,7 @@ async function cliOperations(req: any, skipResponseLog = false) { } } if (sseError) { - const errMsg = - sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); console.error(`error: ${errMsg}`); process.exit(1); } From b70bd4198355bc4d41487c98d8f0b2061becd3c8 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 04:32:47 -0600 Subject: [PATCH 7/7] fix(sse): split multi-line data into per-line data: fields per spec Per the SSE spec, if a data value contains newlines each line must be emitted as a separate data: field. A single data: field with embedded newlines is not valid. Co-Authored-By: Claude Opus 4.7 (1M context) --- server/serverHelpers/progressEmitter.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts index af2cc8bd9..f383957ca 100644 --- a/server/serverHelpers/progressEmitter.ts +++ b/server/serverHelpers/progressEmitter.ts @@ -90,5 +90,9 @@ export function createSSEResponseStream(emitter: ProgressEmitter, operation: () function writeSSE(stream: PassThrough, event: ProgressEvent): void { const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data); - stream.write(`event: ${event.event}\ndata: ${data}\n\n`); + stream.write(`event: ${event.event}\n`); + for (const line of data.split(/\r?\n/)) { + stream.write(`data: ${line}\n`); + } + stream.write('\n'); }