diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 882a071b5..0aee1422f 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -9,13 +9,22 @@ import { httpRequest } from '../utility/common_utils.ts'; import * as path from 'path'; import * as fs from 'fs-extra'; import * as YAML from 'yaml'; -import { streamPackagedDirectory } from '../components/packageComponent.ts'; +import { streamPackagedDirectory, getPackagedDirectorySize } from '../components/packageComponent.ts'; import { buildMultipartBody } from './multipartBuilder.ts'; +import { parseSSE } from './sseConsumer.ts'; +import { DeployRenderer } from './deployRenderer.ts'; import { getHdbPid } from '../utility/processManagement/processManagement.js'; import { initConfig, getConfigPath } from '../config/configUtils.js'; const OP_ALIASES = { deploy: 'deploy_component', package: 'package_component' }; +// Operations whose responses should be consumed as text/event-stream so live phase events +// (extract, install, load, replicate, restart) render as they happen instead of after the +// whole deploy completes. Add an operation here only after wiring its server-side +// SSE_PROGRESS_OPERATIONS entry — otherwise the server returns the buffered JSON path and +// the SSE parser sees no events. +const SSE_OPERATIONS = new Set(['deploy_component']); + // Properties on `req` that the CLI itself uses for transport/UX, not the operations API. // They never get serialized into the request body. const TRANSPORT_ONLY_FIELDS = new Set([ @@ -37,13 +46,20 @@ const PREPARE_OPERATION: any = { const projectPath = process.cwd(); if (!req.project) req.project = path.basename(projectPath); + const pkgOptions = { + skip_node_modules: req.skip_node_modules !== false, + skip_symlinks: req.skip_symlinks === true, + }; + // Compute the uncompressed source-tree total up front so the upload bar has a + // meaningful 100% target. Done before streaming begins; for very large trees this + // adds a one-time directory walk that's still much cheaper than the deploy itself. + // Best-effort: getPackagedDirectorySize swallows per-entry stat errors and returns + // whatever it could measure, so a permission glitch can't block the deploy. + req._uploadTotal = await getPackagedDirectorySize(projectPath, pkgOptions); // Stream the tar+gzip directly to the server as the file part of a multipart body. // This bypasses the Node Buffer 2 GB cap that the previous CBOR-encoded path was // subject to, so large components can deploy without materializing in memory. - req._packageStream = streamPackagedDirectory(projectPath, { - skip_node_modules: req.skip_node_modules !== false, - skip_symlinks: req.skip_symlinks === true, - }); + req._packageStream = streamPackagedDirectory(projectPath, pkgOptions); req._multipart = true; }, }; @@ -191,7 +207,15 @@ async function cliOperations(req: any, skipResponseLog = false) { options.headers.Authorization = `Bearer ${tokens.operation_token}`; } } + const useSse = SSE_OPERATIONS.has(req.operation); + if (useSse) { + options.headers.Accept = 'text/event-stream'; + options.streamResponse = true; + } let body; + // One renderer owns the upload bar and the SSE event rendering for a multipart deploy. + // Created here so the upload-stream tap and the SSE consumer below see the same instance. + const renderer = req._multipart ? new DeployRenderer({ uploadTotal: req._uploadTotal }) : null; if (req._multipart) { const packageStream = req._packageStream; const fields = {}; @@ -209,20 +233,66 @@ async function cliOperations(req: any, skipResponseLog = false) { // Use chunked transfer-encoding: we don't know the total size up front because the // payload is streamed from `tar.pack` and never fully buffered. options.headers['Transfer-Encoding'] = 'chunked'; - body = multipart.stream; + // Tap the body so bytes flowing into the HTTP request advance the upload bar. + // The renderer's Transform is identity — chunks pass through unmodified. + body = renderer ? renderer.tapUploadStream(multipart.stream) : multipart.stream; } else { body = req; } let response: any = await httpRequest(options, body); + // Upload is done by the time we get the response; tear the bar down before any SSE + // rendering so the bar and event lines don't fight for the same terminal row. + renderer?.endUpload(); + let responseData; - try { - responseData = JSON.parse(response.body); - } catch { - responseData = { - status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), - body: response.body, - }; + if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) { + // Consume SSE: render phase events live, capture the final result from the `done` + // event (or the error message from the `error` event). The HTTP status stays 200 + // until end-of-stream; failures are signaled in-band. + let finalResult; + let sseError; + for await (const message of parseSSE(response)) { + renderer?.renderEvent(message); + if (message.event === 'done') { + try { + finalResult = JSON.parse(message.data)?.result; + } catch { + finalResult = message.data; + } + } else if (message.event === 'error') { + try { + sseError = JSON.parse(message.data); + } catch { + sseError = { message: message.data }; + } + } + } + if (sseError) { + const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + console.error(`error: ${errMsg}`); + process.exit(1); + } + responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; + } else { + // When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode), + // so .body is undefined. Drain the stream to get the text (e.g. a 401 error body). + let bodyText: string; + if (useSse) { + const chunks: Buffer[] = []; + for await (const chunk of response as AsyncIterable) chunks.push(Buffer.from(chunk)); + bodyText = Buffer.concat(chunks).toString('utf8'); + } else { + bodyText = response.body; + } + try { + responseData = JSON.parse(bodyText); + } catch { + responseData = { + status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), + body: bodyText, + }; + } } let responseLog; diff --git a/bin/deployRenderer.ts b/bin/deployRenderer.ts new file mode 100644 index 000000000..cdaf5ca59 --- /dev/null +++ b/bin/deployRenderer.ts @@ -0,0 +1,187 @@ +import { Transform } from 'node:stream'; +// cli-progress is already a runtime dep of harper (see package.json); using its +// SingleBar to render the upload phase here doesn't add a new dependency. +import cliProgress from 'cli-progress'; +import type { SSEMessage } from './sseConsumer.ts'; + +interface RendererOptions { + uploadTotal?: number; + output?: NodeJS.WritableStream; +} + +interface UploadState { + bar: cliProgress.SingleBar | null; + sent: number; + textLastLogged: number; + finished: boolean; +} + +interface PhaseState { + current?: string; + installManager?: string; + installLineCount: number; +} + +/** + * Deploy-time renderer that owns the progress display across two phases: + * + * 1. Local upload — driven by `tapUploadStream`, which wraps the multipart body so we + * can update a `cli-progress` bar against the precomputed uncompressed source-tree + * total. In a non-TTY environment (CI logs, redirected output) we fall back to + * periodic text lines so logs stay grep-able. + * + * 2. Server-side phases — driven by `renderEvent`, called for each SSE message the + * CLI receives from the operations API. Phase events print one-liners; live + * `install` events (npm/pnpm/yarn stdout) are throttled to one line under a + * "[install]" header so a noisy `npm install` doesn't drown the terminal. + * + * Designed so the two phases hand off cleanly: `endUpload()` tears the bar down (so + * it doesn't compete with subsequent prints) before any SSE events render. + */ +export class DeployRenderer { + private upload: UploadState = { bar: null, sent: 0, textLastLogged: 0, finished: false }; + private phase: PhaseState = { installLineCount: 0 }; + private output: NodeJS.WritableStream; + private isTty: boolean; + private uploadTotal: number; + + constructor(options: RendererOptions = {}) { + this.output = options.output ?? process.stderr; + // Only render a bar when stderr is a real terminal. CI runners, log redirection, + // and pipes look identical from Node's perspective: !isTTY. + this.isTty = Boolean((this.output as NodeJS.WriteStream).isTTY); + this.uploadTotal = options.uploadTotal ?? 0; + } + + /** + * Wrap an outbound stream so each byte flowing through it advances the upload bar. + * The Transform is identity — chunks pass through unmodified. + */ + tapUploadStream(stream: T): NodeJS.ReadableStream { + this.upload.bar = this.isTty + ? new cliProgress.SingleBar( + { + format: 'Uploading [{bar}] {percentage}% | {value}/{total} bytes', + barCompleteChar: '█', + barIncompleteChar: '░', + hideCursor: true, + stream: this.output, + etaBuffer: 50, + }, + cliProgress.Presets.shades_classic + ) + : null; + this.upload.bar?.start(this.uploadTotal || 1, 0); + + const counter = new Transform({ + transform: (chunk, _enc, cb) => { + this.upload.sent += chunk.length; + this.tickUpload(); + cb(null, chunk); + }, + flush: (cb) => { + this.endUpload(); + cb(); + }, + }); + stream.on('error', (err) => counter.destroy(err)); + stream.pipe(counter); + return counter; + } + + endUpload(): void { + if (this.upload.finished) return; + this.upload.finished = true; + if (this.upload.bar) { + // Snap to total so the bar shows 100% even when our uncompressed-total estimate + // is slightly off (gzip output is usually smaller than the source tree). + if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal); + this.upload.bar.stop(); + this.upload.bar = null; + } else { + this.output.write(`Upload complete (${formatBytes(this.upload.sent)})\n`); + } + } + + private tickUpload(): void { + if (this.upload.bar) { + this.upload.bar.update(this.upload.sent); + return; + } + // Non-TTY: log a line every 10% of the total (or every 5MB if total unknown). + const step = this.uploadTotal > 0 ? this.uploadTotal / 10 : 5 * 1024 * 1024; + if (this.upload.sent - this.upload.textLastLogged >= step) { + this.upload.textLastLogged = this.upload.sent; + const pct = this.uploadTotal > 0 ? Math.min(100, Math.floor((this.upload.sent / this.uploadTotal) * 100)) : null; + this.output.write( + pct !== null + ? `Uploaded ${formatBytes(this.upload.sent)} / ~${formatBytes(this.uploadTotal)} (${pct}%)\n` + : `Uploaded ${formatBytes(this.upload.sent)}\n` + ); + } + } + + renderEvent(message: SSEMessage): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': + this.renderPhase(parsed as { phase?: string; status?: string; message?: string }); + break; + case 'install': + this.renderInstall(parsed as { manager?: string; stream?: string; line?: string }); + break; + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + this.output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up final result via the SSE iterator; nothing to render here. + break; + } + } + + private renderPhase(data: { phase?: string; status?: string; message?: string }): void { + const label = data.phase ?? '?'; + if (data.status === 'start') { + if (this.phase.current !== label) { + this.output.write(`${label}…\n`); + this.phase.current = label; + this.phase.installLineCount = 0; + } + } else if (data.status === 'done') { + if (label === 'install' && this.phase.installLineCount > 0) { + this.output.write(`install done (${this.phase.installLineCount} log lines)\n`); + } else { + this.output.write(`${label} done\n`); + } + } else if (data.status === 'error') { + this.output.write(`${label} ERROR: ${data.message ?? 'failed'}\n`); + } + } + + private renderInstall(data: { manager?: string; stream?: string; line?: string }): void { + const line = (data.line ?? '').trimEnd(); + if (!line) return; + if (data.manager && data.manager !== this.phase.installManager) { + this.phase.installManager = data.manager; + this.output.write(`install: using ${data.manager}\n`); + } + this.phase.installLineCount++; + // Prefix with stream so users can distinguish stderr noise from stdout warnings. + const tag = data.stream === 'stderr' ? '!' : '|'; + this.output.write(` ${tag} ${line}\n`); + } +} + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KiB`; + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GiB`; +} diff --git a/bin/sseConsumer.ts b/bin/sseConsumer.ts new file mode 100644 index 000000000..b89485f15 --- /dev/null +++ b/bin/sseConsumer.ts @@ -0,0 +1,126 @@ +import { StringDecoder } from 'node:string_decoder'; +import type { Readable } from 'node:stream'; + +export interface SSEMessage { + event: string; + data: string; + id?: string; + retry?: number; +} + +/** + * Parse a Readable carrying Server-Sent Events into structured messages. + * + * Yields one `SSEMessage` per blank-line-terminated record. Handles split data: lines, + * CRLF or LF line endings, and arbitrary chunk boundaries — the underlying Node http + * Readable does not guarantee chunks align with SSE record boundaries. + */ +export async function* parseSSE(stream: Readable): AsyncGenerator { + const decoder = new StringDecoder('utf8'); + let buffer = ''; + for await (const chunk of stream) { + buffer += typeof chunk === 'string' ? chunk : decoder.write(chunk); + while (true) { + const recordEnd = buffer.indexOf('\n\n'); + const crlfEnd = buffer.indexOf('\r\n\r\n'); + let endIdx = -1; + let delimLen = 0; + if (recordEnd !== -1 && (crlfEnd === -1 || recordEnd < crlfEnd)) { + endIdx = recordEnd; + delimLen = 2; + } else if (crlfEnd !== -1) { + endIdx = crlfEnd; + delimLen = 4; + } + if (endIdx === -1) break; + const record = buffer.slice(0, endIdx); + buffer = buffer.slice(endIdx + delimLen); + const msg = parseRecord(record); + if (msg) yield msg; + } + } + buffer += decoder.end(); + // Any trailing record without a terminating blank line is treated as a final message, + // matching the looser behavior browsers exhibit on connection close. + if (buffer.trim()) { + const msg = parseRecord(buffer); + if (msg) yield msg; + } +} + +function parseRecord(record: string): SSEMessage | null { + const lines = record.split(/\r?\n/); + let event = 'message'; + let id: string | undefined; + let retry: number | undefined; + const dataLines: string[] = []; + for (const line of lines) { + if (line === '' || line.startsWith(':')) continue; + const colon = line.indexOf(':'); + const field = colon === -1 ? line : line.slice(0, colon); + // Per spec, a leading space after the colon is stripped. + let value = colon === -1 ? '' : line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + switch (field) { + case 'event': + event = value; + break; + case 'data': + dataLines.push(value); + break; + case 'id': + id = value; + break; + case 'retry': { + const n = Number(value); + if (Number.isFinite(n)) retry = n; + break; + } + } + } + if (dataLines.length === 0 && event === 'message') return null; + return { event, data: dataLines.join('\n'), id, retry }; +} + +interface RenderState { + currentPhase?: string; +} + +/** + * Render SSE deploy events as terse, line-oriented progress to stderr (so stdout stays + * reserved for the final JSON/YAML response document). Phase transitions print once. + */ +export function renderDeployProgress(message: SSEMessage, state: RenderState, output: NodeJS.WritableStream): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': { + const p = parsed as { phase?: string; status?: string; rolling?: boolean }; + const label = p.phase ?? '?'; + if (p.status === 'start') { + if (state.currentPhase !== label) { + output.write(`${label}…\n`); + state.currentPhase = label; + } + } else if (p.status === 'done') { + output.write(`${label} done\n`); + } else if (p.status === 'error') { + const msg = (parsed as { message?: string }).message ?? 'failed'; + output.write(`${label} ERROR: ${msg}\n`); + } + break; + } + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up the final result via the SSE iterator; nothing to render here. + break; + } +} diff --git a/components/Application.ts b/components/Application.ts index 15d9c99d4..9ac2f7c0e 100644 --- a/components/Application.ts +++ b/components/Application.ts @@ -282,6 +282,16 @@ export async function installApplication(application: Application) { // If node_modules doesn't exist, we need to install dependencies } + // Build a per-spawn `onLine` callback that forwards each complete stdout/stderr line as + // an `install` SSE event to the deploy progress emitter. The `manager` field is filled + // in lazily by each spawn site so the CLI can show "install: using npm/pnpm/yarn/". + // Skipped entirely when there's no emitter (non-SSE callers). + const progress = application.progress; + const installEmitter = (manager: string) => + progress + ? (stream: 'stdout' | 'stderr', line: string) => progress.emit('install', { manager, stream, line }) + : undefined; + // If custom install command is specified, run it if (application.install?.command) { const [command, ...args] = application.install.command.split(' '); @@ -290,7 +300,8 @@ export async function installApplication(application: Application) { command, args, application.dirPath, - application.install?.timeout + application.install?.timeout, + installEmitter(command) ); // if it succeeds, return if (code === 0) { @@ -346,7 +357,8 @@ export async function installApplication(application: Application) { (application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + packageManager.name, application.install?.allowInstallScripts ? ['install'] : ['install', '--ignore-scripts'], // All of `npm`, `yarn`, and `pnpm` support the `install` command. If we need to configure options here we may have to use some other defaults though application.dirPath, - application.install?.timeout + application.install?.timeout, + installEmitter(packageManager.name) ); // if it succeeds, return @@ -398,7 +410,8 @@ export async function installApplication(application: Application) { (application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + 'npm', npmInstallArgs, application.dirPath, - application.install?.timeout + application.install?.timeout, + installEmitter('npm') ); // if it succeeds, return @@ -434,6 +447,10 @@ export class Application { dirPath: string; logger: Logger; packageManagerPrefix: string; // can be used to configure a package manager prefix, specifically "sfw". + // Optional progress emitter for SSE-style reporting. Set by the operations API when the + // caller requested `Accept: text/event-stream`. Undefined for the historical + // single-response code path; phase-event emissions are all optional-chained off this. + progress?: { emit(event: string, data: unknown): void }; constructor({ name, payload, packageIdentifier, install }: ApplicationOptions) { this.name = name; @@ -476,7 +493,14 @@ export function derivePackageIdentifier(packageIdentifier: string) { * @returns A promise that resolves when all preparation steps complete. */ export function prepareApplication(application: Application) { - return extractApplication(application).then(() => installApplication(application)); + return extractApplication(application).then(() => { + // extractApplication finished; the next phase is install. We emit the boundary here so + // the SSE consumer sees `extract done → install start` in order even though Application + // itself isn't aware of which phase comes next. + application.progress?.emit('phase', { phase: 'extract', status: 'done' }); + application.progress?.emit('phase', { phase: 'install', status: 'start' }); + return installApplication(application); + }); } /** @@ -596,7 +620,8 @@ export function nonInteractiveSpawn( command: string, args: string[], cwd: string, - timeoutMs: number = 60 * 60 * 1000 + timeoutMs: number = 60 * 60 * 1000, + onLine?: (stream: 'stdout' | 'stderr', line: string) => void ): Promise<{ stdout: string; stderr: string; code: number }> { return new Promise((resolve, reject) => { logger @@ -626,20 +651,36 @@ export function nonInteractiveSpawn( reject(new Error(`Command\`${command} ${args.join(' ')}\` timed out after ${timeoutMs}ms`)); }, timeoutMs); + // `onLine` forwards each complete line of stdout/stderr to the caller as it arrives, + // used by the deploy progress emitter to stream live install output back to the CLI. + // We buffer per stream until a newline so a chunk that splits mid-line doesn't fire + // a half-line; trailing partial lines are flushed on close. + let stdoutLineBuf = ''; + let stderrLineBuf = ''; + function flushLines(buf: string, stream: 'stdout' | 'stderr'): string { + if (!onLine) return ''; + let idx; + while ((idx = buf.indexOf('\n')) !== -1) { + onLine(stream, buf.slice(0, idx)); + buf = buf.slice(idx + 1); + } + return buf; + } + let stdout = ''; childProcess.stdout.on('data', (chunk) => { - // buffer stdout for later resolve - stdout += chunk.toString(); - // log stdout lines immediately - // TODO: Technically nothing guarantees that a chunk will be a complete line so need to implement - // something here to buffer until a newline character, then log the complete line - logger.loggerWithTag(`${applicationName}:spawn:${command}:stdout`).debug?.(chunk.toString()); + const text = chunk.toString(); + stdout += text; + logger.loggerWithTag(`${applicationName}:spawn:${command}:stdout`).debug?.(text); + if (onLine) stdoutLineBuf = flushLines(stdoutLineBuf + text, 'stdout'); }); // buffer stderr let stderr = ''; childProcess.stderr.on('data', (chunk) => { - stderr += chunk.toString(); + const text = chunk.toString(); + stderr += text; + if (onLine) stderrLineBuf = flushLines(stderrLineBuf + text, 'stderr'); }); childProcess.on('error', (error) => { @@ -656,6 +697,12 @@ export function nonInteractiveSpawn( if (stderr) { printStd(applicationName, command, stderr, 'stderr'); } + // Flush any partial line buffered after the last newline (npm sometimes ends without + // a trailing \n, especially on error paths). + if (onLine) { + if (stdoutLineBuf) onLine('stdout', stdoutLineBuf); + if (stderrLineBuf) onLine('stderr', stderrLineBuf); + } logger.loggerWithTag(`${applicationName}:spawn:${command}`).debug?.(`Process exited with code ${code}`); resolve({ stdout, diff --git a/components/operations.js b/components/operations.js index 4256af8b2..088f0e8e9 100644 --- a/components/operations.js +++ b/components/operations.js @@ -348,6 +348,11 @@ async function packageComponent(req) { * @returns {Promise} */ async function deployComponent(req) { + // `req.progress` is a ProgressEmitter set by handlePostRequest when the client sends + // `Accept: text/event-stream`. Non-SSE callers leave it undefined; the optional-chained + // calls below are no-ops in that case, keeping the historical single-response path intact. + const progress = req.progress; + if (req.project) { req.project = path.parse(req.project).name; } else if (req.package) { @@ -393,12 +398,21 @@ async function deployComponent(req) { timeout: req.install_timeout, }, }); + if (progress) application.progress = progress; - await prepareApplication(application); + progress?.emit('phase', { phase: 'extract', status: 'start' }); + try { + await prepareApplication(application); + } catch (err) { + progress?.emit('phase', { phase: 'extract_or_install', status: 'error', message: err?.message ?? String(err) }); + throw err; + } + progress?.emit('phase', { phase: 'install', status: 'done' }); // now we attempt to actually load the component in case there is // an error we can immediately detect and report, but app code should not run on the main thread if (!isMainThread && !process.env.HARPER_SAFE_MODE) { + progress?.emit('phase', { phase: 'load', status: 'start' }); const pseudoResources = new Resources(); pseudoResources.isWorker = true; @@ -415,16 +429,29 @@ async function deployComponent(req) { req.project ); - if (lastError) throw lastError; + if (lastError) { + progress?.emit('phase', { phase: 'load', status: 'error', message: lastError?.message ?? String(lastError) }); + throw lastError; + } + progress?.emit('phase', { phase: 'load', status: 'done' }); } const rollingRestart = req.restart === 'rolling'; // if doing a rolling restart set restart to false so that other nodes don't also restart. req.restart = rollingRestart ? false : req.restart; + progress?.emit('phase', { phase: 'replicate', status: 'start' }); + // ProgressEmitter is local to the origin: its listeners are functions that can't survive + // the replication-channel serialization, and a peer node receiving `req.progress` as a + // plain `{listeners:[]}` object would still take the `if (progress)` branch and then + // throw `TypeError: progress.emit is not a function`. Strip it before fan-out. + delete req.progress; let response = await server.replication.replicateOperation(req); + progress?.emit('phase', { phase: 'replicate', status: 'done' }); if (req.restart === true) { + progress?.emit('phase', { phase: 'restart', status: 'start' }); manageThreads.restartWorkers('http'); response.message = `Successfully deployed: ${application.name}, restarting Harper`; } else if (rollingRestart) { + progress?.emit('phase', { phase: 'restart', status: 'start', rolling: true }); const serverUtilities = require('../server/serverHelpers/serverUtilities.ts'); const jobResponse = await serverUtilities.executeJob({ operation: 'restart_service', diff --git a/components/packageComponent.ts b/components/packageComponent.ts index 6075e8432..f1cf3a648 100644 --- a/components/packageComponent.ts +++ b/components/packageComponent.ts @@ -1,5 +1,6 @@ import { join } from 'path'; import { Readable } from 'node:stream'; +import { readdir, stat } from 'node:fs/promises'; import tar from 'tar-fs'; import { createGzip } from 'node:zlib'; @@ -36,6 +37,56 @@ export function streamPackagedDirectory(directory: string, options: PackageOptio return packStream.pipe(gzip); } +/** + * Compute the total uncompressed size in bytes of files that `streamPackagedDirectory` + * would include for the same options. Used by the CLI to drive an upload progress bar: + * we count bytes flowing through the tar source (pre-gzip), so a percentage against this + * total represents "how much of the source tree has been read", which is what users want + * to see during a long deploy. The on-the-wire (gzipped) size will be smaller, so the + * bar may finish slightly before the request actually completes — acceptable trade-off + * versus walking twice or buffering. + * + * Symlinks are stat'd (not lstat'd) when `skip_symlinks` is false, matching tar-fs's + * `dereference: !skip_symlinks` behavior. Errors on individual entries are swallowed so + * a transient stat failure doesn't block the deploy — total is best-effort. + */ +export async function getPackagedDirectorySize( + directory: string, + options: PackageOptions = DEFAULT_OPTIONS +): Promise { + const skipNodeModules = options.skip_node_modules === true; + const dereference = options.skip_symlinks !== true; + const statFn = dereference ? stat : (await import('node:fs/promises')).lstat; + let total = 0; + async function walk(current: string, rel: string): Promise { + let entries; + try { + entries = await readdir(current, { withFileTypes: true }); + } catch { + return; + } + for (const entry of entries) { + const relPath = rel ? join(rel, entry.name) : entry.name; + if (skipNodeModules && (relPath.includes('node_modules') || relPath.includes(join('cache', 'webpack')))) { + continue; + } + const full = join(current, entry.name); + if (entry.isDirectory()) { + await walk(full, relPath); + } else if (entry.isFile() || (dereference && entry.isSymbolicLink())) { + try { + const s = await statFn(full); + if (s.isFile()) total += s.size; + } catch { + /* skip unreadable entries — don't block the deploy on a best-effort total */ + } + } + } + } + await walk(directory, ''); + return total; +} + /** * Package a directory into a tar+gzip buffer. Retained for callers that need * an in-memory payload (small deploys, tests). For large directories prefer diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts new file mode 100644 index 000000000..f383957ca --- /dev/null +++ b/server/serverHelpers/progressEmitter.ts @@ -0,0 +1,98 @@ +import { PassThrough, Readable } from 'node:stream'; + +export interface ProgressEvent { + event: string; + data: unknown; +} + +export type ProgressListener = (event: ProgressEvent) => void; + +/** + * Lightweight pub-sub used to report phase/install/replicate events from a long-running + * operation back to the HTTP layer. We deliberately don't use Node's EventEmitter here: + * we only need broadcast semantics for a small set of event types, and we want the + * `emit(event, data)` shape that matches the SSE wire format directly. + */ +export class ProgressEmitter { + private listeners: ProgressListener[] = []; + + emit(event: string, data: unknown): void { + // Snapshot before iteration so a listener that unsubscribes itself during dispatch + // doesn't shift indexes underneath us. + const snapshot = this.listeners.slice(); + for (const listener of snapshot) { + try { + listener({ event, data }); + } catch { + // A buggy listener must never break the operation. Swallow and continue. + } + } + } + + subscribe(listener: ProgressListener): () => void { + this.listeners.push(listener); + return () => { + const i = this.listeners.indexOf(listener); + if (i !== -1) this.listeners.splice(i, 1); + }; + } +} + +/** + * Wrap a long-running operation so its progress events stream back as Server-Sent Events. + * + * The returned Readable emits one SSE message per `emitter.emit(...)` call, then a final + * `done` (or `error`) event with the operation's result, then ends. The caller is + * expected to set Content-Type: text/event-stream on the response. + */ +export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise): Readable { + const stream = new PassThrough(); + + let active = true; + const unsubscribe = emitter.subscribe((event) => { + if (active) writeSSE(stream, event); + }); + + const cleanup = () => { + if (active) { + active = false; + unsubscribe(); + } + }; + + // If the client disconnects (Ctrl-C, network drop) stop writing to the stream and + // release the emitter subscription so it doesn't accumulate for the operation lifetime. + stream.on('close', cleanup); + stream.on('end', cleanup); + + operation() + .then((result) => { + if (active) writeSSE(stream, { event: 'done', data: { result } }); + }) + .catch((err) => { + if (active) { + writeSSE(stream, { + event: 'error', + data: { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + }, + }); + } + }) + .finally(() => { + cleanup(); + stream.end(); + }); + + return stream; +} + +function writeSSE(stream: PassThrough, event: ProgressEvent): void { + const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data); + stream.write(`event: ${event.event}\n`); + for (const line of data.split(/\r?\n/)) { + stream.write(`data: ${line}\n`); + } + stream.write('\n'); +} diff --git a/server/serverHelpers/serverHandlers.js b/server/serverHelpers/serverHandlers.js index 692c9fbc4..ebed8d589 100644 --- a/server/serverHelpers/serverHandlers.js +++ b/server/serverHelpers/serverHandlers.js @@ -15,6 +15,13 @@ const pAuthorize = util.promisify(auth.authorize); const serverUtilities = require('./serverUtilities.ts'); const { applyImpersonation } = require('../../security/impersonation.ts'); const { createGzip, constants } = require('zlib'); +const { ProgressEmitter, createSSEResponseStream } = require('./progressEmitter.ts'); + +// Operations that support `Accept: text/event-stream` for live progress reporting. +// Adding an operation here means its handler may read `req.body.progress` (a ProgressEmitter) +// and emit phase/install/replicate events; non-SSE clients still get the historical +// single-response behavior because the emitter is undefined on that path. +const SSE_PROGRESS_OPERATIONS = new Set([terms.OPERATIONS_ENUM.DEPLOY_COMPONENT]); const NO_AUTH_OPERATIONS = [ terms.OPERATIONS_ENUM.CREATE_AUTHENTICATION_TOKENS, @@ -119,6 +126,23 @@ async function handlePostRequest(req, res, _bypassAuth = false) { if (req.body.bypass_auth) delete req.body.bypass_auth; operation_function = serverUtilities.chooseOperation(req.body); + + // SSE progress branch — when the client asks for `text/event-stream` on an operation + // that supports it, run the operation in the background and stream events back as they + // happen. The progress emitter is attached to req.body so the operation handler can + // emit without changing its return signature; non-SSE callers leave `progress` + // undefined and the handler stays on its synchronous result path. + // Optional-chained `req.headers` because unit tests dispatch through this path with + // synthetic req shapes that don't set headers; production Fastify requests always do. + if (req.headers?.accept === 'text/event-stream' && SSE_PROGRESS_OPERATIONS.has(req.body.operation)) { + const emitter = new ProgressEmitter(); + req.body.progress = emitter; + res.header('Content-Type', 'text/event-stream'); + res.header('Cache-Control', 'no-cache'); + res.header('X-Accel-Buffering', 'no'); // disable proxy buffering so events flush in real time + return createSSEResponseStream(emitter, () => serverUtilities.processLocalTransaction(req, operation_function)); + } + let result = await serverUtilities.processLocalTransaction(req, operation_function); if (result instanceof Readable && result.headers) { for (let [name, value] of result.headers) { diff --git a/unitTests/bin/deployRenderer.test.js b/unitTests/bin/deployRenderer.test.js new file mode 100644 index 000000000..9ab450b95 --- /dev/null +++ b/unitTests/bin/deployRenderer.test.js @@ -0,0 +1,108 @@ +'use strict'; + +const assert = require('node:assert'); +const { PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { DeployRenderer } = require('#src/bin/deployRenderer'); + +function makeOutput() { + const lines = []; + const stream = { write: (s) => lines.push(s), isTTY: false }; + return { stream, lines }; +} + +describe('DeployRenderer', () => { + describe('upload progress (non-TTY)', () => { + it('advances bytes as data flows through the tap and logs at 10% steps', async () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 1_000_000, output: stream }); + const source = new PassThrough(); + const tap = renderer.tapUploadStream(source); + // Drain into a sink so backpressure doesn't pause the tap. + const sink = new PassThrough(); + tap.pipe(sink); + sink.on('data', () => {}); + // Write 5×200_000 byte chunks → 1MB total, crossing each 10% threshold. + for (let i = 0; i < 5; i++) source.write(Buffer.alloc(200_000)); + source.end(); + await new Promise((resolve) => sink.on('end', resolve)); + renderer.endUpload(); + // Expect at least one per-10% progress line plus the final "Upload complete" line. + const progressLines = lines.filter((l) => /Uploaded /.test(l)); + assert.ok( + progressLines.length >= 4, + `expected multiple progress lines, got ${progressLines.length}: ${progressLines.join('|')}` + ); + assert.ok( + lines.some((l) => l.startsWith('Upload complete')), + 'should log Upload complete on endUpload' + ); + }); + + it('endUpload is idempotent', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 100, output: stream }); + renderer.tapUploadStream(new PassThrough()); + renderer.endUpload(); + renderer.endUpload(); + const completeLines = lines.filter((l) => l.startsWith('Upload complete')); + assert.strictEqual(completeLines.length, 1); + }); + }); + + describe('renderEvent', () => { + function sseMessage(event, data) { + return { event, data: JSON.stringify(data) }; + } + + it('prints a phase line on start, then on done (no duplicate start)', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'done' })); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('forwards install stdout/stderr lines and headers the manager name once', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'start' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'added 42 packages' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'audited 100 packages' })); + renderer.renderEvent( + sseMessage('install', { manager: 'npm', stream: 'stderr', line: 'npm warn deprecated foo' }) + ); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'done' })); + assert.ok(lines.includes('install: using npm\n'), 'expected manager header'); + assert.strictEqual( + lines.filter((l) => l === 'install: using npm\n').length, + 1, + 'manager header logged only once' + ); + assert.ok(lines.includes(' | added 42 packages\n')); + assert.ok(lines.includes(' | audited 100 packages\n')); + assert.ok(lines.includes(' ! npm warn deprecated foo\n'), 'stderr should be tagged with !'); + assert.ok( + lines.some((l) => /install done \(3 log lines\)/.test(l)), + 'install-done should count emitted lines' + ); + }); + + it('renders an error event with code suffix', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('error', { message: 'npm install failed', code: 500 })); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); + + it('renders a phase-error with the message', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'error', message: 'exit code 1' })); + assert.match(lines[0], /install ERROR: exit code 1/); + }); + }); +}); diff --git a/unitTests/bin/sseConsumer.test.js b/unitTests/bin/sseConsumer.test.js new file mode 100644 index 000000000..2bc1c25a2 --- /dev/null +++ b/unitTests/bin/sseConsumer.test.js @@ -0,0 +1,95 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable, PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { parseSSE, renderDeployProgress } = require('#src/bin/sseConsumer'); + +async function collectMessages(stream) { + const out = []; + for await (const msg of parseSSE(stream)) out.push(msg); + return out; +} + +describe('parseSSE', () => { + it('parses a single record', async () => { + const stream = Readable.from(['event: phase\ndata: {"phase":"extract"}\n\n']); + const msgs = await collectMessages(stream); + assert.deepStrictEqual(msgs, [{ event: 'phase', data: '{"phase":"extract"}', id: undefined, retry: undefined }]); + }); + + it('joins multi-line data fields with \\n per the SSE spec', async () => { + const stream = Readable.from(['event: install\ndata: line1\ndata: line2\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].data, 'line1\nline2'); + }); + + it('handles records split across multiple chunks', async () => { + const stream = new PassThrough(); + const collector = collectMessages(stream); + stream.write('event: phase\n'); + stream.write('data: {"phase":"extract","sta'); + stream.write('tus":"start"}\n\n'); + stream.write('event: phase\ndata: {"phase":"extract","status":"done"}\n\n'); + stream.end(); + const msgs = await collector; + assert.strictEqual(msgs.length, 2); + assert.strictEqual(JSON.parse(msgs[0].data).status, 'start'); + assert.strictEqual(JSON.parse(msgs[1].data).status, 'done'); + }); + + it('handles CRLF line endings', async () => { + const stream = Readable.from(['event: phase\r\ndata: {"x":1}\r\n\r\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(msgs[0].data), { x: 1 }); + }); + + it('strips a single leading space after the colon, per spec', async () => { + const stream = Readable.from(['data: hello\n\n']); + const msgs = await collectMessages(stream); + // only ONE space is stripped, the second remains + assert.strictEqual(msgs[0].data, ' hello'); + }); + + it('ignores comment lines (leading colon)', async () => { + const stream = Readable.from([': heartbeat\nevent: ping\ndata: ok\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'ping'); + assert.strictEqual(msgs[0].data, 'ok'); + }); +}); + +describe('renderDeployProgress', () => { + it('prints a phase line on start and another on done', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }, state, out); + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'done' }) }, state, out); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('does not repeat the same phase-start line', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + const msg = { event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }; + renderDeployProgress(msg, state, out); + renderDeployProgress(msg, state, out); + assert.strictEqual(lines.length, 1); + }); + + it('prints an error line with the message', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + renderDeployProgress( + { event: 'error', data: JSON.stringify({ message: 'npm install failed', code: 500 }) }, + {}, + out + ); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); +}); diff --git a/unitTests/components/packageDirectorySize.test.js b/unitTests/components/packageDirectorySize.test.js new file mode 100644 index 000000000..b51e22aac --- /dev/null +++ b/unitTests/components/packageDirectorySize.test.js @@ -0,0 +1,83 @@ +'use strict'; + +const assert = require('node:assert'); +const path = require('node:path'); +const fs = require('node:fs/promises'); +const os = require('node:os'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { getPackagedDirectorySize } = require('#src/components/packageComponent'); + +async function makeFixture(files) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'pkg-size-')); + for (const [rel, content] of Object.entries(files)) { + const full = path.join(dir, rel); + await fs.mkdir(path.dirname(full), { recursive: true }); + await fs.writeFile(full, content); + } + return dir; +} + +describe('getPackagedDirectorySize', () => { + it('returns the sum of file sizes for a simple tree', async () => { + const dir = await makeFixture({ + 'a.txt': 'A'.repeat(100), + 'b/c.txt': 'C'.repeat(200), + 'b/d/e.txt': 'E'.repeat(50), + }); + try { + assert.strictEqual(await getPackagedDirectorySize(dir), 350); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('excludes node_modules when skip_node_modules is true', async () => { + const dir = await makeFixture({ + 'src/app.js': 'a'.repeat(123), + 'node_modules/lib/index.js': 'x'.repeat(1_000_000), + }); + try { + const withModules = await getPackagedDirectorySize(dir); + const withoutModules = await getPackagedDirectorySize(dir, { skip_node_modules: true }); + assert.strictEqual(withModules, 123 + 1_000_000); + assert.strictEqual(withoutModules, 123); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('also excludes webpack cache when skip_node_modules is true', async () => { + const dir = await makeFixture({ + 'src/a.js': 'a'.repeat(50), + '.cache/webpack/big.bin': 'x'.repeat(500_000), + }); + try { + assert.strictEqual(await getPackagedDirectorySize(dir, { skip_node_modules: true }), 50); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('returns 0 for an empty directory', async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), 'pkg-size-empty-')); + try { + assert.strictEqual(await getPackagedDirectorySize(dir), 0); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + it('best-effort: continues past unreadable entries instead of throwing', async () => { + const dir = await makeFixture({ 'a.txt': 'A'.repeat(10) }); + try { + // Read from a deliberately-nonexistent sub-path — getPackagedDirectorySize swallows + // per-entry errors so a transient stat failure can't block a deploy. + assert.strictEqual(await getPackagedDirectorySize(path.join(dir, 'missing-subdir')), 0); + assert.strictEqual(await getPackagedDirectorySize(dir), 10); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/unitTests/server/serverHelpers/progressEmitter.test.js b/unitTests/server/serverHelpers/progressEmitter.test.js new file mode 100644 index 000000000..5111d556f --- /dev/null +++ b/unitTests/server/serverHelpers/progressEmitter.test.js @@ -0,0 +1,120 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable } = require('node:stream'); +const testUtils = require('../../testUtils.js'); +testUtils.preTestPrep(); + +const { ProgressEmitter, createSSEResponseStream } = require('#src/server/serverHelpers/progressEmitter'); + +function collect(stream) { + return new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (c) => chunks.push(c)); + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + stream.on('error', reject); + }); +} + +function parseSSEBlocks(text) { + return text + .split('\n\n') + .filter((block) => block.trim().length > 0) + .map((block) => { + const out = {}; + for (const line of block.split('\n')) { + const colon = line.indexOf(':'); + if (colon === -1) continue; + const field = line.slice(0, colon); + let value = line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + out[field] = value; + } + return out; + }); +} + +describe('ProgressEmitter', () => { + it('delivers events to every subscriber', () => { + const emitter = new ProgressEmitter(); + const a = []; + const b = []; + emitter.subscribe((e) => a.push(e)); + emitter.subscribe((e) => b.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.deepStrictEqual(a, [ + { event: 'phase', data: { phase: 'extract', status: 'start' } }, + { event: 'phase', data: { phase: 'extract', status: 'done' } }, + ]); + assert.deepStrictEqual(b, a); + }); + + it('unsubscribe stops further delivery', () => { + const emitter = new ProgressEmitter(); + const received = []; + const unsubscribe = emitter.subscribe((e) => received.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + unsubscribe(); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.strictEqual(received.length, 1); + }); + + it('swallows listener exceptions so operations are never broken by a buggy subscriber', () => { + const emitter = new ProgressEmitter(); + emitter.subscribe(() => { + throw new Error('listener boom'); + }); + const ok = []; + emitter.subscribe((e) => ok.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + assert.strictEqual(ok.length, 1); + }); +}); + +describe('createSSEResponseStream', () => { + it('streams emitter events then a terminating `done` event with the operation result', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + await new Promise((r) => setImmediate(r)); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + return { message: 'Successfully deployed: demo' }; + }); + const text = await collect(stream); + const events = parseSSEBlocks(text); + assert.strictEqual(events.length, 3); + assert.strictEqual(events[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[0].data), { phase: 'extract', status: 'start' }); + assert.strictEqual(events[1].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[1].data), { phase: 'extract', status: 'done' }); + assert.strictEqual(events[2].event, 'done'); + assert.deepStrictEqual(JSON.parse(events[2].data), { result: { message: 'Successfully deployed: demo' } }); + }); + + it('streams an `error` event when the operation rejects', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + const err = new Error('boom'); + err.statusCode = 500; + throw err; + }); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events[events.length - 1].event, 'error'); + const errData = JSON.parse(events[events.length - 1].data); + assert.strictEqual(errData.message, 'boom'); + assert.strictEqual(errData.code, 500); + }); + + it('still closes the stream cleanly when the operation emits nothing', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => ({ ok: true })); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events.length, 1); + assert.strictEqual(events[0].event, 'done'); + }); +}); + +// keep Readable import live for any future tests that need stream sources +void Readable; diff --git a/utility/common_utils.ts b/utility/common_utils.ts index 8b9b14819..ef227d0f2 100644 --- a/utility/common_utils.ts +++ b/utility/common_utils.ts @@ -756,8 +756,24 @@ export function httpRequest(options: any, data: any): Promise { const req = client.request(options, (response: http.IncomingMessage & { body?: string }) => { + if (streamResponse) { + // Hand the raw stream to the caller; do not setEncoding so binary-safe consumers + // (or SSE parsers that prefer Buffers) still work. + resolve(response); + return; + } response.setEncoding('utf8'); response.body = ''; response.on('data', (chunk) => {