diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 882a071b5..a13b1fc30 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -11,11 +11,20 @@ import * as fs from 'fs-extra'; import * as YAML from 'yaml'; import { streamPackagedDirectory } from '../components/packageComponent.ts'; import { buildMultipartBody } from './multipartBuilder.ts'; +import { parseSSE } from './sseConsumer.ts'; +import { DeployRenderer } from './deployRenderer.ts'; import { getHdbPid } from '../utility/processManagement/processManagement.js'; import { initConfig, getConfigPath } from '../config/configUtils.js'; const OP_ALIASES = { deploy: 'deploy_component', package: 'package_component' }; +// Operations whose responses should be consumed as text/event-stream so live phase events +// (prepare, load, replicate, restart) render as they happen instead of after the whole +// deploy completes. Add an operation here only after wiring its server-side +// SSE_PROGRESS_OPERATIONS entry — otherwise the server returns the buffered JSON path and +// the SSE parser sees no events. +const SSE_OPERATIONS = new Set(['deploy_component']); + // Properties on `req` that the CLI itself uses for transport/UX, not the operations API. // They never get serialized into the request body. const TRANSPORT_ONLY_FIELDS = new Set([ @@ -191,6 +200,15 @@ async function cliOperations(req: any, skipResponseLog = false) { options.headers.Authorization = `Bearer ${tokens.operation_token}`; } } + const useSse = SSE_OPERATIONS.has(req.operation); + if (useSse) { + options.headers.Accept = 'text/event-stream'; + options.streamResponse = true; + } + // One renderer owns the (future) upload bar and the SSE event rendering for a + // multipart deploy. Created here so the upload-stream tap and the SSE consumer + // below share the same instance. + const renderer = req._multipart ? new DeployRenderer({}) : null; let body; if (req._multipart) { const packageStream = req._packageStream; @@ -209,20 +227,66 @@ async function cliOperations(req: any, skipResponseLog = false) { // Use chunked transfer-encoding: we don't know the total size up front because the // payload is streamed from `tar.pack` and never fully buffered. options.headers['Transfer-Encoding'] = 'chunked'; - body = multipart.stream; + // Tap the body so bytes flowing into the HTTP request advance the upload bar. + // The renderer's Transform is identity — chunks pass through unmodified. + body = renderer ? renderer.tapUploadStream(multipart.stream) : multipart.stream; } else { body = req; } let response: any = await httpRequest(options, body); + // Upload is done by the time we get the response; tear the bar down before any SSE + // rendering so the bar and event lines don't fight for the same terminal row. + renderer?.endUpload(); + let responseData; - try { - responseData = JSON.parse(response.body); - } catch { - responseData = { - status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), - body: response.body, - }; + if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) { + // Consume SSE: render phase events live, capture the final result from the `done` + // event (or the error message from the `error` event). The HTTP status stays 200 + // until end-of-stream; failures are signaled in-band. + let finalResult; + let sseError; + for await (const message of parseSSE(response)) { + renderer?.renderEvent(message); + if (message.event === 'done') { + try { + finalResult = JSON.parse(message.data)?.result; + } catch { + finalResult = message.data; + } + } else if (message.event === 'error') { + try { + sseError = JSON.parse(message.data); + } catch { + sseError = { message: message.data }; + } + } + } + if (sseError) { + const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + console.error(`error: ${errMsg}`); + process.exit(1); + } + responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; + } else { + // When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode), + // so .body is undefined. Drain the stream to get the text (e.g. a 401 error body). + let bodyText: string; + if (useSse) { + const chunks: Buffer[] = []; + for await (const chunk of response as AsyncIterable) chunks.push(Buffer.from(chunk)); + bodyText = Buffer.concat(chunks).toString('utf8'); + } else { + bodyText = response.body; + } + try { + responseData = JSON.parse(bodyText); + } catch { + responseData = { + status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), + body: bodyText, + }; + } } let responseLog; diff --git a/bin/deployRenderer.ts b/bin/deployRenderer.ts new file mode 100644 index 000000000..cdaf5ca59 --- /dev/null +++ b/bin/deployRenderer.ts @@ -0,0 +1,187 @@ +import { Transform } from 'node:stream'; +// cli-progress is already a runtime dep of harper (see package.json); using its +// SingleBar to render the upload phase here doesn't add a new dependency. +import cliProgress from 'cli-progress'; +import type { SSEMessage } from './sseConsumer.ts'; + +interface RendererOptions { + uploadTotal?: number; + output?: NodeJS.WritableStream; +} + +interface UploadState { + bar: cliProgress.SingleBar | null; + sent: number; + textLastLogged: number; + finished: boolean; +} + +interface PhaseState { + current?: string; + installManager?: string; + installLineCount: number; +} + +/** + * Deploy-time renderer that owns the progress display across two phases: + * + * 1. Local upload — driven by `tapUploadStream`, which wraps the multipart body so we + * can update a `cli-progress` bar against the precomputed uncompressed source-tree + * total. In a non-TTY environment (CI logs, redirected output) we fall back to + * periodic text lines so logs stay grep-able. + * + * 2. Server-side phases — driven by `renderEvent`, called for each SSE message the + * CLI receives from the operations API. Phase events print one-liners; live + * `install` events (npm/pnpm/yarn stdout) are throttled to one line under a + * "[install]" header so a noisy `npm install` doesn't drown the terminal. + * + * Designed so the two phases hand off cleanly: `endUpload()` tears the bar down (so + * it doesn't compete with subsequent prints) before any SSE events render. + */ +export class DeployRenderer { + private upload: UploadState = { bar: null, sent: 0, textLastLogged: 0, finished: false }; + private phase: PhaseState = { installLineCount: 0 }; + private output: NodeJS.WritableStream; + private isTty: boolean; + private uploadTotal: number; + + constructor(options: RendererOptions = {}) { + this.output = options.output ?? process.stderr; + // Only render a bar when stderr is a real terminal. CI runners, log redirection, + // and pipes look identical from Node's perspective: !isTTY. + this.isTty = Boolean((this.output as NodeJS.WriteStream).isTTY); + this.uploadTotal = options.uploadTotal ?? 0; + } + + /** + * Wrap an outbound stream so each byte flowing through it advances the upload bar. + * The Transform is identity — chunks pass through unmodified. + */ + tapUploadStream(stream: T): NodeJS.ReadableStream { + this.upload.bar = this.isTty + ? new cliProgress.SingleBar( + { + format: 'Uploading [{bar}] {percentage}% | {value}/{total} bytes', + barCompleteChar: '█', + barIncompleteChar: '░', + hideCursor: true, + stream: this.output, + etaBuffer: 50, + }, + cliProgress.Presets.shades_classic + ) + : null; + this.upload.bar?.start(this.uploadTotal || 1, 0); + + const counter = new Transform({ + transform: (chunk, _enc, cb) => { + this.upload.sent += chunk.length; + this.tickUpload(); + cb(null, chunk); + }, + flush: (cb) => { + this.endUpload(); + cb(); + }, + }); + stream.on('error', (err) => counter.destroy(err)); + stream.pipe(counter); + return counter; + } + + endUpload(): void { + if (this.upload.finished) return; + this.upload.finished = true; + if (this.upload.bar) { + // Snap to total so the bar shows 100% even when our uncompressed-total estimate + // is slightly off (gzip output is usually smaller than the source tree). + if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal); + this.upload.bar.stop(); + this.upload.bar = null; + } else { + this.output.write(`Upload complete (${formatBytes(this.upload.sent)})\n`); + } + } + + private tickUpload(): void { + if (this.upload.bar) { + this.upload.bar.update(this.upload.sent); + return; + } + // Non-TTY: log a line every 10% of the total (or every 5MB if total unknown). + const step = this.uploadTotal > 0 ? this.uploadTotal / 10 : 5 * 1024 * 1024; + if (this.upload.sent - this.upload.textLastLogged >= step) { + this.upload.textLastLogged = this.upload.sent; + const pct = this.uploadTotal > 0 ? Math.min(100, Math.floor((this.upload.sent / this.uploadTotal) * 100)) : null; + this.output.write( + pct !== null + ? `Uploaded ${formatBytes(this.upload.sent)} / ~${formatBytes(this.uploadTotal)} (${pct}%)\n` + : `Uploaded ${formatBytes(this.upload.sent)}\n` + ); + } + } + + renderEvent(message: SSEMessage): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': + this.renderPhase(parsed as { phase?: string; status?: string; message?: string }); + break; + case 'install': + this.renderInstall(parsed as { manager?: string; stream?: string; line?: string }); + break; + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + this.output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up final result via the SSE iterator; nothing to render here. + break; + } + } + + private renderPhase(data: { phase?: string; status?: string; message?: string }): void { + const label = data.phase ?? '?'; + if (data.status === 'start') { + if (this.phase.current !== label) { + this.output.write(`${label}…\n`); + this.phase.current = label; + this.phase.installLineCount = 0; + } + } else if (data.status === 'done') { + if (label === 'install' && this.phase.installLineCount > 0) { + this.output.write(`install done (${this.phase.installLineCount} log lines)\n`); + } else { + this.output.write(`${label} done\n`); + } + } else if (data.status === 'error') { + this.output.write(`${label} ERROR: ${data.message ?? 'failed'}\n`); + } + } + + private renderInstall(data: { manager?: string; stream?: string; line?: string }): void { + const line = (data.line ?? '').trimEnd(); + if (!line) return; + if (data.manager && data.manager !== this.phase.installManager) { + this.phase.installManager = data.manager; + this.output.write(`install: using ${data.manager}\n`); + } + this.phase.installLineCount++; + // Prefix with stream so users can distinguish stderr noise from stdout warnings. + const tag = data.stream === 'stderr' ? '!' : '|'; + this.output.write(` ${tag} ${line}\n`); + } +} + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KiB`; + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GiB`; +} diff --git a/bin/sseConsumer.ts b/bin/sseConsumer.ts new file mode 100644 index 000000000..b89485f15 --- /dev/null +++ b/bin/sseConsumer.ts @@ -0,0 +1,126 @@ +import { StringDecoder } from 'node:string_decoder'; +import type { Readable } from 'node:stream'; + +export interface SSEMessage { + event: string; + data: string; + id?: string; + retry?: number; +} + +/** + * Parse a Readable carrying Server-Sent Events into structured messages. + * + * Yields one `SSEMessage` per blank-line-terminated record. Handles split data: lines, + * CRLF or LF line endings, and arbitrary chunk boundaries — the underlying Node http + * Readable does not guarantee chunks align with SSE record boundaries. + */ +export async function* parseSSE(stream: Readable): AsyncGenerator { + const decoder = new StringDecoder('utf8'); + let buffer = ''; + for await (const chunk of stream) { + buffer += typeof chunk === 'string' ? chunk : decoder.write(chunk); + while (true) { + const recordEnd = buffer.indexOf('\n\n'); + const crlfEnd = buffer.indexOf('\r\n\r\n'); + let endIdx = -1; + let delimLen = 0; + if (recordEnd !== -1 && (crlfEnd === -1 || recordEnd < crlfEnd)) { + endIdx = recordEnd; + delimLen = 2; + } else if (crlfEnd !== -1) { + endIdx = crlfEnd; + delimLen = 4; + } + if (endIdx === -1) break; + const record = buffer.slice(0, endIdx); + buffer = buffer.slice(endIdx + delimLen); + const msg = parseRecord(record); + if (msg) yield msg; + } + } + buffer += decoder.end(); + // Any trailing record without a terminating blank line is treated as a final message, + // matching the looser behavior browsers exhibit on connection close. + if (buffer.trim()) { + const msg = parseRecord(buffer); + if (msg) yield msg; + } +} + +function parseRecord(record: string): SSEMessage | null { + const lines = record.split(/\r?\n/); + let event = 'message'; + let id: string | undefined; + let retry: number | undefined; + const dataLines: string[] = []; + for (const line of lines) { + if (line === '' || line.startsWith(':')) continue; + const colon = line.indexOf(':'); + const field = colon === -1 ? line : line.slice(0, colon); + // Per spec, a leading space after the colon is stripped. + let value = colon === -1 ? '' : line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + switch (field) { + case 'event': + event = value; + break; + case 'data': + dataLines.push(value); + break; + case 'id': + id = value; + break; + case 'retry': { + const n = Number(value); + if (Number.isFinite(n)) retry = n; + break; + } + } + } + if (dataLines.length === 0 && event === 'message') return null; + return { event, data: dataLines.join('\n'), id, retry }; +} + +interface RenderState { + currentPhase?: string; +} + +/** + * Render SSE deploy events as terse, line-oriented progress to stderr (so stdout stays + * reserved for the final JSON/YAML response document). Phase transitions print once. + */ +export function renderDeployProgress(message: SSEMessage, state: RenderState, output: NodeJS.WritableStream): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': { + const p = parsed as { phase?: string; status?: string; rolling?: boolean }; + const label = p.phase ?? '?'; + if (p.status === 'start') { + if (state.currentPhase !== label) { + output.write(`${label}…\n`); + state.currentPhase = label; + } + } else if (p.status === 'done') { + output.write(`${label} done\n`); + } else if (p.status === 'error') { + const msg = (parsed as { message?: string }).message ?? 'failed'; + output.write(`${label} ERROR: ${msg}\n`); + } + break; + } + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up the final result via the SSE iterator; nothing to render here. + break; + } +} diff --git a/components/deploymentOperations.ts b/components/deploymentOperations.ts index 9c7e7aff7..418d1ad33 100644 --- a/components/deploymentOperations.ts +++ b/components/deploymentOperations.ts @@ -6,8 +6,11 @@ import { databases } from '../resources/databases.ts'; import * as terms from '../utility/hdbTerms.ts'; import { ClientError } from '../utility/errors/hdbError.ts'; +import { getActiveEmitter } from './deploymentRecorder.ts'; +import type { ProgressEmitter } from '../server/serverHelpers/progressEmitter.ts'; const DEPLOYMENT_TABLE = terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME; +const TERMINAL_STATUSES = new Set(['success', 'failed', 'rolled_back']); interface ListRequest { project?: string; @@ -20,6 +23,8 @@ interface ListRequest { interface GetRequest { deployment_id: string; + // Set by serverHandlers.js when the client asks for `Accept: text/event-stream`. + progress?: ProgressEmitter; } function deploymentTable() { @@ -75,5 +80,94 @@ export async function handleGetDeployment(req: GetRequest): Promise { if (!row) { throw new ClientError(`No deployment found with id '${req.deployment_id}'`); } + + // SSE content-negotiated branch — when serverHandlers.js detects + // `Accept: text/event-stream` it attaches a ProgressEmitter as req.progress and wraps + // our return as the operation's final SSE event. We replay event_log on connect, then + // tail the deployment's live emitter (if it's still running on this node) until it + // reaches a terminal status. The final return value becomes the SSE `done` event. + if (req.progress) { + const sse = req.progress; + const liveEmitter = getActiveEmitter(req.deployment_id); + + // Subscribe FIRST, before reading the row, so any event the recorder emits between + // "now" and the moment we finish reading the historical log still lands. Buffer + // those live events; dedupe by recording the timestamp of the last replayed entry + // and skipping any live event whose timestamp is <= that. Forward everything else. + let lastReplayedTs = 0; + let resolveLive: (() => void) | null = null; + let liveDone = false; + const liveBuffer: Array<{ t: number; event: { event: string; data: unknown } }> = []; + const forwardLive = (e: { event: string; data: unknown }) => { + sse.emit(e.event, e.data); + // A terminal signal — either explicit success/error event from the lifecycle, or + // the recorder's `_recorder_finished` sentinel emitted before it unsubscribes. + const isTerminalEvent = + e.event === '_recorder_finished' || + e.event === 'error' || + (e.event === 'phase' && + e.data && + typeof e.data === 'object' && + (e.data as { phase?: string }).phase === 'success'); + if (isTerminalEvent && !liveDone) { + liveDone = true; + resolveLive?.(); + } + }; + const unsubscribe = liveEmitter + ? liveEmitter.subscribe((event) => { + const t = Date.now(); + if (resolveLive) { + // Replay phase finished; we own the live forward path now. + if (t > lastReplayedTs) forwardLive(event); + } else { + // Still replaying; stash for dedup-and-forward once replay completes. + liveBuffer.push({ t, event }); + } + }) + : null; + + try { + for (const entry of row.event_log ?? []) { + sse.emit(entry.event, entry.data); + if (typeof entry.t === 'number') lastReplayedTs = Math.max(lastReplayedTs, entry.t); + } + + if (liveEmitter && !TERMINAL_STATUSES.has(row.status)) { + await new Promise((resolve) => { + resolveLive = resolve; + // Flush anything that arrived during replay, filtering to events the replay missed. + for (const buffered of liveBuffer) { + if (buffered.t > lastReplayedTs) forwardLive(buffered.event); + } + if (liveDone) resolve(); + // Safety net — if the in-memory emitter is dropped (recorder finished or + // the process recycled) before signaling, poll the row's status as a + // fallback so the client never hangs indefinitely. + const pollTimer = setInterval(async () => { + if (liveDone) { + clearInterval(pollTimer); + return; + } + const live = getActiveEmitter(req.deployment_id); + if (!live || live !== liveEmitter) { + clearInterval(pollTimer); + const latest = await table.get(req.deployment_id); + if (latest && TERMINAL_STATUSES.has(latest.status) && !liveDone) { + liveDone = true; + resolve(); + } + } + }, 500); + }); + } + // Re-read the row so the final SSE payload reflects the post-deploy state. + const finalRow = await table.get(req.deployment_id); + return stripBlob(finalRow ?? row); + } finally { + unsubscribe?.(); + } + } + return stripBlob(row); } diff --git a/components/deploymentRecorder.ts b/components/deploymentRecorder.ts index fc221e753..9d5b78bfa 100644 --- a/components/deploymentRecorder.ts +++ b/components/deploymentRecorder.ts @@ -1,12 +1,12 @@ 'use strict'; -// DeploymentRecorder — Slice A scope. +// DeploymentRecorder — lifecycle owner for one row in system.hdb_deployment. // -// Owns the lifecycle of one row in system.hdb_deployment: creates the pending row at deploy -// start, streams the upload payload into the row's payload_blob (computing sha256 + size -// alongside), and writes the terminal status at the end. Slice B will extend this with -// ProgressEmitter subscription and event_log writes; Slice C will add rollback sourcing -// from the blob. +// Creates the pending row at deploy start, persists the upload payload into the row's +// payload_blob (with sha256 + size), and writes the terminal status at the end. +// Slice B (#641): subscribes to a ProgressEmitter so phase transitions and install lines +// land in event_log as they happen — making the deploy observable by Studio polling +// get_deployment without an attached CLI. Slice C will add rollback sourcing from the blob. import { randomUUID } from 'node:crypto'; import { createHash, Hash } from 'node:crypto'; @@ -16,6 +16,26 @@ import { createBlob } from '../resources/blob.ts'; import * as terms from '../utility/hdbTerms.ts'; import { ClientError } from '../utility/errors/hdbError.ts'; import { hostname } from 'node:os'; +import { ProgressEmitter } from '../server/serverHelpers/progressEmitter.ts'; + +// Bound the event_log so a pathologically chatty install can't grow a row without limit. +// Slice B emits a handful of phase events plus aggregated install summaries; 200 entries +// comfortably covers a real deploy with headroom. When we exceed the cap, drop the middle +// rather than the front — the lifecycle spine (prepare → load → replicate → success) is +// the most valuable context for debugging, and naive front-truncation loses it under a +// chatty `npm install`. +const EVENT_LOG_MAX = 200; +const EVENT_LOG_HEAD_KEEP = 20; + +// In-memory registry of live emitters, keyed by deployment_id. Populated for the lifetime +// of an in-progress deploy on the origin node; get_deployment SSE looks here to tail live +// events after replaying event_log. Per-node, not replicated — peers don't see another +// node's in-progress emitters. Slice B1 scope; cross-node tailing is a later concern. +const activeEmitters = new Map(); + +export function getActiveEmitter(deploymentId: string): ProgressEmitter | undefined { + return activeEmitters.get(deploymentId); +} // Slice A buffers the entire payload in memory before computing the hash and persisting. // This cap prevents an OOM on accidentally-huge uploads while Slice B is in flight. Slice B @@ -40,6 +60,7 @@ interface CreateOptions { user?: string; restart_mode?: 'immediate' | 'rolling' | null; rollback_of?: string | null; + emitter?: ProgressEmitter; } export class DeploymentRecorder { @@ -48,6 +69,9 @@ export class DeploymentRecorder { private hash: Hash | null = null; private byteCount = 0; private finished = false; + private unsubscribe: (() => void) | null = null; + private pendingPut: Promise | null = null; + private dirty = false; private constructor(deploymentId: string, initial: Record) { this.deploymentId = deploymentId; @@ -78,9 +102,68 @@ export class DeploymentRecorder { }; const recorder = new DeploymentRecorder(deploymentId, record); await recorder.put(); + if (options.emitter) { + recorder.subscribeTo(options.emitter); + activeEmitters.set(deploymentId, options.emitter); + } return recorder; } + /** + * Subscribe to a ProgressEmitter. Each event is appended (bounded) to event_log; phase + * events also update the row's `status` and `phase` fields. Writes coalesce: a put is + * always pending after the first event in a burst, so chatty install output collapses + * to one row update per ~100ms instead of one per line. + */ + private subscribeTo(emitter: ProgressEmitter): void { + this.unsubscribe = emitter.subscribe((event) => { + this.appendEvent(event.event, event.data); + }); + } + + private appendEvent(event: string, data: unknown): void { + if (this.finished) return; + const log = this.record.event_log as Array>; + log.push({ t: Date.now(), event, data }); + // Keep the head (lifecycle spine) and tail (most-recent activity); drop the middle. + if (log.length > EVENT_LOG_MAX) { + const tailKeep = EVENT_LOG_MAX - EVENT_LOG_HEAD_KEEP - 1; // -1 for the truncation marker + const removedCount = log.length - EVENT_LOG_HEAD_KEEP - tailKeep; + log.splice(EVENT_LOG_HEAD_KEEP, log.length - EVENT_LOG_HEAD_KEEP - tailKeep, { + t: Date.now(), + event: 'truncated', + data: { dropped_events: removedCount }, + }); + } + // Phase events drive the canonical status/phase fields used by list_deployments. + if (event === 'phase' && data && typeof data === 'object') { + const p = data as { phase?: string; status?: string }; + if (p.phase) this.record.phase = p.phase; + if (p.status === 'start') { + const mapped = startStatusFor(p.phase); + if (mapped) this.record.status = mapped; + } + } + this.scheduleFlush(); + } + + // Coalesce writes: at most one in-flight put at a time. While a put is running, mark + // the record dirty; the chained continuation issues a follow-up put once the prior one + // settles. This keeps event_log writes O(1) puts per burst rather than O(N) per event. + private scheduleFlush(): void { + if (this.pendingPut) { + this.dirty = true; + return; + } + this.pendingPut = this.put().finally(() => { + this.pendingPut = null; + if (this.dirty) { + this.dirty = false; + this.scheduleFlush(); + } + }); + } + /** * Drain a payload source (Buffer or Readable) into the row's payload_blob attribute, * computing sha256 and byte count alongside. After this resolves the row has been @@ -144,7 +227,23 @@ export class DeploymentRecorder { async finish(status: 'success' | 'failed' | 'rolled_back', error?: unknown): Promise { if (this.finished) return; + // Send a terminal sentinel through the emitter (if any) BEFORE we unsubscribe and + // remove it from the registry, so any SSE tail subscribers can resolve their wait + // even on a code path that doesn't emit an explicit `error` event. + const emitter = activeEmitters.get(this.deploymentId); + emitter?.emit('_recorder_finished', { status }); this.finished = true; + this.unsubscribe?.(); + this.unsubscribe = null; + activeEmitters.delete(this.deploymentId); + // Wait for any in-flight coalesced put before mutating + persisting the terminal state. + if (this.pendingPut) { + try { + await this.pendingPut; + } catch { + /* the next put surfaces the error */ + } + } this.record.status = status; this.record.completed_at = Date.now(); if (error) { @@ -172,3 +271,20 @@ export class DeploymentRecorder { await table.put(this.record); } } + +function startStatusFor(phase: string | undefined): DeploymentStatus | null { + switch (phase) { + case 'extract': + return 'extracting'; + case 'install': + return 'installing'; + case 'load': + return 'loading'; + case 'replicate': + return 'replicating'; + case 'restart': + return 'restarting'; + default: + return null; + } +} diff --git a/components/operations.js b/components/operations.js index 0f5cb9540..cc43a3e1b 100644 --- a/components/operations.js +++ b/components/operations.js @@ -19,6 +19,7 @@ const { Resources } = require('../resources/Resources.ts'); const { Application, prepareApplication } = require('./Application.ts'); const { server } = require('../server/Server.ts'); const { DeploymentRecorder } = require('./deploymentRecorder.ts'); +const { ProgressEmitter } = require('../server/serverHelpers/progressEmitter.ts'); /** * Read the settings.js file and return the @@ -388,13 +389,17 @@ async function deployComponent(req) { // Slice A of issue #641: create a hdb_deployment row up front so the deploy is // observable and auditable even if the CLI disconnects. The row also holds the payload - // in a Blob attribute — Slice B will use that for peer delivery; for now it's the - // audit record and the rollback source. + // in a Blob attribute — Slice B uses it as the rollback source. // // Only the origin node records — peers receiving a replicated deploy_component skip // recording so we don't accumulate one row per node for the same deploy. The row will // reach peers via the table's replication once Slice B has them consume it. const isReplicatedExecution = typeof req._deploymentId === 'string'; + // Slice B1 of #641: an SSE-bound caller already attached a ProgressEmitter (created in + // the server handler so it can also drive the response stream). Reuse it; otherwise + // spin up a fresh emitter so the recorder still gets phase events for non-SSE deploys. + const emitter = isReplicatedExecution ? null : (req.progress ?? new ProgressEmitter()); + if (emitter && !req.progress) req.progress = emitter; const recorder = isReplicatedExecution ? null : await DeploymentRecorder.create({ @@ -402,9 +407,12 @@ async function deployComponent(req) { package_identifier: req.package ?? null, user: req.hdb_user?.username, restart_mode: req.restart === 'rolling' ? 'rolling' : req.restart ? 'immediate' : null, + emitter, }); if (recorder) req._deploymentId = recorder.deploymentId; + const emit = (event, data) => emitter?.emit(event, data); + let extractionPayload = req.payload; try { // On the origin, tee the tarball (Buffer or Readable from the multipart parser) @@ -428,7 +436,9 @@ async function deployComponent(req) { }, }); + emit('phase', { phase: 'prepare', status: 'start' }); await prepareApplication(application); + emit('phase', { phase: 'prepare', status: 'done' }); // now we attempt to actually load the component in case there is // an error we can immediately detect and report, but app code should not run on the main thread @@ -439,6 +449,7 @@ async function deployComponent(req) { const componentLoader = require('./componentLoader.ts').default || require('./componentLoader.ts'); let lastError; componentLoader.setErrorReporter((error) => (lastError = error)); + emit('phase', { phase: 'load', status: 'start' }); await componentLoader.loadComponent( application.dirPath, pseudoResources, @@ -448,23 +459,34 @@ async function deployComponent(req) { false, req.project ); + emit('phase', { phase: 'load', status: 'done' }); if (lastError) throw lastError; } const rollingRestart = req.restart === 'rolling'; // if doing a rolling restart set restart to false so that other nodes don't also restart. req.restart = rollingRestart ? false : req.restart; + // ProgressEmitter holds function listeners that can't survive the replication + // channel's serialization, and the recorder is local to origin anyway. Strip both + // before sending so peers see a clean req. + delete req.progress; + emit('phase', { phase: 'replicate', status: 'start' }); let response = await server.replication.replicateOperation(req); + emit('phase', { phase: 'replicate', status: 'done' }); if (req.restart === true) { + emit('phase', { phase: 'restart', status: 'start' }); manageThreads.restartWorkers('http'); + emit('phase', { phase: 'restart', status: 'done' }); response.message = `Successfully deployed: ${application.name}, restarting Harper`; } else if (rollingRestart) { const serverUtilities = require('../server/serverHelpers/serverUtilities.ts'); + emit('phase', { phase: 'restart', status: 'start' }); const jobResponse = await serverUtilities.executeJob({ operation: 'restart_service', service: 'http', replicated: true, }); + emit('phase', { phase: 'restart', status: 'done' }); response.restartJobId = jobResponse.job_id; response.message = `Successfully deployed: ${application.name}, restarting Harper`; @@ -472,10 +494,16 @@ async function deployComponent(req) { if (recorder) { response.deployment_id = recorder.deploymentId; + emit('phase', { phase: 'success', status: 'done' }); await recorder.finish('success'); } return response; } catch (err) { + emit('error', { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + phase: recorder?.row.phase, + }); if (recorder) await recorder.finish('failed', err); throw err; } diff --git a/integrationTests/deploy/deploy-tracking-events.test.ts b/integrationTests/deploy/deploy-tracking-events.test.ts new file mode 100644 index 000000000..8225b60e8 --- /dev/null +++ b/integrationTests/deploy/deploy-tracking-events.test.ts @@ -0,0 +1,285 @@ +/** + * Deployment tracking — event_log and SSE replay/tail (Slice B1 of issue #641). + * + * Builds on the Slice A audit-record tests by exercising the new ProgressEmitter → + * DeploymentRecorder integration: every successful deploy should populate event_log with + * the lifecycle phases, and `get_deployment` with `Accept: text/event-stream` should + * replay those events to the client and close cleanly for a terminal deploy. + */ +import { suite, test, before, after } from 'node:test'; +import { ok, strictEqual } from 'node:assert/strict'; +import { join } from 'node:path'; +import { mkdtempSync, mkdirSync, writeFileSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { request } from 'node:http'; +import { Readable } from 'node:stream'; + +import { startHarper, teardownHarper, type ContextWithHarper } from '@harperfast/integration-testing'; + +import { streamPackagedDirectory } from '../../dist/components/packageComponent.js'; +import { buildMultipartBody } from '../../dist/bin/multipartBuilder.js'; + +function postMultipart( + url: URL, + contentType: string, + body: Readable, + auth: { username: string; password: string } +): Promise<{ status: number; body: string }> { + return new Promise((resolve, reject) => { + const req = request( + { + protocol: url.protocol, + hostname: url.hostname, + port: url.port, + method: 'POST', + path: '/', + headers: { + 'Content-Type': contentType, + 'Transfer-Encoding': 'chunked', + 'Authorization': 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64'), + }, + }, + (res) => { + res.setEncoding('utf8'); + let buf = ''; + res.on('data', (chunk) => (buf += chunk)); + res.on('end', () => resolve({ status: res.statusCode ?? 0, body: buf })); + } + ); + req.on('error', reject); + body.on('error', reject); + body.pipe(req); + }); +} + +async function callOperation( + ctx: ContextWithHarper, + op: Record, + headers: Record = {} +): Promise<{ status: number; body: any; rawText: string; contentType: string }> { + const url = new URL(ctx.harper.operationsAPIURL); + const auth = 'Basic ' + Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64'); + const res = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Authorization': auth, ...headers }, + body: JSON.stringify(op), + }); + const text = await res.text(); + let parsed: any = text; + try { + parsed = JSON.parse(text); + } catch { + // SSE responses are line-oriented text; the caller will parse them. + } + return { status: res.status, body: parsed, rawText: text, contentType: res.headers.get('content-type') ?? '' }; +} + +suite('Deployment tracking — events + SSE', (ctx: ContextWithHarper) => { + let fixtureDir: string; + let deploymentId: string; + + before(async () => { + await startHarper(ctx); + fixtureDir = mkdtempSync(join(tmpdir(), 'deploy-events-fixture-')); + writeFileSync( + join(fixtureDir, 'config.yaml'), + 'static:\n files: web\ngraphqlSchema:\n files: schema.graphql\nrest: true\n' + ); + writeFileSync(join(fixtureDir, 'schema.graphql'), 'type Query { hello: String }\n'); + mkdirSync(join(fixtureDir, 'web'), { recursive: true }); + writeFileSync(join(fixtureDir, 'web', 'index.html'), '

Hello, Events!

'); + }); + + after(async () => { + try { + rmSync(fixtureDir, { recursive: true, force: true }); + } catch { + // best-effort + } + await teardownHarper(ctx); + }); + + test('verify Harper', async () => { + const response = await fetch(`${ctx.harper.operationsAPIURL}/health`); + strictEqual(response.status, 200); + }); + + test('successful deploy populates event_log with the lifecycle phases', async () => { + const project = 'events-test-application'; + const multipart = buildMultipartBody( + { operation: 'deploy_component', project, restart: false }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(fixtureDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + const response = await postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + strictEqual(response.status, 200, `expected 200, got ${response.status}: ${response.body}`); + const result = JSON.parse(response.body); + deploymentId = result.deployment_id; + ok(deploymentId, 'deploy response should include deployment_id'); + + // Coalesced writes settle on a microtask boundary; give them a beat. + await sleep(200); + + const got = await callOperation(ctx, { operation: 'get_deployment', deployment_id: deploymentId }); + strictEqual(got.status, 200); + const row = got.body; + ok(Array.isArray(row.event_log), 'event_log should be an array'); + ok(row.event_log.length >= 2, `expected at least 2 events, got ${row.event_log.length}`); + const phases = row.event_log.filter((e: any) => e.event === 'phase').map((e: any) => e.data?.phase); + // We emit prepare → (load) → replicate → success in the lifecycle. Verify the spine. + ok(phases.includes('prepare'), `event_log should include a prepare phase: ${phases.join(',')}`); + ok(phases.includes('replicate'), `event_log should include a replicate phase: ${phases.join(',')}`); + }); + + test('get_deployment with Accept: text/event-stream replays event_log and closes cleanly', async () => { + // Already terminal at this point — Slice B1's SSE branch should replay event_log + // then return the final record as the `done` event. + const got = await callOperation( + ctx, + { operation: 'get_deployment', deployment_id: deploymentId }, + { Accept: 'text/event-stream' } + ); + strictEqual(got.status, 200, `expected 200, got ${got.status}: ${got.rawText}`); + ok(got.contentType.startsWith('text/event-stream'), `expected SSE content-type, got: ${got.contentType}`); + + const text = got.rawText; + // Each SSE record is separated by a blank line. Count phase events. + const records = text.split(/\r?\n\r?\n/).filter((r) => r.includes('event:')); + ok( + records.length >= 2, + `expected at least 2 SSE records (events + done), got ${records.length}.\nraw SSE:\n${text}` + ); + ok( + records.some((r) => r.includes('event: phase')), + `expected at least one phase event in SSE replay.\nraw SSE:\n${text}` + ); + ok( + records.some((r) => r.includes('event: done')), + `expected a final done event in SSE replay.\nraw SSE:\n${text}` + ); + }); + + test('get_deployment SSE tails live events on an in-flight deploy', async () => { + // This test exercises the live-tail branch: subscribe before the deploy completes, + // then assert that phase events arrive from the emitter (not just the historical replay). + const project = 'live-tail-test-application'; + const liveDir = mkdtempSync(join(tmpdir(), 'live-tail-fixture-')); + try { + // package.json is required so installApplication actually runs install_command + // (without it Harper logs "no package.json; skipping install" and the deploy + // completes in <100ms, before the first poll can catch the in-flight row). + writeFileSync( + join(liveDir, 'package.json'), + JSON.stringify({ name: project, version: '0.0.0', dependencies: {} }) + ); + writeFileSync(join(liveDir, 'config.yaml'), 'rest: true\n'); + const multipart = buildMultipartBody( + { + operation: 'deploy_component', + project, + restart: false, + // A 3-second sleep gives us a window to attach the SSE tail before the + // deploy reaches a terminal status. The row exists after the multipart + // upload completes (before prepareApplication), so polling finds it quickly. + install_command: 'sleep 3', + install_timeout: 30_000, + }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(liveDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + + // Start the deploy without awaiting — the HTTP response comes back only after the + // deploy finishes, so this promise stays pending for ~3+ seconds. + const deployPromise = postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + + // Poll list_deployments until we see the in-flight row. The recorder row is created + // after the multipart body is fully received (which is fast for a tiny fixture). + const TERMINAL = new Set(['success', 'failed', 'rolled_back']); + let inFlightId: string | null = null; + for (let i = 0; i < 15 && !inFlightId; i++) { + await sleep(300); + const listed = await callOperation(ctx, { operation: 'list_deployments', project }); + const rows: any[] = listed.body?.deployments ?? []; + const inFlight = rows.find((d: any) => !TERMINAL.has(d.status)); + if (inFlight) inFlightId = inFlight.deployment_id; + } + ok(inFlightId, 'expected to find an in-flight deployment row within the polling window'); + + // Open SSE tail while the deploy is still running; fetch blocks until the SSE + // stream closes (when the deploy reaches a terminal status and the server ends it). + const [deployResult, sseResult] = await Promise.all([ + deployPromise, + callOperation(ctx, { operation: 'get_deployment', deployment_id: inFlightId }, { Accept: 'text/event-stream' }), + ]); + + strictEqual(deployResult.status, 200, `deploy should succeed: ${deployResult.body}`); + strictEqual(sseResult.status, 200); + ok(sseResult.contentType.startsWith('text/event-stream')); + + const records = sseResult.rawText.split(/\r?\n\r?\n/).filter((r) => r.includes('event:')); + ok( + records.some((r) => r.includes('event: phase')), + `expected at least one phase event in SSE stream.\nraw SSE:\n${sseResult.rawText}` + ); + ok( + records.some((r) => r.includes('event: done')), + `expected a final done event in SSE stream.\nraw SSE:\n${sseResult.rawText}` + ); + } finally { + rmSync(liveDir, { recursive: true, force: true }); + } + }); + + test('failed deploy event_log captures the error event', async () => { + const project = 'broken-events-application'; + const brokenDir = mkdtempSync(join(tmpdir(), 'broken-events-fixture-')); + try { + writeFileSync( + join(brokenDir, 'package.json'), + JSON.stringify({ name: project, version: '0.0.0', dependencies: {} }) + ); + writeFileSync(join(brokenDir, 'config.yaml'), 'rest: true\n'); + const multipart = buildMultipartBody( + { + operation: 'deploy_component', + project, + restart: false, + install_command: 'sh -c "exit 1"', + install_timeout: 30_000, + }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(brokenDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + const response = await postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + ok(response.status >= 400, `expected an error response, got ${response.status}`); + + await sleep(200); + const listed = await callOperation(ctx, { operation: 'list_deployments', project }); + const failed = listed.body.deployments.find((d: any) => d.project === project); + ok(failed, 'expected to find a failed deployment row'); + strictEqual(failed.status, 'failed'); + const errorEvents = (failed.event_log ?? []).filter((e: any) => e.event === 'error'); + ok(errorEvents.length >= 1, 'expected at least one error event in event_log'); + } finally { + try { + rmSync(brokenDir, { recursive: true, force: true }); + } catch {} + } + }); +}); diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts new file mode 100644 index 000000000..7f8659b1e --- /dev/null +++ b/server/serverHelpers/progressEmitter.ts @@ -0,0 +1,110 @@ +import { PassThrough, Readable } from 'node:stream'; + +export interface ProgressEvent { + event: string; + data: unknown; +} + +export type ProgressListener = (event: ProgressEvent) => void; + +/** + * Lightweight pub-sub used to report phase/install/replicate events from a long-running + * operation back to the HTTP layer. We deliberately don't use Node's EventEmitter here: + * we only need broadcast semantics for a small set of event types, and we want the + * `emit(event, data)` shape that matches the SSE wire format directly. + */ +export class ProgressEmitter { + private listeners: ProgressListener[] = []; + + emit(event: string, data: unknown): void { + // Snapshot before iteration so a listener that unsubscribes itself during dispatch + // doesn't shift indexes underneath us. + const snapshot = this.listeners.slice(); + for (const listener of snapshot) { + try { + listener({ event, data }); + } catch { + // A buggy listener must never break the operation. Swallow and continue. + } + } + } + + subscribe(listener: ProgressListener): () => void { + this.listeners.push(listener); + return () => { + const i = this.listeners.indexOf(listener); + if (i !== -1) this.listeners.splice(i, 1); + }; + } +} + +/** + * Wrap a long-running operation so its progress events stream back as Server-Sent Events. + * + * The returned Readable emits one SSE message per `emitter.emit(...)` call, then a final + * `done` (or `error`) event with the operation's result, then ends. The caller is + * expected to set Content-Type: text/event-stream on the response. + */ +export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise): Readable { + const stream = new PassThrough(); + // Prime the stream with an SSE comment line so the response body is non-empty by the time + // Fastify starts piping. Without this, Fastify buffers the PassThrough's internal queue + // until its end and only flushes the final chunk to the wire — making intermediate + // progress events invisible to the client. The comment ": ..." is a valid SSE record + // that consumers ignore, so it's safe filler. + stream.write(`: stream open\n\n`); + + let active = true; + let errorEmitted = false; + const unsubscribe = emitter.subscribe((event) => { + if (active) { + writeSSE(stream, event); + if (event.event === 'error') errorEmitted = true; + } + }); + + const cleanup = () => { + if (active) { + active = false; + unsubscribe(); + } + }; + + // If the client disconnects (Ctrl-C, network drop) stop writing to the stream and + // release the emitter subscription so it doesn't accumulate for the operation lifetime. + stream.on('close', cleanup); + stream.on('end', cleanup); + + operation() + .then((result) => { + if (active) writeSSE(stream, { event: 'done', data: { result } }); + }) + .catch((err) => { + // Only emit a framework-level error event if the operation itself didn't already + // emit one (with richer context like phase) through the emitter subscriber above. + if (active && !errorEmitted) { + writeSSE(stream, { + event: 'error', + data: { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + }, + }); + } + }) + .finally(() => { + cleanup(); + stream.end(); + }); + + return stream; +} + +function writeSSE(stream: PassThrough, event: ProgressEvent): void { + const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data); + stream.write(`event: ${event.event}\n`); + for (const line of data.split(/\r?\n/)) { + stream.write(`data: ${line}\n`); + } + stream.write('\n'); +} diff --git a/server/serverHelpers/serverHandlers.js b/server/serverHelpers/serverHandlers.js index 692c9fbc4..60160629e 100644 --- a/server/serverHelpers/serverHandlers.js +++ b/server/serverHelpers/serverHandlers.js @@ -15,6 +15,13 @@ const pAuthorize = util.promisify(auth.authorize); const serverUtilities = require('./serverUtilities.ts'); const { applyImpersonation } = require('../../security/impersonation.ts'); const { createGzip, constants } = require('zlib'); +const { ProgressEmitter, createSSEResponseStream } = require('./progressEmitter.ts'); + +// Operations that support `Accept: text/event-stream` for live progress streaming. The +// handler attaches a ProgressEmitter as req.body.progress so the operation can emit phase +// events; the response body is the SSE-encoded emitter output. Non-SSE clients see the +// historical single-response shape because progress is undefined on that path. +const SSE_PROGRESS_OPERATIONS = new Set([terms.OPERATIONS_ENUM.DEPLOY_COMPONENT, terms.OPERATIONS_ENUM.GET_DEPLOYMENT]); const NO_AUTH_OPERATIONS = [ terms.OPERATIONS_ENUM.CREATE_AUTHENTICATION_TOKENS, @@ -119,6 +126,25 @@ async function handlePostRequest(req, res, _bypassAuth = false) { if (req.body.bypass_auth) delete req.body.bypass_auth; operation_function = serverUtilities.chooseOperation(req.body); + + // SSE progress branch — when the client asks for `text/event-stream` on an operation + // that supports it, run the operation in the background and stream events back as they + // happen. The progress emitter is attached to req.body so the operation handler can + // emit without changing its return signature; non-SSE callers leave `progress` + // undefined and the handler stays on its synchronous result path. + // Optional-chained `req.headers` because unit tests dispatch through this path with + // synthetic req shapes that don't set headers; production Fastify requests always do. + // `Accept` parsing only checks for the text/event-stream token; allow comma-separated + // values like `text/event-stream, application/json` and quality params per RFC 7231. + if (req.headers?.accept?.includes('text/event-stream') && SSE_PROGRESS_OPERATIONS.has(req.body.operation)) { + const emitter = new ProgressEmitter(); + req.body.progress = emitter; + res.header('Content-Type', 'text/event-stream'); + res.header('Cache-Control', 'no-cache'); + res.header('X-Accel-Buffering', 'no'); // disable proxy buffering so events flush in real time + return createSSEResponseStream(emitter, () => serverUtilities.processLocalTransaction(req, operation_function)); + } + let result = await serverUtilities.processLocalTransaction(req, operation_function); if (result instanceof Readable && result.headers) { for (let [name, value] of result.headers) { diff --git a/unitTests/bin/deployRenderer.test.js b/unitTests/bin/deployRenderer.test.js new file mode 100644 index 000000000..9ab450b95 --- /dev/null +++ b/unitTests/bin/deployRenderer.test.js @@ -0,0 +1,108 @@ +'use strict'; + +const assert = require('node:assert'); +const { PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { DeployRenderer } = require('#src/bin/deployRenderer'); + +function makeOutput() { + const lines = []; + const stream = { write: (s) => lines.push(s), isTTY: false }; + return { stream, lines }; +} + +describe('DeployRenderer', () => { + describe('upload progress (non-TTY)', () => { + it('advances bytes as data flows through the tap and logs at 10% steps', async () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 1_000_000, output: stream }); + const source = new PassThrough(); + const tap = renderer.tapUploadStream(source); + // Drain into a sink so backpressure doesn't pause the tap. + const sink = new PassThrough(); + tap.pipe(sink); + sink.on('data', () => {}); + // Write 5×200_000 byte chunks → 1MB total, crossing each 10% threshold. + for (let i = 0; i < 5; i++) source.write(Buffer.alloc(200_000)); + source.end(); + await new Promise((resolve) => sink.on('end', resolve)); + renderer.endUpload(); + // Expect at least one per-10% progress line plus the final "Upload complete" line. + const progressLines = lines.filter((l) => /Uploaded /.test(l)); + assert.ok( + progressLines.length >= 4, + `expected multiple progress lines, got ${progressLines.length}: ${progressLines.join('|')}` + ); + assert.ok( + lines.some((l) => l.startsWith('Upload complete')), + 'should log Upload complete on endUpload' + ); + }); + + it('endUpload is idempotent', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 100, output: stream }); + renderer.tapUploadStream(new PassThrough()); + renderer.endUpload(); + renderer.endUpload(); + const completeLines = lines.filter((l) => l.startsWith('Upload complete')); + assert.strictEqual(completeLines.length, 1); + }); + }); + + describe('renderEvent', () => { + function sseMessage(event, data) { + return { event, data: JSON.stringify(data) }; + } + + it('prints a phase line on start, then on done (no duplicate start)', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'done' })); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('forwards install stdout/stderr lines and headers the manager name once', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'start' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'added 42 packages' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'audited 100 packages' })); + renderer.renderEvent( + sseMessage('install', { manager: 'npm', stream: 'stderr', line: 'npm warn deprecated foo' }) + ); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'done' })); + assert.ok(lines.includes('install: using npm\n'), 'expected manager header'); + assert.strictEqual( + lines.filter((l) => l === 'install: using npm\n').length, + 1, + 'manager header logged only once' + ); + assert.ok(lines.includes(' | added 42 packages\n')); + assert.ok(lines.includes(' | audited 100 packages\n')); + assert.ok(lines.includes(' ! npm warn deprecated foo\n'), 'stderr should be tagged with !'); + assert.ok( + lines.some((l) => /install done \(3 log lines\)/.test(l)), + 'install-done should count emitted lines' + ); + }); + + it('renders an error event with code suffix', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('error', { message: 'npm install failed', code: 500 })); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); + + it('renders a phase-error with the message', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'error', message: 'exit code 1' })); + assert.match(lines[0], /install ERROR: exit code 1/); + }); + }); +}); diff --git a/unitTests/bin/sseConsumer.test.js b/unitTests/bin/sseConsumer.test.js new file mode 100644 index 000000000..2bc1c25a2 --- /dev/null +++ b/unitTests/bin/sseConsumer.test.js @@ -0,0 +1,95 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable, PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { parseSSE, renderDeployProgress } = require('#src/bin/sseConsumer'); + +async function collectMessages(stream) { + const out = []; + for await (const msg of parseSSE(stream)) out.push(msg); + return out; +} + +describe('parseSSE', () => { + it('parses a single record', async () => { + const stream = Readable.from(['event: phase\ndata: {"phase":"extract"}\n\n']); + const msgs = await collectMessages(stream); + assert.deepStrictEqual(msgs, [{ event: 'phase', data: '{"phase":"extract"}', id: undefined, retry: undefined }]); + }); + + it('joins multi-line data fields with \\n per the SSE spec', async () => { + const stream = Readable.from(['event: install\ndata: line1\ndata: line2\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].data, 'line1\nline2'); + }); + + it('handles records split across multiple chunks', async () => { + const stream = new PassThrough(); + const collector = collectMessages(stream); + stream.write('event: phase\n'); + stream.write('data: {"phase":"extract","sta'); + stream.write('tus":"start"}\n\n'); + stream.write('event: phase\ndata: {"phase":"extract","status":"done"}\n\n'); + stream.end(); + const msgs = await collector; + assert.strictEqual(msgs.length, 2); + assert.strictEqual(JSON.parse(msgs[0].data).status, 'start'); + assert.strictEqual(JSON.parse(msgs[1].data).status, 'done'); + }); + + it('handles CRLF line endings', async () => { + const stream = Readable.from(['event: phase\r\ndata: {"x":1}\r\n\r\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(msgs[0].data), { x: 1 }); + }); + + it('strips a single leading space after the colon, per spec', async () => { + const stream = Readable.from(['data: hello\n\n']); + const msgs = await collectMessages(stream); + // only ONE space is stripped, the second remains + assert.strictEqual(msgs[0].data, ' hello'); + }); + + it('ignores comment lines (leading colon)', async () => { + const stream = Readable.from([': heartbeat\nevent: ping\ndata: ok\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'ping'); + assert.strictEqual(msgs[0].data, 'ok'); + }); +}); + +describe('renderDeployProgress', () => { + it('prints a phase line on start and another on done', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }, state, out); + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'done' }) }, state, out); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('does not repeat the same phase-start line', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + const msg = { event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }; + renderDeployProgress(msg, state, out); + renderDeployProgress(msg, state, out); + assert.strictEqual(lines.length, 1); + }); + + it('prints an error line with the message', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + renderDeployProgress( + { event: 'error', data: JSON.stringify({ message: 'npm install failed', code: 500 }) }, + {}, + out + ); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); +}); diff --git a/unitTests/server/serverHelpers/progressEmitter.test.js b/unitTests/server/serverHelpers/progressEmitter.test.js new file mode 100644 index 000000000..d2cffbb14 --- /dev/null +++ b/unitTests/server/serverHelpers/progressEmitter.test.js @@ -0,0 +1,140 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable } = require('node:stream'); +const testUtils = require('../../testUtils.js'); +testUtils.preTestPrep(); + +const { ProgressEmitter, createSSEResponseStream } = require('#src/server/serverHelpers/progressEmitter'); + +function collect(stream) { + return new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (c) => chunks.push(c)); + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + stream.on('error', reject); + }); +} + +function parseSSEBlocks(text) { + return ( + text + .split('\n\n') + .filter((block) => block.trim().length > 0) + .map((block) => { + const out = {}; + for (const line of block.split('\n')) { + const colon = line.indexOf(':'); + if (colon === -1) continue; + const field = line.slice(0, colon); + let value = line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + out[field] = value; + } + return out; + }) + // Drop SSE comment records (lines starting with `:`) — they're protocol-level + // liveness hints that the stream is open, not application events. + .filter((rec) => 'event' in rec || 'data' in rec) + ); +} + +describe('ProgressEmitter', () => { + it('delivers events to every subscriber', () => { + const emitter = new ProgressEmitter(); + const a = []; + const b = []; + emitter.subscribe((e) => a.push(e)); + emitter.subscribe((e) => b.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.deepStrictEqual(a, [ + { event: 'phase', data: { phase: 'extract', status: 'start' } }, + { event: 'phase', data: { phase: 'extract', status: 'done' } }, + ]); + assert.deepStrictEqual(b, a); + }); + + it('unsubscribe stops further delivery', () => { + const emitter = new ProgressEmitter(); + const received = []; + const unsubscribe = emitter.subscribe((e) => received.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + unsubscribe(); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.strictEqual(received.length, 1); + }); + + it('swallows listener exceptions so operations are never broken by a buggy subscriber', () => { + const emitter = new ProgressEmitter(); + emitter.subscribe(() => { + throw new Error('listener boom'); + }); + const ok = []; + emitter.subscribe((e) => ok.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + assert.strictEqual(ok.length, 1); + }); +}); + +describe('createSSEResponseStream', () => { + it('streams emitter events then a terminating `done` event with the operation result', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + await new Promise((r) => setImmediate(r)); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + return { message: 'Successfully deployed: demo' }; + }); + const text = await collect(stream); + const events = parseSSEBlocks(text); + assert.strictEqual(events.length, 3); + assert.strictEqual(events[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[0].data), { phase: 'extract', status: 'start' }); + assert.strictEqual(events[1].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[1].data), { phase: 'extract', status: 'done' }); + assert.strictEqual(events[2].event, 'done'); + assert.deepStrictEqual(JSON.parse(events[2].data), { result: { message: 'Successfully deployed: demo' } }); + }); + + it('streams an `error` event when the operation rejects', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + const err = new Error('boom'); + err.statusCode = 500; + throw err; + }); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events[events.length - 1].event, 'error'); + const errData = JSON.parse(events[events.length - 1].data); + assert.strictEqual(errData.message, 'boom'); + assert.strictEqual(errData.code, 500); + }); + + it('does not emit a second error event when the operation already emitted one before throwing', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + // Simulates operations.js: emit a rich error event (with phase context) then throw. + emitter.emit('error', { message: 'install failed', code: 500, phase: 'install' }); + const err = new Error('install failed'); + err.statusCode = 500; + throw err; + }); + const events = parseSSEBlocks(await collect(stream)); + const errorEvents = events.filter((e) => e.event === 'error'); + assert.strictEqual(errorEvents.length, 1, `expected exactly 1 error event, got ${errorEvents.length}`); + assert.deepStrictEqual(JSON.parse(errorEvents[0].data), { message: 'install failed', code: 500, phase: 'install' }); + }); + + it('still closes the stream cleanly when the operation emits nothing', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => ({ ok: true })); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events.length, 1); + assert.strictEqual(events[0].event, 'done'); + }); +}); + +// keep Readable import live for any future tests that need stream sources +void Readable; diff --git a/utility/common_utils.ts b/utility/common_utils.ts index 8b9b14819..26b5e8dd1 100644 --- a/utility/common_utils.ts +++ b/utility/common_utils.ts @@ -756,8 +756,24 @@ export function httpRequest(options: any, data: any): Promise { const req = client.request(options, (response: http.IncomingMessage & { body?: string }) => { + if (streamResponse) { + // Hand the raw stream to the caller; do not setEncoding so binary-safe consumers + // (or SSE parsers that prefer Buffers) still work. + resolve(response); + return; + } response.setEncoding('utf8'); response.body = ''; response.on('data', (chunk) => {