Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,27 @@ The cross-thread subscription path (default `crossThreads`) drives every `Table.
- **`databaseSubscriptions.activeCount`** is the count of live `Subscription` instances on a database. It is incremented at the end of `addSubscription` (after the Subscription is created, so the `scope: 'full-database'` early-return path correctly skips counting) and decremented in `Subscription.end()`. `notifyFromTransactionData` short-circuits when this is zero — the reusable rocksdb iterator stays put and resumes from its position the next time a subscriber arrives. Without this short-circuit, an idle database with no subscribers still pays the audit-log iteration cost on every commit during replication backlog catch-up.
- **`notifyScheduled` + `setImmediate`** in the `'committed'` listener defers the iteration off the commit microtask. Multiple `'committed'` events that land in the same event-loop turn collapse into one notify pass. `notifyScheduled` stays set for the entire drain — including across yield-and-resume turns — so a re-entry from a new `'committed'` event cannot spawn a second concurrent notify on the same iterator.
- **Batched yielding** in `notifyFromTransactionData` (`NOTIFY_BATCH_SIZE`) is gated by `allowYield`. The `'committed'` path passes `allowYield = true`; the `listenToCommits` (same-thread `aftercommit`) path does not, because that path holds an inter-thread `'thread-local-writes'` lock that must not span event-loop turns. `subscribersWithTxns` is carried across yields via `subscriptions.pendingTxnSubscribers` so the `end_txn` signal fires exactly once when the iterator truly drains. When `activeCount` drops to zero mid-yield, the next continuation drops the carry-over to avoid invoking ended subscribers' listeners.

## `createBlob(readable)` and `table.put()` don't synchronously drain the source

When a blob attribute is created from a Node `Readable` (e.g. `createBlob(stream)` then `row.payload_blob = blob; await table.put(row)`), the put does **not** wait for the underlying stream to fully drain into the file before resolving. Internally `saveBlob` kicks off a `writeBlobWithStream` pipeline whose `storageInfo.saving` promise is tracked separately. The put resolves once encoding has captured the blob reference; the bytes finish writing concurrently.

Consequence for callers that wrap the source in a hashing `Transform`: calling `hash.digest('hex')` after `await table.put()` is unsafe — more `chunk.update()` calls can still fire as the stream drains, producing `Error [ERR_CRYPTO_HASH_FINALIZED]: Digest already called`. Options:

- Buffer first, then hash + put (what `components/deploymentRecorder.ts` does for Slice A — small payloads only).
- Hash via Transform while extraction reads the stream, and only finalize the hash on the Transform's `'end'` event before any second put with the final hash.
- Await `storageInfo.saving` directly if you have a handle to the FileBackedBlob (the cleanest path for streaming).

Future agents touching `components/deploymentRecorder.ts` for Slice B's streaming variant should pick one of the latter two patterns.

## System table bootstrap: `systemSchema.json` + upgrade directive

Adding a new system table (e.g. `hdb_deployment` in #641 Slice A) requires three changes:

1. **`json/systemSchema.json`** — the table entry. Fresh installs auto-create it via `utility/mount_hdb.ts:createTables()`, which iterates `Object.keys(systemSchema)` on first boot.
2. **`utility/hdbTerms.ts`** — add the table name to `SYSTEM_TABLE_NAMES`.
3. **`upgrade/directives/<version>.ts`** — provisions the table on existing installs that already have a system schema. Registered in `upgrade/directives/directivesController.ts` (which is otherwise empty — its `versions` Map gets populated by these imports). The directive shape is `{ version, sync_functions, async_functions }`; copy `5-2-0.ts` for the canonical pattern (uses `bridge.createTable` to match what `mount_hdb` does on a fresh install).

System tables replicate by default. To opt out, add the name to `NON_REPLICATING_SYSTEM_TABLES` in `resources/databases.ts`. The check happens after table init and sets `table.replicate = false` per-node.

If the table needs `audit: true`, set it both in the schema (for fresh installs) **and** on the `CreateTableObject` instance in the directive (for upgrades) — otherwise the two paths diverge.
118 changes: 97 additions & 21 deletions bin/cliOperations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@ import { httpRequest } from '../utility/common_utils.ts';
import * as path from 'path';
import * as fs from 'fs-extra';
import * as YAML from 'yaml';
import { streamPackagedDirectory } from '../components/packageComponent.ts';
import { streamPackagedDirectory, getPackagedDirectorySize } from '../components/packageComponent.ts';
import { buildMultipartBody } from './multipartBuilder.ts';
import { parseSSE } from './sseConsumer.ts';
import { DeployRenderer } from './deployRenderer.ts';
import { getHdbPid } from '../utility/processManagement/processManagement.js';
import { initConfig, getConfigPath } from '../config/configUtils.js';

const OP_ALIASES = { deploy: 'deploy_component', package: 'package_component' };

// Operations whose responses should be consumed as text/event-stream so live phase events
// (prepare, load, replicate, restart) render as they happen instead of after the whole
// deploy completes. Add an operation here only after wiring its server-side
// SSE_PROGRESS_OPERATIONS entry — otherwise the server returns the buffered JSON path and
// the SSE parser sees no events.
const SSE_OPERATIONS = new Set(['deploy_component']);

// Properties on `req` that the CLI itself uses for transport/UX, not the operations API.
// They never get serialized into the request body.
const TRANSPORT_ONLY_FIELDS = new Set([
Expand All @@ -37,13 +46,17 @@ const PREPARE_OPERATION: any = {

const projectPath = process.cwd();
if (!req.project) req.project = path.basename(projectPath);
// Stream the tar+gzip directly to the server as the file part of a multipart body.
// This bypasses the Node Buffer 2 GB cap that the previous CBOR-encoded path was
// subject to, so large components can deploy without materializing in memory.
req._packageStream = streamPackagedDirectory(projectPath, {
const packageOptions = {
skip_node_modules: req.skip_node_modules !== false,
skip_symlinks: req.skip_symlinks === true,
});
};
// Store path + options for deferred stream creation after the renderer is set up,
// so the pre-gzip onBytes callback can be wired directly to renderer.countUploadBytes.
req._projectPath = projectPath;
req._packageOptions = packageOptions;
// Pre-walk the directory for an uncompressed-size estimate. Both the progress counter
// and this total are in uncompressed units so the bar tracks to 100% naturally.
req._uploadSizeEstimate = await getPackagedDirectorySize(projectPath, packageOptions);
req._multipart = true;
},
};
Expand Down Expand Up @@ -191,38 +204,101 @@ async function cliOperations(req: any, skipResponseLog = false) {
options.headers.Authorization = `Bearer ${tokens.operation_token}`;
}
}
const useSse = SSE_OPERATIONS.has(req.operation);
if (useSse) {
options.headers.Accept = 'text/event-stream';
options.streamResponse = true;
}
// One renderer owns the (future) upload bar and the SSE event rendering for a
// multipart deploy. Created here so the upload-stream tap and the SSE consumer
// below share the same instance.
const renderer = req._multipart ? new DeployRenderer({ uploadTotal: req._uploadSizeEstimate ?? 0 }) : null;
let body;
if (req._multipart) {
const packageStream = req._packageStream;
// Create the package stream here — after the renderer exists — so we can pass
// renderer.countUploadBytes as the onBytes callback. Both progress and total are
// uncompressed bytes, so the bar tracks accurately to 100% without premature snapping.
const packageStream = streamPackagedDirectory(
req._projectPath,
req._packageOptions,
renderer ? (n) => renderer.countUploadBytes(n) : undefined
);
const fields = {};
for (const [key, value] of Object.entries(req)) {
if (key.startsWith('_') || TRANSPORT_ONLY_FIELDS.has(key)) continue;
fields[key] = value;
}
const multipart = buildMultipartBody(
fields,
packageStream
? { name: 'payload', filename: 'package.tar.gz', contentType: 'application/gzip', stream: packageStream }
: undefined
);
const multipart = buildMultipartBody(fields, {
name: 'payload',
filename: 'package.tar.gz',
contentType: 'application/gzip',
stream: packageStream,
});
options.headers['Content-Type'] = multipart.contentType;
// Use chunked transfer-encoding: we don't know the total size up front because the
// payload is streamed from `tar.pack` and never fully buffered.
options.headers['Transfer-Encoding'] = 'chunked';
body = multipart.stream;
// Tap the body so bytes flowing into the HTTP request advance the upload bar.
// The renderer's Transform is identity — chunks pass through unmodified.
body = renderer ? renderer.tapUploadStream(multipart.stream) : multipart.stream;
} else {
body = req;
}
let response: any = await httpRequest(options, body);

// endUpload() is called from the counter Transform's flush callback in tapUploadStream
// once all multipart bytes have flowed through. For SSE deploys, httpRequest resolves
// when response headers arrive (streamResponse: true), which happens before the full
// upload completes — calling endUpload() here would snap the bar prematurely.

let responseData;
try {
responseData = JSON.parse(response.body);
} catch {
responseData = {
status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'),
body: response.body,
};
if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) {
// Consume SSE: render phase events live, capture the final result from the `done`
// event (or the error message from the `error` event). The HTTP status stays 200
// until end-of-stream; failures are signaled in-band.
let finalResult;
let sseError;
for await (const message of parseSSE(response)) {
renderer?.renderEvent(message);
if (message.event === 'done') {
try {
finalResult = JSON.parse(message.data)?.result;
} catch {
finalResult = message.data;
}
} else if (message.event === 'error') {
try {
sseError = JSON.parse(message.data);
} catch {
sseError = { message: message.data };
}
}
}
if (sseError) {
const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError);
console.error(`error: ${errMsg}`);
process.exit(1);
}
responseData = finalResult ?? { message: 'Deploy completed (no result payload).' };
} else {
// When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode),
// so .body is undefined. Drain the stream to get the text (e.g. a 401 error body).
let bodyText: string;
if (useSse) {
const chunks: Buffer[] = [];
for await (const chunk of response as AsyncIterable<Buffer>) chunks.push(Buffer.from(chunk));
bodyText = Buffer.concat(chunks).toString('utf8');
} else {
bodyText = response.body;
}
try {
responseData = JSON.parse(bodyText);
} catch {
responseData = {
status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'),
body: bodyText,
};
}
}

let responseLog;
Expand Down
196 changes: 196 additions & 0 deletions bin/deployRenderer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { Transform } from 'node:stream';
// cli-progress is already a runtime dep of harper (see package.json); using its
// SingleBar to render the upload phase here doesn't add a new dependency.
import cliProgress from 'cli-progress';
import type { SSEMessage } from './sseConsumer.ts';

interface RendererOptions {
uploadTotal?: number;
output?: NodeJS.WritableStream;
}

interface UploadState {
bar: cliProgress.SingleBar | null;
sent: number;
finished: boolean;
}

interface PhaseState {
current?: string;
installManager?: string;
installLineCount: number;
}

/**
* Deploy-time renderer that owns the progress display across two phases:
*
* 1. Local upload — driven by `tapUploadStream`, which wraps the multipart body so we
* can update a `cli-progress` bar against the precomputed uncompressed source-tree
* total. The bar moves as gzipped bytes are sent and snaps to 100% on completion.
* In a non-TTY environment a single "Uploaded X MiB" line is printed on completion.
*
* 2. Server-side phases — driven by `renderEvent`, called for each SSE message the
* CLI receives from the operations API. Phase events print one-liners; live
* `install` events (npm/pnpm/yarn stdout) are throttled to one line under a
* "[install]" header so a noisy `npm install` doesn't drown the terminal.
*
* Designed so the two phases hand off cleanly: `endUpload()` tears the bar down (so
* it doesn't compete with subsequent prints) before any SSE events render.
*/
export class DeployRenderer {
private upload: UploadState = { bar: null, sent: 0, finished: false };
private phase: PhaseState = { installLineCount: 0 };
private output: NodeJS.WritableStream;
private isTty: boolean;
private uploadTotal: number;

constructor(options: RendererOptions = {}) {
this.output = options.output ?? process.stderr;
// Only render a bar when stderr is a real terminal. CI runners, log redirection,
// and pipes look identical from Node's perspective: !isTTY.
this.isTty = Boolean((this.output as NodeJS.WriteStream).isTTY);
this.uploadTotal = options.uploadTotal ?? 0;
}

/**
* Wrap an outbound stream so each byte flowing through it advances the upload bar.
* The Transform is identity — chunks pass through unmodified.
*/
tapUploadStream<T extends NodeJS.ReadableStream>(stream: T): NodeJS.ReadableStream {
this.upload.bar = this.isTty
? new cliProgress.SingleBar(
{
// {value_fmt} and {total_fmt} are payload tokens updated in tickUpload/endUpload.
// uploadTotal is the uncompressed source size; gzip output is smaller so the
// bar won't naturally reach 100% — endUpload() snaps it on completion.
format: 'Uploading [{bar}] {percentage}% | {value_fmt} / ~{total_fmt}',
barCompleteChar: '█',
barIncompleteChar: '░',
hideCursor: true,
stream: this.output,
},
cliProgress.Presets.shades_classic
)
: null;
this.upload.bar?.start(this.uploadTotal || 1, 0, {
value_fmt: formatBytes(0),
total_fmt: formatBytes(this.uploadTotal),
});

const counter = new Transform({
transform: (chunk, _enc, cb) => {
// Bytes are counted externally via countUploadBytes() on the pre-gzip tar
// stream so progress and total are both in uncompressed units. The Transform
// is kept here solely to get the flush callback that signals upload completion.
cb(null, chunk);
},
flush: (cb) => {
this.endUpload();
cb();
},
});
stream.on('error', (err) => counter.destroy(err));
stream.pipe(counter);
return counter;
}

/**
* Record `n` pre-gzip bytes read from the tar pack stream. Called for each
* raw tar chunk by the `onBytes` callback passed to `streamPackagedDirectory`,
* keeping progress and total in the same (uncompressed) unit so the bar
* tracks smoothly and doesn't terminate far short of 100%.
*/
countUploadBytes(n: number): void {
if (this.upload.finished) return;
this.upload.sent += n;
this.tickUpload();
}

endUpload(): void {
if (this.upload.finished) return;
this.upload.finished = true;
if (this.upload.bar) {
// Snap to total so the bar shows 100% even when our uncompressed-total estimate
// is slightly off (gzip output is usually smaller than the source tree).
const finalPayload = { value_fmt: formatBytes(this.upload.sent), total_fmt: formatBytes(this.uploadTotal) };
if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal, finalPayload);
this.upload.bar.stop();
this.upload.bar = null;
} else {
// Non-TTY: single completion line, no intermediate chatter.
this.output.write(`Uploaded ${formatBytes(this.upload.sent)}\n`);
}
}

private tickUpload(): void {
if (this.upload.finished) return;
if (this.upload.bar) {
this.upload.bar.update(this.upload.sent, { value_fmt: formatBytes(this.upload.sent) });
}
// Non-TTY: no intermediate lines — endUpload() prints the final size on completion.
}

renderEvent(message: SSEMessage): void {
let parsed: unknown;
try {
parsed = JSON.parse(message.data);
} catch {
parsed = message.data;
}
switch (message.event) {
case 'phase':
this.renderPhase(parsed as { phase?: string; status?: string; message?: string });
break;
case 'install':
this.renderInstall(parsed as { manager?: string; stream?: string; line?: string });
break;
case 'error': {
const e = parsed as { message?: string; code?: string | number };
this.output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`);
break;
}
case 'done':
// Caller picks up final result via the SSE iterator; nothing to render here.
break;
}
}

private renderPhase(data: { phase?: string; status?: string; message?: string }): void {
const label = data.phase ?? '?';
if (data.status === 'start') {
if (this.phase.current !== label) {
this.output.write(`${label}…\n`);
this.phase.current = label;
this.phase.installLineCount = 0;
}
} else if (data.status === 'done') {
if (label === 'install' && this.phase.installLineCount > 0) {
this.output.write(`install done (${this.phase.installLineCount} log lines)\n`);
} else {
this.output.write(`${label} done\n`);
}
} else if (data.status === 'error') {
this.output.write(`${label} ERROR: ${data.message ?? 'failed'}\n`);
}
}

private renderInstall(data: { manager?: string; stream?: string; line?: string }): void {
const line = (data.line ?? '').trimEnd();
if (!line) return;
if (data.manager && data.manager !== this.phase.installManager) {
this.phase.installManager = data.manager;
this.output.write(`install: using ${data.manager}\n`);
}
this.phase.installLineCount++;
// Prefix with stream so users can distinguish stderr noise from stdout warnings.
const tag = data.stream === 'stderr' ? '!' : '|';
this.output.write(` ${tag} ${line}\n`);
}
}

function formatBytes(bytes: number): string {
if (bytes < 1024) return `${bytes} B`;
if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KiB`;
if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`;
return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GiB`;
}
Loading
Loading