Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions bin/cliOperations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ import * as fs from 'fs-extra';
import * as YAML from 'yaml';
import { streamPackagedDirectory } from '../components/packageComponent.ts';
import { buildMultipartBody } from './multipartBuilder.ts';
import { parseSSE } from './sseConsumer.ts';
import { DeployRenderer } from './deployRenderer.ts';
import { getHdbPid } from '../utility/processManagement/processManagement.js';
import { initConfig, getConfigPath } from '../config/configUtils.js';

const OP_ALIASES = { deploy: 'deploy_component', package: 'package_component' };

// Operations whose responses should be consumed as text/event-stream so live phase events
// (prepare, load, replicate, restart) render as they happen instead of after the whole
// deploy completes. Add an operation here only after wiring its server-side
// SSE_PROGRESS_OPERATIONS entry — otherwise the server returns the buffered JSON path and
// the SSE parser sees no events.
const SSE_OPERATIONS = new Set(['deploy_component']);

// Properties on `req` that the CLI itself uses for transport/UX, not the operations API.
// They never get serialized into the request body.
const TRANSPORT_ONLY_FIELDS = new Set([
Expand Down Expand Up @@ -191,6 +200,15 @@ async function cliOperations(req: any, skipResponseLog = false) {
options.headers.Authorization = `Bearer ${tokens.operation_token}`;
}
}
const useSse = SSE_OPERATIONS.has(req.operation);
if (useSse) {
options.headers.Accept = 'text/event-stream';
options.streamResponse = true;
}
// One renderer owns the (future) upload bar and the SSE event rendering for a
// multipart deploy. Created here so the upload-stream tap and the SSE consumer
// below share the same instance.
const renderer = req._multipart ? new DeployRenderer({}) : null;
let body;
if (req._multipart) {
const packageStream = req._packageStream;
Expand All @@ -209,20 +227,66 @@ async function cliOperations(req: any, skipResponseLog = false) {
// Use chunked transfer-encoding: we don't know the total size up front because the
// payload is streamed from `tar.pack` and never fully buffered.
options.headers['Transfer-Encoding'] = 'chunked';
body = multipart.stream;
// Tap the body so bytes flowing into the HTTP request advance the upload bar.
// The renderer's Transform is identity — chunks pass through unmodified.
body = renderer ? renderer.tapUploadStream(multipart.stream) : multipart.stream;
} else {
body = req;
}
let response: any = await httpRequest(options, body);

// Upload is done by the time we get the response; tear the bar down before any SSE
// rendering so the bar and event lines don't fight for the same terminal row.
renderer?.endUpload();

let responseData;
try {
responseData = JSON.parse(response.body);
} catch {
responseData = {
status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'),
body: response.body,
};
if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) {
// Consume SSE: render phase events live, capture the final result from the `done`
// event (or the error message from the `error` event). The HTTP status stays 200
// until end-of-stream; failures are signaled in-band.
let finalResult;
let sseError;
for await (const message of parseSSE(response)) {
renderer?.renderEvent(message);
if (message.event === 'done') {
try {
finalResult = JSON.parse(message.data)?.result;
} catch {
finalResult = message.data;
}
} else if (message.event === 'error') {
try {
sseError = JSON.parse(message.data);
} catch {
sseError = { message: message.data };
}
}
}
if (sseError) {
const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError);
console.error(`error: ${errMsg}`);
process.exit(1);
}
responseData = finalResult ?? { message: 'Deploy completed (no result payload).' };
} else {
// When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode),
// so .body is undefined. Drain the stream to get the text (e.g. a 401 error body).
let bodyText: string;
if (useSse) {
const chunks: Buffer[] = [];
for await (const chunk of response as AsyncIterable<Buffer>) chunks.push(Buffer.from(chunk));
bodyText = Buffer.concat(chunks).toString('utf8');
} else {
bodyText = response.body;
}
try {
responseData = JSON.parse(bodyText);
} catch {
responseData = {
status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'),
body: bodyText,
};
}
}

let responseLog;
Expand Down
187 changes: 187 additions & 0 deletions bin/deployRenderer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import { Transform } from 'node:stream';
// cli-progress is already a runtime dep of harper (see package.json); using its
// SingleBar to render the upload phase here doesn't add a new dependency.
import cliProgress from 'cli-progress';
import type { SSEMessage } from './sseConsumer.ts';

interface RendererOptions {
uploadTotal?: number;
output?: NodeJS.WritableStream;
}

interface UploadState {
bar: cliProgress.SingleBar | null;
sent: number;
textLastLogged: number;
finished: boolean;
}

interface PhaseState {
current?: string;
installManager?: string;
installLineCount: number;
}

/**
* Deploy-time renderer that owns the progress display across two phases:
*
* 1. Local upload — driven by `tapUploadStream`, which wraps the multipart body so we
* can update a `cli-progress` bar against the precomputed uncompressed source-tree
* total. In a non-TTY environment (CI logs, redirected output) we fall back to
* periodic text lines so logs stay grep-able.
*
* 2. Server-side phases — driven by `renderEvent`, called for each SSE message the
* CLI receives from the operations API. Phase events print one-liners; live
* `install` events (npm/pnpm/yarn stdout) are throttled to one line under a
* "[install]" header so a noisy `npm install` doesn't drown the terminal.
*
* Designed so the two phases hand off cleanly: `endUpload()` tears the bar down (so
* it doesn't compete with subsequent prints) before any SSE events render.
*/
export class DeployRenderer {
private upload: UploadState = { bar: null, sent: 0, textLastLogged: 0, finished: false };
private phase: PhaseState = { installLineCount: 0 };
private output: NodeJS.WritableStream;
private isTty: boolean;
private uploadTotal: number;

constructor(options: RendererOptions = {}) {
this.output = options.output ?? process.stderr;
// Only render a bar when stderr is a real terminal. CI runners, log redirection,
// and pipes look identical from Node's perspective: !isTTY.
this.isTty = Boolean((this.output as NodeJS.WriteStream).isTTY);
this.uploadTotal = options.uploadTotal ?? 0;
}

/**
* Wrap an outbound stream so each byte flowing through it advances the upload bar.
* The Transform is identity — chunks pass through unmodified.
*/
tapUploadStream<T extends NodeJS.ReadableStream>(stream: T): NodeJS.ReadableStream {
this.upload.bar = this.isTty
? new cliProgress.SingleBar(
{
format: 'Uploading [{bar}] {percentage}% | {value}/{total} bytes',
barCompleteChar: '█',
barIncompleteChar: '░',
hideCursor: true,
stream: this.output,
etaBuffer: 50,
},
cliProgress.Presets.shades_classic
)
: null;
this.upload.bar?.start(this.uploadTotal || 1, 0);

const counter = new Transform({
transform: (chunk, _enc, cb) => {
this.upload.sent += chunk.length;
this.tickUpload();
cb(null, chunk);
},
flush: (cb) => {
this.endUpload();
cb();
},
});
stream.on('error', (err) => counter.destroy(err));
stream.pipe(counter);
return counter;
}

endUpload(): void {
if (this.upload.finished) return;
this.upload.finished = true;
if (this.upload.bar) {
// Snap to total so the bar shows 100% even when our uncompressed-total estimate
// is slightly off (gzip output is usually smaller than the source tree).
if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal);
this.upload.bar.stop();
this.upload.bar = null;
} else {
this.output.write(`Upload complete (${formatBytes(this.upload.sent)})\n`);
}
}

private tickUpload(): void {
if (this.upload.bar) {
this.upload.bar.update(this.upload.sent);
return;
}
// Non-TTY: log a line every 10% of the total (or every 5MB if total unknown).
const step = this.uploadTotal > 0 ? this.uploadTotal / 10 : 5 * 1024 * 1024;
if (this.upload.sent - this.upload.textLastLogged >= step) {
this.upload.textLastLogged = this.upload.sent;
const pct = this.uploadTotal > 0 ? Math.min(100, Math.floor((this.upload.sent / this.uploadTotal) * 100)) : null;
this.output.write(
pct !== null
? `Uploaded ${formatBytes(this.upload.sent)} / ~${formatBytes(this.uploadTotal)} (${pct}%)\n`
: `Uploaded ${formatBytes(this.upload.sent)}\n`
);
}
}

renderEvent(message: SSEMessage): void {
let parsed: unknown;
try {
parsed = JSON.parse(message.data);
} catch {
parsed = message.data;
}
switch (message.event) {
case 'phase':
this.renderPhase(parsed as { phase?: string; status?: string; message?: string });
break;
case 'install':
this.renderInstall(parsed as { manager?: string; stream?: string; line?: string });
break;
case 'error': {
const e = parsed as { message?: string; code?: string | number };
this.output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`);
break;
}
case 'done':
// Caller picks up final result via the SSE iterator; nothing to render here.
break;
}
}

private renderPhase(data: { phase?: string; status?: string; message?: string }): void {
const label = data.phase ?? '?';
if (data.status === 'start') {
if (this.phase.current !== label) {
this.output.write(`${label}…\n`);
this.phase.current = label;
this.phase.installLineCount = 0;
}
} else if (data.status === 'done') {
if (label === 'install' && this.phase.installLineCount > 0) {
this.output.write(`install done (${this.phase.installLineCount} log lines)\n`);
} else {
this.output.write(`${label} done\n`);
}
} else if (data.status === 'error') {
this.output.write(`${label} ERROR: ${data.message ?? 'failed'}\n`);
}
}

private renderInstall(data: { manager?: string; stream?: string; line?: string }): void {
const line = (data.line ?? '').trimEnd();
if (!line) return;
if (data.manager && data.manager !== this.phase.installManager) {
this.phase.installManager = data.manager;
this.output.write(`install: using ${data.manager}\n`);
}
this.phase.installLineCount++;
// Prefix with stream so users can distinguish stderr noise from stdout warnings.
const tag = data.stream === 'stderr' ? '!' : '|';
this.output.write(` ${tag} ${line}\n`);
}
}

function formatBytes(bytes: number): string {
if (bytes < 1024) return `${bytes} B`;
if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KiB`;
if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`;
return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GiB`;
}
Loading
Loading