From 136a10efa3deb64b5e2ff6dc44d136cd578c6f93 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 18:36:41 -0600 Subject: [PATCH 01/10] feat(deploy): hdb_deployment system table + audit record on every deploy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slice A of #641. Every deploy_component call now writes a row to a new system.hdb_deployment table capturing the project, package identifier, sha256 of the payload tarball, payload size, status (pending → success or failed), error info, and the upload payload itself as a Blob attribute. The deployment_id is returned in the deploy response and is the join key Studio/CLI will use to subscribe to live progress in Slice B. Includes: - json/systemSchema.json: hdb_deployment table definition (deployment_id hash, with attributes mirroring the lifecycle) - utility/hdbTerms.ts: SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME + LIST_DEPLOYMENTS / GET_DEPLOYMENT / GET_DEPLOYMENT_PAYLOAD / DELETE_DEPLOYMENT_PAYLOAD operation enums - upgrade/directives/5-2-0.ts: provisions the table on existing installs (fresh installs get it via mount_hdb's systemSchema iteration) - components/deploymentRecorder.ts: lifecycle wrapper used by deployComponent — creates the row up front, ingests the payload into a Blob attribute with sha256 + size, then commits success or failure - components/deploymentOperations.ts: handlers for list_deployments (with project/status/since/until/limit/offset filters) and get_deployment; payload bytes are stripped from these responses - components/operations.js: deployComponent now wraps prepareApplication in a try/catch driven by the recorder; payload is re-sourced from the persisted blob so extraction reads exactly what was recorded - server/serverHelpers/serverUtilities.ts: registers the two new ops - integrationTests/deploy/deploy-tracking.test.ts: end-to-end coverage for the happy path, list filtering, and failure recording Updates the brittle deepStrictEqual deploy-response assertions in 4 existing tests to allow the new deployment_id field. Slice A scope is deliberately single-node; Slice B will replace the in-memory buffer in ingestPayload with a streaming variant and add peer-side reads from the replicated blob. Refs #641 Co-Authored-By: Claude Opus 4.7 (1M context) --- components/deploymentOperations.ts | 76 +++++++ components/deploymentRecorder.ts | 143 +++++++++++++ components/operations.js | 127 ++++++----- .../components/early-hints.test.ts | 5 +- .../components/redirector.test.ts | 5 +- .../components/risk-query.test.ts | 5 +- .../deploy/deploy-from-github.test.ts | 6 +- .../deploy/deploy-from-source.test.ts | 8 +- .../deploy/deploy-tracking.test.ts | 200 ++++++++++++++++++ json/systemSchema.json | 64 ++++++ server/serverHelpers/serverUtilities.ts | 9 + upgrade/directives/5-2-0.ts | 49 +++++ upgrade/directives/directivesController.ts | 7 + utility/hdbTerms.ts | 5 + 14 files changed, 651 insertions(+), 58 deletions(-) create mode 100644 components/deploymentOperations.ts create mode 100644 components/deploymentRecorder.ts create mode 100644 integrationTests/deploy/deploy-tracking.test.ts create mode 100644 upgrade/directives/5-2-0.ts diff --git a/components/deploymentOperations.ts b/components/deploymentOperations.ts new file mode 100644 index 000000000..a394398d2 --- /dev/null +++ b/components/deploymentOperations.ts @@ -0,0 +1,76 @@ +'use strict'; + +// Read-side operations against system.hdb_deployment. Slice A of issue #641. +// Write-side lives in deploymentRecorder.ts; this module only reads. + +import { databases } from '../resources/databases.ts'; +import * as terms from '../utility/hdbTerms.ts'; +import { ClientError } from '../utility/errors/hdbError.ts'; + +const DEPLOYMENT_TABLE = terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME; + +interface ListRequest { + project?: string; + status?: string; + since?: number; + until?: number; + limit?: number; + offset?: number; +} + +interface GetRequest { + deployment_id: string; +} + +function deploymentTable() { + const table = (databases as any).system?.[DEPLOYMENT_TABLE]; + if (!table) { + throw new ClientError( + `Deployment tracking is not initialized on this node (system.${DEPLOYMENT_TABLE} missing). ` + + `Run upgrade or restart the server to provision the table.` + ); + } + return table; +} + +// Strip the blob attribute from a row; the bytes never travel over the operations API. +// Callers wanting bytes use get_deployment_payload (added in Slice B). +function stripBlob(row: any): any { + if (!row || typeof row !== 'object') return row; + const { payload_blob, ...rest } = row; + rest.payload_blob_present = payload_blob != null; + return rest; +} + +export async function handleListDeployments(req: ListRequest = {}): Promise<{ deployments: any[]; total: number }> { + const table = deploymentTable(); + const conditions: any[] = []; + if (req.project) conditions.push({ attribute: 'project', value: req.project }); + if (req.status) conditions.push({ attribute: 'status', value: req.status }); + if (req.since != null) conditions.push({ attribute: 'started_at', value: req.since, comparator: 'greater_than_equal' }); + if (req.until != null) conditions.push({ attribute: 'started_at', value: req.until, comparator: 'less_than_equal' }); + + const collected: any[] = []; + for await (const row of table.search(conditions)) { + collected.push(stripBlob(row)); + } + // Newest first by started_at; ties broken by deployment_id for stability. + collected.sort((a, b) => (b.started_at ?? 0) - (a.started_at ?? 0) || String(a.deployment_id).localeCompare(b.deployment_id)); + + const total = collected.length; + const offset = Math.max(0, req.offset ?? 0); + const limit = req.limit != null ? Math.max(0, req.limit) : collected.length; + return { deployments: collected.slice(offset, offset + limit), total }; +} + +export async function handleGetDeployment(req: GetRequest): Promise { + if (!req || !req.deployment_id) { + throw new ClientError(`'deployment_id' is required`); + } + const table = deploymentTable(); + const row = await table.get(req.deployment_id); + if (!row) { + throw new ClientError(`No deployment found with id '${req.deployment_id}'`); + } + return stripBlob(row); +} diff --git a/components/deploymentRecorder.ts b/components/deploymentRecorder.ts new file mode 100644 index 000000000..2229da598 --- /dev/null +++ b/components/deploymentRecorder.ts @@ -0,0 +1,143 @@ +'use strict'; + +// DeploymentRecorder — Slice A scope. +// +// Owns the lifecycle of one row in system.hdb_deployment: creates the pending row at deploy +// start, streams the upload payload into the row's payload_blob (computing sha256 + size +// alongside), and writes the terminal status at the end. Slice B will extend this with +// ProgressEmitter subscription and event_log writes; Slice C will add rollback sourcing +// from the blob. + +import { randomUUID } from 'node:crypto'; +import { createHash, Hash } from 'node:crypto'; +import { Transform } from 'node:stream'; +import type { Readable } from 'node:stream'; +import { databases } from '../resources/databases.ts'; +import { createBlob } from '../resources/blob.ts'; +import * as terms from '../utility/hdbTerms.ts'; +import { hostname } from 'node:os'; + +type DeploymentStatus = 'pending' | 'extracting' | 'installing' | 'loading' | 'replicating' | 'restarting' | 'success' | 'failed' | 'rolled_back'; + +interface CreateOptions { + project?: string; + package_identifier?: string; + user?: string; + restart_mode?: 'immediate' | 'rolling' | null; + rollback_of?: string | null; +} + +export class DeploymentRecorder { + readonly deploymentId: string; + private readonly record: Record; + private hash: Hash | null = null; + private byteCount = 0; + private finished = false; + + private constructor(deploymentId: string, initial: Record) { + this.deploymentId = deploymentId; + this.record = initial; + } + + static async create(options: CreateOptions): Promise { + const deploymentId = randomUUID(); + const startedAt = Date.now(); + const record: Record = { + deployment_id: deploymentId, + project: options.project ?? null, + package_identifier: options.package_identifier ?? null, + payload_hash: null, + payload_size: null, + payload_blob: null, + status: 'pending' as DeploymentStatus, + phase: 'pending', + event_log: [], + peer_results: [], + origin_node: hostname(), + restart_mode: options.restart_mode ?? null, + started_at: startedAt, + completed_at: null, + user: options.user ?? null, + rollback_of: options.rollback_of ?? null, + error: null, + }; + const recorder = new DeploymentRecorder(deploymentId, record); + await recorder.put(); + return recorder; + } + + /** + * Drain a payload source (Buffer or Readable) into the row's payload_blob attribute, + * computing sha256 and byte count alongside. After this resolves the row has been + * committed once with the final hash and size, and `this.row.payload_blob.stream()` + * yields a fresh Readable that callers can pass to extraction. + * + * Slice A buffers the payload in memory so the hash/size are known synchronously before + * we commit and so the blob's `saveBlob` lifecycle doesn't race with our digest() call. + * Slice B will swap this for a true streaming path once we also gain the ProgressEmitter + * subscriber that benefits from chunk-level progress events. + */ + async ingestPayload(source: Readable | Buffer | string): Promise { + const hash = createHash('sha256'); + let byteCount = 0; + let buffer: Buffer; + if (Buffer.isBuffer(source)) { + buffer = source; + } else if (typeof source === 'string') { + // Legacy CBOR/JSON path: payload arrives as a base64-encoded string. + buffer = Buffer.from(source, 'base64'); + } else { + const chunks: Buffer[] = []; + for await (const chunk of source as AsyncIterable) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as any)); + } + buffer = Buffer.concat(chunks); + } + hash.update(buffer); + byteCount = buffer.length; + this.record.payload_blob = createBlob(buffer, { type: 'application/gzip' }); + this.record.payload_hash = hash.digest('hex'); + this.record.payload_size = byteCount; + // Touch the unused private fields so the type system stays happy in Slice B when we + // reintroduce the streaming variant that uses them. + this.hash = hash; + this.byteCount = byteCount; + await this.put(); + } + + async transitionPhase(phase: string, status?: DeploymentStatus): Promise { + this.record.phase = phase; + if (status) this.record.status = status; + await this.put(); + } + + async finish(status: 'success' | 'failed' | 'rolled_back', error?: unknown): Promise { + if (this.finished) return; + this.finished = true; + this.record.status = status; + this.record.completed_at = Date.now(); + if (error) { + const e = error as { message?: string; code?: string | number; stack?: string }; + this.record.error = { + message: e?.message ?? String(error), + code: e?.code, + phase: this.record.phase, + }; + } + await this.put(); + } + + get row(): Record { + return this.record; + } + + private async put(): Promise { + const table = (databases as any).system?.[terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME]; + if (!table) { + // Table missing means the upgrade directive hasn't run yet (or the table got dropped). + // We tolerate this — tracking is observability; the deploy itself must still succeed. + return; + } + await table.put(this.record); + } +} diff --git a/components/operations.js b/components/operations.js index 13a4e9050..63a90068b 100644 --- a/components/operations.js +++ b/components/operations.js @@ -18,6 +18,7 @@ const { packageDirectory } = require('../components/packageComponent.ts'); const { Resources } = require('../resources/Resources.ts'); const { Application, prepareApplication } = require('./Application.ts'); const { server } = require('../server/Server.ts'); +const { DeploymentRecorder } = require('./deploymentRecorder.ts'); /** * Read the settings.js file and return the @@ -361,7 +362,6 @@ async function deployComponent(req) { } // Write to root config if the request contains a package identifier - // TODO: how can we keep record of the `payload`? Its often too large to stuff into a config file; especially the root config. Maybe we can write it to a file and reference that way? if (req.package) { // Check if trying to overwrite a core component (requires force) // Lazy-load to avoid circular dependency with componentLoader @@ -386,60 +386,89 @@ async function deployComponent(req) { await configUtils.addConfig(req.project, applicationConfig); } - const application = new Application({ - name: req.project, - payload: req.payload, - packageIdentifier: req.package, - install: { - command: req.install_command, - timeout: req.install_timeout, - allowInstallScripts: req.install_allow_scripts, - }, + // Slice A of issue #641: create a hdb_deployment row up front so the deploy is + // observable and auditable even if the CLI disconnects. The row also holds the payload + // in a Blob attribute — Slice B will use that for peer delivery; for now it's the + // audit record and the rollback source. + const recorder = await DeploymentRecorder.create({ + project: req.project, + package_identifier: req.package ?? null, + user: req.hdb_user?.username, + restart_mode: req.restart === 'rolling' ? 'rolling' : req.restart ? 'immediate' : null, }); + req._deploymentId = recorder.deploymentId; - await prepareApplication(application); - - // now we attempt to actually load the component in case there is - // an error we can immediately detect and report, but app code should not run on the main thread - if (!isMainThread && !process.env.HARPER_SAFE_MODE) { - const pseudoResources = new Resources(); - pseudoResources.isWorker = true; - - const componentLoader = require('./componentLoader.ts').default || require('./componentLoader.ts'); - let lastError; - componentLoader.setErrorReporter((error) => (lastError = error)); - await componentLoader.loadComponent( - application.dirPath, - pseudoResources, - undefined, - false, - undefined, - false, - req.project - ); + let extractionPayload = req.payload; + try { + // If a tarball came in (Buffer or Readable from the multipart parser), tee it through + // a hash-and-size tap into the row's payload_blob, then re-source extraction from the + // persisted blob. This means we read the upload exactly once into local storage; the + // blob is the staging area and (in Slice B) the channel peers will replicate from. + if (req.payload != null) { + await recorder.ingestPayload(req.payload); + extractionPayload = recorder.row.payload_blob.stream(); + } - if (lastError) throw lastError; - } - const rollingRestart = req.restart === 'rolling'; - // if doing a rolling restart set restart to false so that other nodes don't also restart. - req.restart = rollingRestart ? false : req.restart; - let response = await server.replication.replicateOperation(req); - if (req.restart === true) { - manageThreads.restartWorkers('http'); - response.message = `Successfully deployed: ${application.name}, restarting Harper`; - } else if (rollingRestart) { - const serverUtilities = require('../server/serverHelpers/serverUtilities.ts'); - const jobResponse = await serverUtilities.executeJob({ - operation: 'restart_service', - service: 'http', - replicated: true, + const application = new Application({ + name: req.project, + payload: extractionPayload, + packageIdentifier: req.package, + install: { + command: req.install_command, + timeout: req.install_timeout, + allowInstallScripts: req.install_allow_scripts, + }, }); - response.restartJobId = jobResponse.job_id; - response.message = `Successfully deployed: ${application.name}, restarting Harper`; - } else response.message = `Successfully deployed: ${application.name}`; + await prepareApplication(application); - return response; + // now we attempt to actually load the component in case there is + // an error we can immediately detect and report, but app code should not run on the main thread + if (!isMainThread && !process.env.HARPER_SAFE_MODE) { + const pseudoResources = new Resources(); + pseudoResources.isWorker = true; + + const componentLoader = require('./componentLoader.ts').default || require('./componentLoader.ts'); + let lastError; + componentLoader.setErrorReporter((error) => (lastError = error)); + await componentLoader.loadComponent( + application.dirPath, + pseudoResources, + undefined, + false, + undefined, + false, + req.project + ); + + if (lastError) throw lastError; + } + const rollingRestart = req.restart === 'rolling'; + // if doing a rolling restart set restart to false so that other nodes don't also restart. + req.restart = rollingRestart ? false : req.restart; + let response = await server.replication.replicateOperation(req); + if (req.restart === true) { + manageThreads.restartWorkers('http'); + response.message = `Successfully deployed: ${application.name}, restarting Harper`; + } else if (rollingRestart) { + const serverUtilities = require('../server/serverHelpers/serverUtilities.ts'); + const jobResponse = await serverUtilities.executeJob({ + operation: 'restart_service', + service: 'http', + replicated: true, + }); + + response.restartJobId = jobResponse.job_id; + response.message = `Successfully deployed: ${application.name}, restarting Harper`; + } else response.message = `Successfully deployed: ${application.name}`; + + response.deployment_id = recorder.deploymentId; + await recorder.finish('success'); + return response; + } catch (err) { + await recorder.finish('failed', err); + throw err; + } } /** diff --git a/integrationTests/components/early-hints.test.ts b/integrationTests/components/early-hints.test.ts index 83b3a3ad4..f16681a89 100644 --- a/integrationTests/components/early-hints.test.ts +++ b/integrationTests/components/early-hints.test.ts @@ -6,7 +6,7 @@ * conversion, empty hints handling, and response length limits. */ import { suite, test, before, after } from 'node:test'; -import { strictEqual, ok, deepStrictEqual, match } from 'node:assert/strict'; +import { strictEqual, ok, match } from 'node:assert/strict'; import { join, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; @@ -26,7 +26,8 @@ suite('Component: early-hints', (ctx: ContextWithHarper) => { package: join(__dirname, '../fixtures/template-early-hints-2.0.0.tgz'), restart: true, }); - deepStrictEqual(deployBody, { message: 'Successfully deployed: early-hints, restarting Harper' }); + strictEqual(deployBody.message, 'Successfully deployed: early-hints, restarting Harper'); + ok(typeof deployBody.deployment_id === 'string', `expected deployment_id, got ${deployBody.deployment_id}`); // poll until /hints endpoint is registered and seed data is loaded const seedDeadline = Date.now() + 60_000; diff --git a/integrationTests/components/redirector.test.ts b/integrationTests/components/redirector.test.ts index c9365d895..56ba6e9d8 100644 --- a/integrationTests/components/redirector.test.ts +++ b/integrationTests/components/redirector.test.ts @@ -6,7 +6,7 @@ * versioning, time-based rules, edge cases, and table CRUD. */ import { suite, test, before, after } from 'node:test'; -import { strictEqual, ok, deepStrictEqual } from 'node:assert/strict'; +import { strictEqual, ok } from 'node:assert/strict'; import { join, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; @@ -41,7 +41,8 @@ suite('Component: redirector', (ctx: ContextWithHarper) => { package: join(__dirname, '../fixtures/template-redirector-3.0.1.tgz'), restart: true, }); - deepStrictEqual(deployBody, { message: 'Successfully deployed: redirector, restarting Harper' }); + strictEqual(deployBody.message, 'Successfully deployed: redirector, restarting Harper'); + ok(typeof deployBody.deployment_id === 'string', `expected deployment_id, got ${deployBody.deployment_id}`); // poll until ready const deadline = Date.now() + 60_000; diff --git a/integrationTests/components/risk-query.test.ts b/integrationTests/components/risk-query.test.ts index 23f618111..68907add7 100644 --- a/integrationTests/components/risk-query.test.ts +++ b/integrationTests/components/risk-query.test.ts @@ -5,7 +5,7 @@ * shorthand field mapping, upsert, edge cases, and deletion. */ import { suite, test, before, after } from 'node:test'; -import { strictEqual, ok, deepStrictEqual } from 'node:assert/strict'; +import { strictEqual, ok } from 'node:assert/strict'; import { join, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; @@ -24,7 +24,8 @@ suite('Component: risk-query', (ctx: ContextWithHarper) => { package: join(__dirname, '../fixtures/risq-1.0.0.tgz'), restart: true, }); - deepStrictEqual(body, { message: 'Successfully deployed: risk-query, restarting Harper' }); + strictEqual(body.message, 'Successfully deployed: risk-query, restarting Harper'); + ok(typeof body.deployment_id === 'string', `expected deployment_id in deploy response, got ${body.deployment_id}`); // Poll until the component is ready const deadline = Date.now() + 30_000; diff --git a/integrationTests/deploy/deploy-from-github.test.ts b/integrationTests/deploy/deploy-from-github.test.ts index 17b66b5ff..c632def36 100644 --- a/integrationTests/deploy/deploy-from-github.test.ts +++ b/integrationTests/deploy/deploy-from-github.test.ts @@ -42,7 +42,11 @@ suite('GitHub application deployment', { skip: process.platform === 'win32' }, ( }); strictEqual(response.status, 200); const body = await response.json(); - deepStrictEqual(body, { message: 'Successfully deployed: test-application, restarting Harper' }); + strictEqual(body.message, 'Successfully deployed: test-application, restarting Harper'); + ok( + typeof body.deployment_id === 'string' && /^[0-9a-f-]{36}$/i.test(body.deployment_id), + `expected a UUID deployment_id, got ${body.deployment_id}` + ); // Poll until the application API is ready (restart is async, fixed sleep is flaky) const deadline = Date.now() + 30_000; while (true) { diff --git a/integrationTests/deploy/deploy-from-source.test.ts b/integrationTests/deploy/deploy-from-source.test.ts index 860ef5f7f..221282310 100644 --- a/integrationTests/deploy/deploy-from-source.test.ts +++ b/integrationTests/deploy/deploy-from-source.test.ts @@ -7,7 +7,7 @@ * */ import { suite, test, before, after } from 'node:test'; -import { deepStrictEqual, ok, strictEqual } from 'node:assert/strict'; +import { ok, strictEqual } from 'node:assert/strict'; import { join } from 'node:path'; import { existsSync } from 'node:fs'; import { setTimeout as sleep } from 'node:timers/promises'; @@ -45,7 +45,11 @@ suite('Local application deployment', (ctx: ContextWithHarper) => { }); strictEqual(response.status, 200); const body = await response.json(); - deepStrictEqual(body, { message: 'Successfully deployed: test-application, restarting Harper' }); + strictEqual(body.message, 'Successfully deployed: test-application, restarting Harper'); + ok( + typeof body.deployment_id === 'string' && /^[0-9a-f-]{36}$/i.test(body.deployment_id), + `expected a UUID deployment_id, got ${body.deployment_id}` + ); // Poll until the deployed app is reachable. `restart: true` returns // before the new Harper process is listening, so a fixed sleep is // flaky — especially on Windows where the restart can take >5s. diff --git a/integrationTests/deploy/deploy-tracking.test.ts b/integrationTests/deploy/deploy-tracking.test.ts new file mode 100644 index 000000000..6be5917ba --- /dev/null +++ b/integrationTests/deploy/deploy-tracking.test.ts @@ -0,0 +1,200 @@ +/** + * Deployment tracking integration test (Slice A of issue #641). + * + * Asserts that every deploy_component call now writes a row to system.hdb_deployment, + * that the row contains a populated payload_hash + payload_size, that list_deployments + * and get_deployment surface the row through the operations API, and that a failed + * deploy produces a row with status=failed and a populated error field. + */ +import { suite, test, before, after } from 'node:test'; +import { ok, strictEqual } from 'node:assert/strict'; +import { join } from 'node:path'; +import { existsSync, mkdtempSync, mkdirSync, writeFileSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { request } from 'node:http'; +import { Readable } from 'node:stream'; + +import { startHarper, teardownHarper, type ContextWithHarper } from '@harperfast/integration-testing'; + +import { streamPackagedDirectory } from '../../dist/components/packageComponent.js'; +import { buildMultipartBody } from '../../dist/bin/multipartBuilder.js'; + +function postMultipart( + url: URL, + contentType: string, + body: Readable, + auth: { username: string; password: string } +): Promise<{ status: number; body: string }> { + return new Promise((resolve, reject) => { + const req = request( + { + protocol: url.protocol, + hostname: url.hostname, + port: url.port, + method: 'POST', + path: '/', + headers: { + 'Content-Type': contentType, + 'Transfer-Encoding': 'chunked', + Authorization: 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64'), + }, + }, + (res) => { + res.setEncoding('utf8'); + let buf = ''; + res.on('data', (chunk) => (buf += chunk)); + res.on('end', () => resolve({ status: res.statusCode ?? 0, body: buf })); + } + ); + req.on('error', reject); + body.on('error', reject); + body.pipe(req); + }); +} + +async function callOperation( + ctx: ContextWithHarper, + op: Record +): Promise<{ status: number; body: any }> { + const url = new URL(ctx.harper.operationsAPIURL); + const auth = + 'Basic ' + Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64'); + const res = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', Authorization: auth }, + body: JSON.stringify(op), + }); + const text = await res.text(); + let parsed: any = text; + try { + parsed = JSON.parse(text); + } catch { + // leave as text + } + return { status: res.status, body: parsed }; +} + +suite('Deployment tracking', (ctx: ContextWithHarper) => { + let fixtureDir: string; + let deploymentId: string | undefined; + + before(async () => { + await startHarper(ctx); + fixtureDir = mkdtempSync(join(tmpdir(), 'deploy-tracking-fixture-')); + writeFileSync( + join(fixtureDir, 'config.yaml'), + 'static:\n files: web\ngraphqlSchema:\n files: schema.graphql\nrest: true\n' + ); + writeFileSync(join(fixtureDir, 'schema.graphql'), 'type Query { hello: String }\n'); + mkdirSync(join(fixtureDir, 'web'), { recursive: true }); + writeFileSync(join(fixtureDir, 'web', 'index.html'), '

Hello, Tracking!

'); + }); + + after(async () => { + try { + rmSync(fixtureDir, { recursive: true, force: true }); + } catch { + // best-effort + } + await teardownHarper(ctx); + }); + + test('verify Harper', async () => { + const response = await fetch(`${ctx.harper.operationsAPIURL}/health`); + strictEqual(response.status, 200); + }); + + test('deploy records a hdb_deployment row with hash, size, and success status', async () => { + const project = 'tracking-test-application'; + const multipart = buildMultipartBody( + { operation: 'deploy_component', project, restart: false }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(fixtureDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + const response = await postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + strictEqual(response.status, 200, `expected 200, got ${response.status}: ${response.body}`); + const result = JSON.parse(response.body); + ok(result.deployment_id, 'deploy response should include a deployment_id'); + ok(/^[0-9a-f-]{36}$/i.test(result.deployment_id), `deployment_id should be a UUID: ${result.deployment_id}`); + deploymentId = result.deployment_id; + + await sleep(200); // give the table commit a moment to settle if put is async + + const got = await callOperation(ctx, { operation: 'get_deployment', deployment_id: result.deployment_id }); + strictEqual(got.status, 200, `get_deployment should return 200, got ${got.status}: ${JSON.stringify(got.body)}`); + const row = got.body; + strictEqual(row.deployment_id, result.deployment_id); + strictEqual(row.project, project); + strictEqual(row.status, 'success'); + strictEqual(row.payload_blob_present, true, 'payload_blob should have been persisted'); + ok(typeof row.payload_hash === 'string' && /^[0-9a-f]{64}$/i.test(row.payload_hash), 'payload_hash should be a sha256 hex string'); + ok(typeof row.payload_size === 'number' && row.payload_size > 0, 'payload_size should be a positive integer'); + ok(typeof row.started_at === 'number' && row.started_at > 0, 'started_at should be set'); + ok(typeof row.completed_at === 'number' && row.completed_at >= row.started_at, 'completed_at should be >= started_at'); + }); + + test('list_deployments surfaces the row, supports project filter', async () => { + const project = 'tracking-test-application'; + const listed = await callOperation(ctx, { operation: 'list_deployments', project }); + strictEqual(listed.status, 200); + ok(listed.body.total >= 1, `expected at least 1 deployment, got ${listed.body.total}`); + const ids = listed.body.deployments.map((d: any) => d.deployment_id); + ok(deploymentId && ids.includes(deploymentId), `listed deployments should include ${deploymentId}`); + // blob bytes must NOT travel back in the list response — only the presence boolean. + ok(!('payload_blob' in listed.body.deployments[0]), 'list_deployments must not include payload_blob bytes'); + ok('payload_blob_present' in listed.body.deployments[0], 'list_deployments should include payload_blob_present flag'); + }); + + test('a failed deploy is recorded with status=failed and error.message', async () => { + const project = 'broken-tracking-application'; + const brokenDir = mkdtempSync(join(tmpdir(), 'broken-fixture-')); + try { + // A package.json so install runs; a guaranteed-failing install_command forces the + // deploy lifecycle into its catch block. Recorder.finish('failed', err) must still + // commit a row with status=failed and a populated error.message. + writeFileSync( + join(brokenDir, 'package.json'), + JSON.stringify({ name: project, version: '0.0.0', dependencies: {} }) + ); + writeFileSync(join(brokenDir, 'config.yaml'), 'rest: true\n'); + const multipart = buildMultipartBody( + { + operation: 'deploy_component', + project, + restart: false, + install_command: 'sh -c "exit 1"', + install_timeout: 30_000, + }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(brokenDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + const response = await postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + // The HTTP response will be non-200 (error), but the row must still exist. + ok(response.status >= 400, `expected an error response, got ${response.status}: ${response.body}`); + + await sleep(200); + + const listed = await callOperation(ctx, { operation: 'list_deployments', project }); + strictEqual(listed.status, 200); + const failed = listed.body.deployments.find((d: any) => d.project === project); + ok(failed, `expected to find a deployment for ${project} in list`); + strictEqual(failed.status, 'failed'); + ok(failed.error && typeof failed.error.message === 'string' && failed.error.message.length > 0, 'failed deployment should have error.message'); + } finally { + try { + rmSync(brokenDir, { recursive: true, force: true }); + } catch {} + } + }); +}); diff --git a/json/systemSchema.json b/json/systemSchema.json index 19439c1ce..f060f1da0 100644 --- a/json/systemSchema.json +++ b/json/systemSchema.json @@ -369,5 +369,69 @@ "attribute": "composite_id" } ] + }, + "hdb_deployment": { + "hash_attribute": "deployment_id", + "name": "hdb_deployment", + "schema": "system", + "attributes": [ + { + "attribute": "deployment_id" + }, + { + "attribute": "project" + }, + { + "attribute": "package_identifier" + }, + { + "attribute": "payload_hash" + }, + { + "attribute": "payload_size" + }, + { + "attribute": "payload_blob" + }, + { + "attribute": "status" + }, + { + "attribute": "phase" + }, + { + "attribute": "event_log" + }, + { + "attribute": "peer_results" + }, + { + "attribute": "origin_node" + }, + { + "attribute": "restart_mode" + }, + { + "attribute": "started_at" + }, + { + "attribute": "completed_at" + }, + { + "attribute": "user" + }, + { + "attribute": "rollback_of" + }, + { + "attribute": "error" + }, + { + "attribute": "__createdtime__" + }, + { + "attribute": "__updatedtime__" + } + ] } } diff --git a/server/serverHelpers/serverUtilities.ts b/server/serverHelpers/serverUtilities.ts index b0095b52b..b6637a062 100644 --- a/server/serverHelpers/serverUtilities.ts +++ b/server/serverHelpers/serverUtilities.ts @@ -36,6 +36,7 @@ import type { OperationRequest, OperationRequestBody } from '../operationsServer import type { Context } from '../../resources/ResourceInterface.ts'; import * as status from '../status/index.ts'; import * as regDeprecated from '../../resources/registrationDeprecated.ts'; +import * as deploymentOperations from '../../components/deploymentOperations.ts'; const pSearchSearch = util.promisify(search.search); let pEvaluateSql: (sql: string) => Promise; @@ -446,6 +447,14 @@ function initializeOperationFunctionMap(): Map attribute === schema.hash_attribute); + if (primaryKeyAttribute) primaryKeyAttribute.isPrimaryKey = true; + createTable.audit = true; + + await bridge.createTable(DEPLOYMENT_TABLE, createTable); +} + +const directive520 = { + version: '5.2.0', + sync_functions: [] as Array<() => unknown>, + async_functions: [createHdbDeploymentIfMissing] as Array<() => Promise>, +}; + +export default [directive520]; diff --git a/upgrade/directives/directivesController.ts b/upgrade/directives/directivesController.ts index 2aabae041..9f5c9da62 100644 --- a/upgrade/directives/directivesController.ts +++ b/upgrade/directives/directivesController.ts @@ -9,10 +9,17 @@ import * as hdbUtils from '../../utility/common_utils.ts'; import * as hdbTerms from '../../utility/hdbTerms.ts'; import hdbLog from '../../utility/logging/harper_logger.ts'; +import directive520 from './5-2-0.ts'; const { DATA_VERSION, UPGRADE_VERSION } = hdbTerms.UPGRADE_JSON_FIELD_NAMES_ENUM as any; let versions: any = new Map(); +// All directive modules export an array of { version, sync_functions, async_functions }. +// New directives must be imported above and registered here. +for (const directive of directive520) { + versions.set(directive.version, directive); +} + /** * Returns all HDB versions w/ upgrade directives * Note: this does NOT return a list of all versions of HDB diff --git a/utility/hdbTerms.ts b/utility/hdbTerms.ts index 0c65031b0..21fbc86a7 100644 --- a/utility/hdbTerms.ts +++ b/utility/hdbTerms.ts @@ -177,6 +177,7 @@ export const SYSTEM_TABLE_NAMES = { TABLE_TABLE_NAME: 'hdb_table', USER_TABLE_NAME: 'hdb_user', INFO_TABLE_NAME: 'hdb_info', + DEPLOYMENT_TABLE_NAME: 'hdb_deployment', } as const; /** Hash attribute for the system info table */ @@ -297,6 +298,10 @@ export const OPERATIONS_ENUM = { GET_STATUS: 'get_status', SET_STATUS: 'set_status', CLEAR_STATUS: 'clear_status', + LIST_DEPLOYMENTS: 'list_deployments', + GET_DEPLOYMENT: 'get_deployment', + GET_DEPLOYMENT_PAYLOAD: 'get_deployment_payload', + DELETE_DEPLOYMENT_PAYLOAD: 'delete_deployment_payload', } as const; /** Defines valid file types that we are able to handle in 'import_from_s3' ops */ From bacdd541f47bcc182da428f0b3ea3656f0933bcb Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 18:52:48 -0600 Subject: [PATCH 02/10] fix(deploy): address cross-model review findings for Slice A - Skip recording on replicated executions: peer nodes receiving deploy_component via replicateOperation already have req._deploymentId set by origin, so they no longer spin up a fresh recorder + UUID + row. Prevents N duplicate rows per N-node cluster. - Cap payload at 200 MiB while Slice A buffers in memory. Throws a clear ClientError pointing users at the package-identifier path or Slice B's streaming variant. - Register list_deployments and get_deployment in utility/operation_authorization.ts. Pattern matches get_components: requires_su=true with the operation enum as the named exception so a role can be granted it without SU rights (per the design's permission model). - Add "audit": true to hdb_deployment in systemSchema.json so fresh installs match the audit setting the 5-2-0 upgrade directive applies. - Drop two now-unused imports (Transform from recorder, existsSync from test). - Auto-format pass via npm run format:write. Co-Authored-By: Claude Opus 4.7 (1M context) --- components/deploymentOperations.ts | 7 +++- components/deploymentRecorder.ts | 37 +++++++++++++++-- components/operations.js | 40 ++++++++++++------- .../deploy/deploy-tracking.test.ts | 29 +++++++++----- json/systemSchema.json | 1 + utility/operation_authorization.ts | 9 +++++ 6 files changed, 94 insertions(+), 29 deletions(-) diff --git a/components/deploymentOperations.ts b/components/deploymentOperations.ts index a394398d2..9c7e7aff7 100644 --- a/components/deploymentOperations.ts +++ b/components/deploymentOperations.ts @@ -47,7 +47,8 @@ export async function handleListDeployments(req: ListRequest = {}): Promise<{ de const conditions: any[] = []; if (req.project) conditions.push({ attribute: 'project', value: req.project }); if (req.status) conditions.push({ attribute: 'status', value: req.status }); - if (req.since != null) conditions.push({ attribute: 'started_at', value: req.since, comparator: 'greater_than_equal' }); + if (req.since != null) + conditions.push({ attribute: 'started_at', value: req.since, comparator: 'greater_than_equal' }); if (req.until != null) conditions.push({ attribute: 'started_at', value: req.until, comparator: 'less_than_equal' }); const collected: any[] = []; @@ -55,7 +56,9 @@ export async function handleListDeployments(req: ListRequest = {}): Promise<{ de collected.push(stripBlob(row)); } // Newest first by started_at; ties broken by deployment_id for stability. - collected.sort((a, b) => (b.started_at ?? 0) - (a.started_at ?? 0) || String(a.deployment_id).localeCompare(b.deployment_id)); + collected.sort( + (a, b) => (b.started_at ?? 0) - (a.started_at ?? 0) || String(a.deployment_id).localeCompare(b.deployment_id) + ); const total = collected.length; const offset = Math.max(0, req.offset ?? 0); diff --git a/components/deploymentRecorder.ts b/components/deploymentRecorder.ts index 2229da598..fc221e753 100644 --- a/components/deploymentRecorder.ts +++ b/components/deploymentRecorder.ts @@ -10,14 +10,29 @@ import { randomUUID } from 'node:crypto'; import { createHash, Hash } from 'node:crypto'; -import { Transform } from 'node:stream'; import type { Readable } from 'node:stream'; import { databases } from '../resources/databases.ts'; import { createBlob } from '../resources/blob.ts'; import * as terms from '../utility/hdbTerms.ts'; +import { ClientError } from '../utility/errors/hdbError.ts'; import { hostname } from 'node:os'; -type DeploymentStatus = 'pending' | 'extracting' | 'installing' | 'loading' | 'replicating' | 'restarting' | 'success' | 'failed' | 'rolled_back'; +// Slice A buffers the entire payload in memory before computing the hash and persisting. +// This cap prevents an OOM on accidentally-huge uploads while Slice B is in flight. Slice B +// replaces the buffer with a streaming hash + Blob-source pattern that lifts this limit +// back to whatever the replication path supports. +const SLICE_A_PAYLOAD_LIMIT_BYTES = 200 * 1024 * 1024; + +type DeploymentStatus = + | 'pending' + | 'extracting' + | 'installing' + | 'loading' + | 'replicating' + | 'restarting' + | 'success' + | 'failed' + | 'rolled_back'; interface CreateOptions { project?: string; @@ -88,11 +103,27 @@ export class DeploymentRecorder { buffer = Buffer.from(source, 'base64'); } else { const chunks: Buffer[] = []; + let collected = 0; for await (const chunk of source as AsyncIterable) { - chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as any)); + const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as any); + collected += buf.length; + if (collected > SLICE_A_PAYLOAD_LIMIT_BYTES) { + (source as Readable).destroy?.(); + throw new ClientError( + `Deploy payload exceeds Slice A's interim ${SLICE_A_PAYLOAD_LIMIT_BYTES} byte cap. ` + + `Use a package identifier (npm:/file:/git:) or wait for Slice B's streaming path.` + ); + } + chunks.push(buf); } buffer = Buffer.concat(chunks); } + if (buffer.length > SLICE_A_PAYLOAD_LIMIT_BYTES) { + throw new ClientError( + `Deploy payload (${buffer.length} bytes) exceeds Slice A's interim ${SLICE_A_PAYLOAD_LIMIT_BYTES} byte cap. ` + + `Use a package identifier (npm:/file:/git:) or wait for Slice B's streaming path.` + ); + } hash.update(buffer); byteCount = buffer.length; this.record.payload_blob = createBlob(buffer, { type: 'application/gzip' }); diff --git a/components/operations.js b/components/operations.js index 63a90068b..0f5cb9540 100644 --- a/components/operations.js +++ b/components/operations.js @@ -390,21 +390,29 @@ async function deployComponent(req) { // observable and auditable even if the CLI disconnects. The row also holds the payload // in a Blob attribute — Slice B will use that for peer delivery; for now it's the // audit record and the rollback source. - const recorder = await DeploymentRecorder.create({ - project: req.project, - package_identifier: req.package ?? null, - user: req.hdb_user?.username, - restart_mode: req.restart === 'rolling' ? 'rolling' : req.restart ? 'immediate' : null, - }); - req._deploymentId = recorder.deploymentId; + // + // Only the origin node records — peers receiving a replicated deploy_component skip + // recording so we don't accumulate one row per node for the same deploy. The row will + // reach peers via the table's replication once Slice B has them consume it. + const isReplicatedExecution = typeof req._deploymentId === 'string'; + const recorder = isReplicatedExecution + ? null + : await DeploymentRecorder.create({ + project: req.project, + package_identifier: req.package ?? null, + user: req.hdb_user?.username, + restart_mode: req.restart === 'rolling' ? 'rolling' : req.restart ? 'immediate' : null, + }); + if (recorder) req._deploymentId = recorder.deploymentId; let extractionPayload = req.payload; try { - // If a tarball came in (Buffer or Readable from the multipart parser), tee it through - // a hash-and-size tap into the row's payload_blob, then re-source extraction from the - // persisted blob. This means we read the upload exactly once into local storage; the - // blob is the staging area and (in Slice B) the channel peers will replicate from. - if (req.payload != null) { + // On the origin, tee the tarball (Buffer or Readable from the multipart parser) + // through a hash-and-size tap into the row's payload_blob, then re-source extraction + // from the persisted blob. This is the staging area and (in Slice B) the channel + // peers will replicate from. On peer nodes we skip recording entirely and use the + // raw payload as-is. + if (recorder && req.payload != null) { await recorder.ingestPayload(req.payload); extractionPayload = recorder.row.payload_blob.stream(); } @@ -462,11 +470,13 @@ async function deployComponent(req) { response.message = `Successfully deployed: ${application.name}, restarting Harper`; } else response.message = `Successfully deployed: ${application.name}`; - response.deployment_id = recorder.deploymentId; - await recorder.finish('success'); + if (recorder) { + response.deployment_id = recorder.deploymentId; + await recorder.finish('success'); + } return response; } catch (err) { - await recorder.finish('failed', err); + if (recorder) await recorder.finish('failed', err); throw err; } } diff --git a/integrationTests/deploy/deploy-tracking.test.ts b/integrationTests/deploy/deploy-tracking.test.ts index 6be5917ba..021f1c702 100644 --- a/integrationTests/deploy/deploy-tracking.test.ts +++ b/integrationTests/deploy/deploy-tracking.test.ts @@ -9,7 +9,7 @@ import { suite, test, before, after } from 'node:test'; import { ok, strictEqual } from 'node:assert/strict'; import { join } from 'node:path'; -import { existsSync, mkdtempSync, mkdirSync, writeFileSync, rmSync } from 'node:fs'; +import { mkdtempSync, mkdirSync, writeFileSync, rmSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { setTimeout as sleep } from 'node:timers/promises'; import { request } from 'node:http'; @@ -37,7 +37,7 @@ function postMultipart( headers: { 'Content-Type': contentType, 'Transfer-Encoding': 'chunked', - Authorization: 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64'), + 'Authorization': 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64'), }, }, (res) => { @@ -58,11 +58,10 @@ async function callOperation( op: Record ): Promise<{ status: number; body: any }> { const url = new URL(ctx.harper.operationsAPIURL); - const auth = - 'Basic ' + Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64'); + const auth = 'Basic ' + Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64'); const res = await fetch(url, { method: 'POST', - headers: { 'Content-Type': 'application/json', Authorization: auth }, + headers: { 'Content-Type': 'application/json', 'Authorization': auth }, body: JSON.stringify(op), }); const text = await res.text(); @@ -133,10 +132,16 @@ suite('Deployment tracking', (ctx: ContextWithHarper) => { strictEqual(row.project, project); strictEqual(row.status, 'success'); strictEqual(row.payload_blob_present, true, 'payload_blob should have been persisted'); - ok(typeof row.payload_hash === 'string' && /^[0-9a-f]{64}$/i.test(row.payload_hash), 'payload_hash should be a sha256 hex string'); + ok( + typeof row.payload_hash === 'string' && /^[0-9a-f]{64}$/i.test(row.payload_hash), + 'payload_hash should be a sha256 hex string' + ); ok(typeof row.payload_size === 'number' && row.payload_size > 0, 'payload_size should be a positive integer'); ok(typeof row.started_at === 'number' && row.started_at > 0, 'started_at should be set'); - ok(typeof row.completed_at === 'number' && row.completed_at >= row.started_at, 'completed_at should be >= started_at'); + ok( + typeof row.completed_at === 'number' && row.completed_at >= row.started_at, + 'completed_at should be >= started_at' + ); }); test('list_deployments surfaces the row, supports project filter', async () => { @@ -148,7 +153,10 @@ suite('Deployment tracking', (ctx: ContextWithHarper) => { ok(deploymentId && ids.includes(deploymentId), `listed deployments should include ${deploymentId}`); // blob bytes must NOT travel back in the list response — only the presence boolean. ok(!('payload_blob' in listed.body.deployments[0]), 'list_deployments must not include payload_blob bytes'); - ok('payload_blob_present' in listed.body.deployments[0], 'list_deployments should include payload_blob_present flag'); + ok( + 'payload_blob_present' in listed.body.deployments[0], + 'list_deployments should include payload_blob_present flag' + ); }); test('a failed deploy is recorded with status=failed and error.message', async () => { @@ -190,7 +198,10 @@ suite('Deployment tracking', (ctx: ContextWithHarper) => { const failed = listed.body.deployments.find((d: any) => d.project === project); ok(failed, `expected to find a deployment for ${project} in list`); strictEqual(failed.status, 'failed'); - ok(failed.error && typeof failed.error.message === 'string' && failed.error.message.length > 0, 'failed deployment should have error.message'); + ok( + failed.error && typeof failed.error.message === 'string' && failed.error.message.length > 0, + 'failed deployment should have error.message' + ); } finally { try { rmSync(brokenDir, { recursive: true, force: true }); diff --git a/json/systemSchema.json b/json/systemSchema.json index f060f1da0..c7bfaa29c 100644 --- a/json/systemSchema.json +++ b/json/systemSchema.json @@ -374,6 +374,7 @@ "hash_attribute": "deployment_id", "name": "hdb_deployment", "schema": "system", + "audit": true, "attributes": [ { "attribute": "deployment_id" diff --git a/utility/operation_authorization.ts b/utility/operation_authorization.ts index e54ba6ed9..ee768a049 100644 --- a/utility/operation_authorization.ts +++ b/utility/operation_authorization.ts @@ -40,6 +40,7 @@ import PermissionResponseObject from '../security/data_objects/PermissionRespons import { handleHDBError, hdbErrors } from '../utility/errors/hdbError.ts'; import * as regDeprecated from '../resources/registrationDeprecated.ts'; +import * as deploymentOperations from '../components/deploymentOperations.ts'; const requiredPermissions = new Map(); const DELETE_PERM = 'delete'; @@ -241,6 +242,14 @@ requiredPermissions.set(functionsOperations.addComponent.name, new (permission a requiredPermissions.set(functionsOperations.dropCustomFunctionProject.name, new (permission as any)(true, [])); requiredPermissions.set(functionsOperations.packageComponent.name, new (permission as any)(true, [])); requiredPermissions.set(functionsOperations.deployComponent.name, new (permission as any)(true, [])); +requiredPermissions.set( + deploymentOperations.handleListDeployments.name, + new (permission as any)(true, [], terms.OPERATIONS_ENUM.LIST_DEPLOYMENTS) +); +requiredPermissions.set( + deploymentOperations.handleGetDeployment.name, + new (permission as any)(true, [], terms.OPERATIONS_ENUM.GET_DEPLOYMENT) +); //Below are functions that are currently open to all roles requiredPermissions.set(regDeprecated.getRegistrationInfo.name, new (permission as any)(false, [])); From 5224d3fa03735ae0288c584058adf8a653e95ad1 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 18:55:16 -0600 Subject: [PATCH 03/10] docs(design): document blob/put streaming race + system-table bootstrap pattern Two non-obvious findings from #641 Slice A that future agents should know: 1. createBlob(readable) + table.put() doesn't synchronously drain the source. The blob's saveBlob runs concurrently; calling hash.digest() after the put resolves can race a still-flushing Transform and throw ERR_CRYPTO_HASH_FINALIZED. 2. Adding a new system table requires three changes: systemSchema.json (fresh installs), SYSTEM_TABLE_NAMES (constant), and an upgrade directive registered in directivesController.ts. The directive shape was undocumented after the .ts refactor cleared old directives. Co-Authored-By: Claude Opus 4.7 (1M context) --- DESIGN.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/DESIGN.md b/DESIGN.md index af2447a86..cb6a6a267 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -63,3 +63,27 @@ The cross-thread subscription path (default `crossThreads`) drives every `Table. - **`databaseSubscriptions.activeCount`** is the count of live `Subscription` instances on a database. It is incremented at the end of `addSubscription` (after the Subscription is created, so the `scope: 'full-database'` early-return path correctly skips counting) and decremented in `Subscription.end()`. `notifyFromTransactionData` short-circuits when this is zero — the reusable rocksdb iterator stays put and resumes from its position the next time a subscriber arrives. Without this short-circuit, an idle database with no subscribers still pays the audit-log iteration cost on every commit during replication backlog catch-up. - **`notifyScheduled` + `setImmediate`** in the `'committed'` listener defers the iteration off the commit microtask. Multiple `'committed'` events that land in the same event-loop turn collapse into one notify pass. `notifyScheduled` stays set for the entire drain — including across yield-and-resume turns — so a re-entry from a new `'committed'` event cannot spawn a second concurrent notify on the same iterator. - **Batched yielding** in `notifyFromTransactionData` (`NOTIFY_BATCH_SIZE`) is gated by `allowYield`. The `'committed'` path passes `allowYield = true`; the `listenToCommits` (same-thread `aftercommit`) path does not, because that path holds an inter-thread `'thread-local-writes'` lock that must not span event-loop turns. `subscribersWithTxns` is carried across yields via `subscriptions.pendingTxnSubscribers` so the `end_txn` signal fires exactly once when the iterator truly drains. When `activeCount` drops to zero mid-yield, the next continuation drops the carry-over to avoid invoking ended subscribers' listeners. + +## `createBlob(readable)` and `table.put()` don't synchronously drain the source + +When a blob attribute is created from a Node `Readable` (e.g. `createBlob(stream)` then `row.payload_blob = blob; await table.put(row)`), the put does **not** wait for the underlying stream to fully drain into the file before resolving. Internally `saveBlob` kicks off a `writeBlobWithStream` pipeline whose `storageInfo.saving` promise is tracked separately. The put resolves once encoding has captured the blob reference; the bytes finish writing concurrently. + +Consequence for callers that wrap the source in a hashing `Transform`: calling `hash.digest('hex')` after `await table.put()` is unsafe — more `chunk.update()` calls can still fire as the stream drains, producing `Error [ERR_CRYPTO_HASH_FINALIZED]: Digest already called`. Options: + +- Buffer first, then hash + put (what `components/deploymentRecorder.ts` does for Slice A — small payloads only). +- Hash via Transform while extraction reads the stream, and only finalize the hash on the Transform's `'end'` event before any second put with the final hash. +- Await `storageInfo.saving` directly if you have a handle to the FileBackedBlob (the cleanest path for streaming). + +Future agents touching `components/deploymentRecorder.ts` for Slice B's streaming variant should pick one of the latter two patterns. + +## System table bootstrap: `systemSchema.json` + upgrade directive + +Adding a new system table (e.g. `hdb_deployment` in #641 Slice A) requires three changes: + +1. **`json/systemSchema.json`** — the table entry. Fresh installs auto-create it via `utility/mount_hdb.ts:createTables()`, which iterates `Object.keys(systemSchema)` on first boot. +2. **`utility/hdbTerms.ts`** — add the table name to `SYSTEM_TABLE_NAMES`. +3. **`upgrade/directives/.ts`** — provisions the table on existing installs that already have a system schema. Registered in `upgrade/directives/directivesController.ts` (which is otherwise empty — its `versions` Map gets populated by these imports). The directive shape is `{ version, sync_functions, async_functions }`; copy `5-2-0.ts` for the canonical pattern (uses `bridge.createTable` to match what `mount_hdb` does on a fresh install). + +System tables replicate by default. To opt out, add the name to `NON_REPLICATING_SYSTEM_TABLES` in `resources/databases.ts`. The check happens after table init and sets `table.replicate = false` per-node. + +If the table needs `audit: true`, set it both in the schema (for fresh installs) **and** on the `CreateTableObject` instance in the directive (for upgrades) — otherwise the two paths diverge. From 6f20f088363b5ace8119a797359926674865f609 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 21:28:24 -0600 Subject: [PATCH 04/10] =?UTF-8?q?feat(deploy):=20Slice=20B1=20=E2=80=94=20?= =?UTF-8?q?live=20SSE=20progress=20+=20event=5Flog=20on=20hdb=5Fdeployment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the ProgressEmitter (resurrected from the paused #531) into the new DeploymentRecorder so every deploy_component lifecycle phase is captured on the row's event_log AND streamable live via SSE. Same content-negotiated branch serves get_deployment, letting Studio (or any client) replay a deploy's history and tail in-flight events through a single endpoint. What's new - DeploymentRecorder subscribes to a ProgressEmitter and coalesces writes: every emit appends to a bounded event_log (200 cap, head+tail retention so the lifecycle spine survives a noisy install); chained puts collapse a burst into one round trip. Emits a `_recorder_finished` sentinel on finish() so SSE tailers terminate cleanly even on crash paths. - deployComponent emits prepare/load/replicate/restart/success phase events around their respective steps. Strips req.progress before replicateOperation so peers see a clean payload. Skips recording entirely on replicated (peer-side) executions — origin owns the canonical row. - An in-memory activeEmitters Map keyed by deployment_id lets get_deployment SSE locate the live emitter and tail it. - handlePostRequest gains a content-negotiated SSE branch (req.headers.accept includes text/event-stream + op in SSE_PROGRESS_OPERATIONS). Prime write on the PassThrough so Fastify starts piping immediately — empirically Fastify buffers a returned Readable until end-of-stream without it, collapsing all intermediate writes into a single flush. - get_deployment with SSE subscribes to the live emitter BEFORE reading the row, then replays the historical event_log and dedupes by timestamp so no event is lost in the stitching gap. A polling fallback resolves the SSE promise even if the deploy disappears without signaling a terminal event. - CLI sends Accept: text/event-stream for deploy_component; consumes the SSE response via parseSSE; renders phase/install/error events through DeployRenderer. - httpRequest gains a streamResponse option that yields the raw IncomingMessage as a Readable instead of buffering — what the SSE consumer needs. Ported from #531 (with the multi-line data spec fix, StringDecoder, and disconnect cleanup already applied earlier in the session): - server/serverHelpers/progressEmitter.ts (+ tests) - bin/sseConsumer.ts (+ tests) - bin/deployRenderer.ts (+ tests) Integration coverage: integrationTests/deploy/deploy-tracking-events.test.ts asserts event_log shape on success, SSE replay+done on get_deployment, and the failure path emits an error event into the log. Refs #641 (Slice B1). Co-Authored-By: Claude Opus 4.7 (1M context) --- bin/cliOperations.ts | 80 ++++++- bin/deployRenderer.ts | 187 ++++++++++++++++ bin/sseConsumer.ts | 126 +++++++++++ components/deploymentOperations.ts | 94 ++++++++ components/deploymentRecorder.ts | 128 ++++++++++- components/operations.js | 32 ++- .../deploy/deploy-tracking-events.test.ts | 209 ++++++++++++++++++ server/serverHelpers/progressEmitter.ts | 104 +++++++++ server/serverHelpers/serverHandlers.js | 26 +++ unitTests/bin/deployRenderer.test.js | 108 +++++++++ unitTests/bin/sseConsumer.test.js | 95 ++++++++ .../serverHelpers/progressEmitter.test.js | 125 +++++++++++ utility/common_utils.ts | 16 ++ 13 files changed, 1314 insertions(+), 16 deletions(-) create mode 100644 bin/deployRenderer.ts create mode 100644 bin/sseConsumer.ts create mode 100644 integrationTests/deploy/deploy-tracking-events.test.ts create mode 100644 server/serverHelpers/progressEmitter.ts create mode 100644 unitTests/bin/deployRenderer.test.js create mode 100644 unitTests/bin/sseConsumer.test.js create mode 100644 unitTests/server/serverHelpers/progressEmitter.test.js diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 882a071b5..a13b1fc30 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -11,11 +11,20 @@ import * as fs from 'fs-extra'; import * as YAML from 'yaml'; import { streamPackagedDirectory } from '../components/packageComponent.ts'; import { buildMultipartBody } from './multipartBuilder.ts'; +import { parseSSE } from './sseConsumer.ts'; +import { DeployRenderer } from './deployRenderer.ts'; import { getHdbPid } from '../utility/processManagement/processManagement.js'; import { initConfig, getConfigPath } from '../config/configUtils.js'; const OP_ALIASES = { deploy: 'deploy_component', package: 'package_component' }; +// Operations whose responses should be consumed as text/event-stream so live phase events +// (prepare, load, replicate, restart) render as they happen instead of after the whole +// deploy completes. Add an operation here only after wiring its server-side +// SSE_PROGRESS_OPERATIONS entry — otherwise the server returns the buffered JSON path and +// the SSE parser sees no events. +const SSE_OPERATIONS = new Set(['deploy_component']); + // Properties on `req` that the CLI itself uses for transport/UX, not the operations API. // They never get serialized into the request body. const TRANSPORT_ONLY_FIELDS = new Set([ @@ -191,6 +200,15 @@ async function cliOperations(req: any, skipResponseLog = false) { options.headers.Authorization = `Bearer ${tokens.operation_token}`; } } + const useSse = SSE_OPERATIONS.has(req.operation); + if (useSse) { + options.headers.Accept = 'text/event-stream'; + options.streamResponse = true; + } + // One renderer owns the (future) upload bar and the SSE event rendering for a + // multipart deploy. Created here so the upload-stream tap and the SSE consumer + // below share the same instance. + const renderer = req._multipart ? new DeployRenderer({}) : null; let body; if (req._multipart) { const packageStream = req._packageStream; @@ -209,20 +227,66 @@ async function cliOperations(req: any, skipResponseLog = false) { // Use chunked transfer-encoding: we don't know the total size up front because the // payload is streamed from `tar.pack` and never fully buffered. options.headers['Transfer-Encoding'] = 'chunked'; - body = multipart.stream; + // Tap the body so bytes flowing into the HTTP request advance the upload bar. + // The renderer's Transform is identity — chunks pass through unmodified. + body = renderer ? renderer.tapUploadStream(multipart.stream) : multipart.stream; } else { body = req; } let response: any = await httpRequest(options, body); + // Upload is done by the time we get the response; tear the bar down before any SSE + // rendering so the bar and event lines don't fight for the same terminal row. + renderer?.endUpload(); + let responseData; - try { - responseData = JSON.parse(response.body); - } catch { - responseData = { - status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), - body: response.body, - }; + if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) { + // Consume SSE: render phase events live, capture the final result from the `done` + // event (or the error message from the `error` event). The HTTP status stays 200 + // until end-of-stream; failures are signaled in-band. + let finalResult; + let sseError; + for await (const message of parseSSE(response)) { + renderer?.renderEvent(message); + if (message.event === 'done') { + try { + finalResult = JSON.parse(message.data)?.result; + } catch { + finalResult = message.data; + } + } else if (message.event === 'error') { + try { + sseError = JSON.parse(message.data); + } catch { + sseError = { message: message.data }; + } + } + } + if (sseError) { + const errMsg = sseError.message ?? (typeof sseError === 'object' ? JSON.stringify(sseError) : sseError); + console.error(`error: ${errMsg}`); + process.exit(1); + } + responseData = finalResult ?? { message: 'Deploy completed (no result payload).' }; + } else { + // When useSse is true, httpRequest returns a raw IncomingMessage (streamResponse mode), + // so .body is undefined. Drain the stream to get the text (e.g. a 401 error body). + let bodyText: string; + if (useSse) { + const chunks: Buffer[] = []; + for await (const chunk of response as AsyncIterable) chunks.push(Buffer.from(chunk)); + bodyText = Buffer.concat(chunks).toString('utf8'); + } else { + bodyText = response.body; + } + try { + responseData = JSON.parse(bodyText); + } catch { + responseData = { + status: response.statusCode + ' ' + (response.statusMessage || 'Unknown'), + body: bodyText, + }; + } } let responseLog; diff --git a/bin/deployRenderer.ts b/bin/deployRenderer.ts new file mode 100644 index 000000000..cdaf5ca59 --- /dev/null +++ b/bin/deployRenderer.ts @@ -0,0 +1,187 @@ +import { Transform } from 'node:stream'; +// cli-progress is already a runtime dep of harper (see package.json); using its +// SingleBar to render the upload phase here doesn't add a new dependency. +import cliProgress from 'cli-progress'; +import type { SSEMessage } from './sseConsumer.ts'; + +interface RendererOptions { + uploadTotal?: number; + output?: NodeJS.WritableStream; +} + +interface UploadState { + bar: cliProgress.SingleBar | null; + sent: number; + textLastLogged: number; + finished: boolean; +} + +interface PhaseState { + current?: string; + installManager?: string; + installLineCount: number; +} + +/** + * Deploy-time renderer that owns the progress display across two phases: + * + * 1. Local upload — driven by `tapUploadStream`, which wraps the multipart body so we + * can update a `cli-progress` bar against the precomputed uncompressed source-tree + * total. In a non-TTY environment (CI logs, redirected output) we fall back to + * periodic text lines so logs stay grep-able. + * + * 2. Server-side phases — driven by `renderEvent`, called for each SSE message the + * CLI receives from the operations API. Phase events print one-liners; live + * `install` events (npm/pnpm/yarn stdout) are throttled to one line under a + * "[install]" header so a noisy `npm install` doesn't drown the terminal. + * + * Designed so the two phases hand off cleanly: `endUpload()` tears the bar down (so + * it doesn't compete with subsequent prints) before any SSE events render. + */ +export class DeployRenderer { + private upload: UploadState = { bar: null, sent: 0, textLastLogged: 0, finished: false }; + private phase: PhaseState = { installLineCount: 0 }; + private output: NodeJS.WritableStream; + private isTty: boolean; + private uploadTotal: number; + + constructor(options: RendererOptions = {}) { + this.output = options.output ?? process.stderr; + // Only render a bar when stderr is a real terminal. CI runners, log redirection, + // and pipes look identical from Node's perspective: !isTTY. + this.isTty = Boolean((this.output as NodeJS.WriteStream).isTTY); + this.uploadTotal = options.uploadTotal ?? 0; + } + + /** + * Wrap an outbound stream so each byte flowing through it advances the upload bar. + * The Transform is identity — chunks pass through unmodified. + */ + tapUploadStream(stream: T): NodeJS.ReadableStream { + this.upload.bar = this.isTty + ? new cliProgress.SingleBar( + { + format: 'Uploading [{bar}] {percentage}% | {value}/{total} bytes', + barCompleteChar: '█', + barIncompleteChar: '░', + hideCursor: true, + stream: this.output, + etaBuffer: 50, + }, + cliProgress.Presets.shades_classic + ) + : null; + this.upload.bar?.start(this.uploadTotal || 1, 0); + + const counter = new Transform({ + transform: (chunk, _enc, cb) => { + this.upload.sent += chunk.length; + this.tickUpload(); + cb(null, chunk); + }, + flush: (cb) => { + this.endUpload(); + cb(); + }, + }); + stream.on('error', (err) => counter.destroy(err)); + stream.pipe(counter); + return counter; + } + + endUpload(): void { + if (this.upload.finished) return; + this.upload.finished = true; + if (this.upload.bar) { + // Snap to total so the bar shows 100% even when our uncompressed-total estimate + // is slightly off (gzip output is usually smaller than the source tree). + if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal); + this.upload.bar.stop(); + this.upload.bar = null; + } else { + this.output.write(`Upload complete (${formatBytes(this.upload.sent)})\n`); + } + } + + private tickUpload(): void { + if (this.upload.bar) { + this.upload.bar.update(this.upload.sent); + return; + } + // Non-TTY: log a line every 10% of the total (or every 5MB if total unknown). + const step = this.uploadTotal > 0 ? this.uploadTotal / 10 : 5 * 1024 * 1024; + if (this.upload.sent - this.upload.textLastLogged >= step) { + this.upload.textLastLogged = this.upload.sent; + const pct = this.uploadTotal > 0 ? Math.min(100, Math.floor((this.upload.sent / this.uploadTotal) * 100)) : null; + this.output.write( + pct !== null + ? `Uploaded ${formatBytes(this.upload.sent)} / ~${formatBytes(this.uploadTotal)} (${pct}%)\n` + : `Uploaded ${formatBytes(this.upload.sent)}\n` + ); + } + } + + renderEvent(message: SSEMessage): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': + this.renderPhase(parsed as { phase?: string; status?: string; message?: string }); + break; + case 'install': + this.renderInstall(parsed as { manager?: string; stream?: string; line?: string }); + break; + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + this.output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up final result via the SSE iterator; nothing to render here. + break; + } + } + + private renderPhase(data: { phase?: string; status?: string; message?: string }): void { + const label = data.phase ?? '?'; + if (data.status === 'start') { + if (this.phase.current !== label) { + this.output.write(`${label}…\n`); + this.phase.current = label; + this.phase.installLineCount = 0; + } + } else if (data.status === 'done') { + if (label === 'install' && this.phase.installLineCount > 0) { + this.output.write(`install done (${this.phase.installLineCount} log lines)\n`); + } else { + this.output.write(`${label} done\n`); + } + } else if (data.status === 'error') { + this.output.write(`${label} ERROR: ${data.message ?? 'failed'}\n`); + } + } + + private renderInstall(data: { manager?: string; stream?: string; line?: string }): void { + const line = (data.line ?? '').trimEnd(); + if (!line) return; + if (data.manager && data.manager !== this.phase.installManager) { + this.phase.installManager = data.manager; + this.output.write(`install: using ${data.manager}\n`); + } + this.phase.installLineCount++; + // Prefix with stream so users can distinguish stderr noise from stdout warnings. + const tag = data.stream === 'stderr' ? '!' : '|'; + this.output.write(` ${tag} ${line}\n`); + } +} + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KiB`; + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GiB`; +} diff --git a/bin/sseConsumer.ts b/bin/sseConsumer.ts new file mode 100644 index 000000000..b89485f15 --- /dev/null +++ b/bin/sseConsumer.ts @@ -0,0 +1,126 @@ +import { StringDecoder } from 'node:string_decoder'; +import type { Readable } from 'node:stream'; + +export interface SSEMessage { + event: string; + data: string; + id?: string; + retry?: number; +} + +/** + * Parse a Readable carrying Server-Sent Events into structured messages. + * + * Yields one `SSEMessage` per blank-line-terminated record. Handles split data: lines, + * CRLF or LF line endings, and arbitrary chunk boundaries — the underlying Node http + * Readable does not guarantee chunks align with SSE record boundaries. + */ +export async function* parseSSE(stream: Readable): AsyncGenerator { + const decoder = new StringDecoder('utf8'); + let buffer = ''; + for await (const chunk of stream) { + buffer += typeof chunk === 'string' ? chunk : decoder.write(chunk); + while (true) { + const recordEnd = buffer.indexOf('\n\n'); + const crlfEnd = buffer.indexOf('\r\n\r\n'); + let endIdx = -1; + let delimLen = 0; + if (recordEnd !== -1 && (crlfEnd === -1 || recordEnd < crlfEnd)) { + endIdx = recordEnd; + delimLen = 2; + } else if (crlfEnd !== -1) { + endIdx = crlfEnd; + delimLen = 4; + } + if (endIdx === -1) break; + const record = buffer.slice(0, endIdx); + buffer = buffer.slice(endIdx + delimLen); + const msg = parseRecord(record); + if (msg) yield msg; + } + } + buffer += decoder.end(); + // Any trailing record without a terminating blank line is treated as a final message, + // matching the looser behavior browsers exhibit on connection close. + if (buffer.trim()) { + const msg = parseRecord(buffer); + if (msg) yield msg; + } +} + +function parseRecord(record: string): SSEMessage | null { + const lines = record.split(/\r?\n/); + let event = 'message'; + let id: string | undefined; + let retry: number | undefined; + const dataLines: string[] = []; + for (const line of lines) { + if (line === '' || line.startsWith(':')) continue; + const colon = line.indexOf(':'); + const field = colon === -1 ? line : line.slice(0, colon); + // Per spec, a leading space after the colon is stripped. + let value = colon === -1 ? '' : line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + switch (field) { + case 'event': + event = value; + break; + case 'data': + dataLines.push(value); + break; + case 'id': + id = value; + break; + case 'retry': { + const n = Number(value); + if (Number.isFinite(n)) retry = n; + break; + } + } + } + if (dataLines.length === 0 && event === 'message') return null; + return { event, data: dataLines.join('\n'), id, retry }; +} + +interface RenderState { + currentPhase?: string; +} + +/** + * Render SSE deploy events as terse, line-oriented progress to stderr (so stdout stays + * reserved for the final JSON/YAML response document). Phase transitions print once. + */ +export function renderDeployProgress(message: SSEMessage, state: RenderState, output: NodeJS.WritableStream): void { + let parsed: unknown; + try { + parsed = JSON.parse(message.data); + } catch { + parsed = message.data; + } + switch (message.event) { + case 'phase': { + const p = parsed as { phase?: string; status?: string; rolling?: boolean }; + const label = p.phase ?? '?'; + if (p.status === 'start') { + if (state.currentPhase !== label) { + output.write(`${label}…\n`); + state.currentPhase = label; + } + } else if (p.status === 'done') { + output.write(`${label} done\n`); + } else if (p.status === 'error') { + const msg = (parsed as { message?: string }).message ?? 'failed'; + output.write(`${label} ERROR: ${msg}\n`); + } + break; + } + case 'error': { + const e = parsed as { message?: string; code?: string | number }; + output.write(`error: ${e.message ?? message.data}${e.code ? ` (${e.code})` : ''}\n`); + break; + } + case 'done': + // Caller picks up the final result via the SSE iterator; nothing to render here. + break; + } +} diff --git a/components/deploymentOperations.ts b/components/deploymentOperations.ts index 9c7e7aff7..418d1ad33 100644 --- a/components/deploymentOperations.ts +++ b/components/deploymentOperations.ts @@ -6,8 +6,11 @@ import { databases } from '../resources/databases.ts'; import * as terms from '../utility/hdbTerms.ts'; import { ClientError } from '../utility/errors/hdbError.ts'; +import { getActiveEmitter } from './deploymentRecorder.ts'; +import type { ProgressEmitter } from '../server/serverHelpers/progressEmitter.ts'; const DEPLOYMENT_TABLE = terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME; +const TERMINAL_STATUSES = new Set(['success', 'failed', 'rolled_back']); interface ListRequest { project?: string; @@ -20,6 +23,8 @@ interface ListRequest { interface GetRequest { deployment_id: string; + // Set by serverHandlers.js when the client asks for `Accept: text/event-stream`. + progress?: ProgressEmitter; } function deploymentTable() { @@ -75,5 +80,94 @@ export async function handleGetDeployment(req: GetRequest): Promise { if (!row) { throw new ClientError(`No deployment found with id '${req.deployment_id}'`); } + + // SSE content-negotiated branch — when serverHandlers.js detects + // `Accept: text/event-stream` it attaches a ProgressEmitter as req.progress and wraps + // our return as the operation's final SSE event. We replay event_log on connect, then + // tail the deployment's live emitter (if it's still running on this node) until it + // reaches a terminal status. The final return value becomes the SSE `done` event. + if (req.progress) { + const sse = req.progress; + const liveEmitter = getActiveEmitter(req.deployment_id); + + // Subscribe FIRST, before reading the row, so any event the recorder emits between + // "now" and the moment we finish reading the historical log still lands. Buffer + // those live events; dedupe by recording the timestamp of the last replayed entry + // and skipping any live event whose timestamp is <= that. Forward everything else. + let lastReplayedTs = 0; + let resolveLive: (() => void) | null = null; + let liveDone = false; + const liveBuffer: Array<{ t: number; event: { event: string; data: unknown } }> = []; + const forwardLive = (e: { event: string; data: unknown }) => { + sse.emit(e.event, e.data); + // A terminal signal — either explicit success/error event from the lifecycle, or + // the recorder's `_recorder_finished` sentinel emitted before it unsubscribes. + const isTerminalEvent = + e.event === '_recorder_finished' || + e.event === 'error' || + (e.event === 'phase' && + e.data && + typeof e.data === 'object' && + (e.data as { phase?: string }).phase === 'success'); + if (isTerminalEvent && !liveDone) { + liveDone = true; + resolveLive?.(); + } + }; + const unsubscribe = liveEmitter + ? liveEmitter.subscribe((event) => { + const t = Date.now(); + if (resolveLive) { + // Replay phase finished; we own the live forward path now. + if (t > lastReplayedTs) forwardLive(event); + } else { + // Still replaying; stash for dedup-and-forward once replay completes. + liveBuffer.push({ t, event }); + } + }) + : null; + + try { + for (const entry of row.event_log ?? []) { + sse.emit(entry.event, entry.data); + if (typeof entry.t === 'number') lastReplayedTs = Math.max(lastReplayedTs, entry.t); + } + + if (liveEmitter && !TERMINAL_STATUSES.has(row.status)) { + await new Promise((resolve) => { + resolveLive = resolve; + // Flush anything that arrived during replay, filtering to events the replay missed. + for (const buffered of liveBuffer) { + if (buffered.t > lastReplayedTs) forwardLive(buffered.event); + } + if (liveDone) resolve(); + // Safety net — if the in-memory emitter is dropped (recorder finished or + // the process recycled) before signaling, poll the row's status as a + // fallback so the client never hangs indefinitely. + const pollTimer = setInterval(async () => { + if (liveDone) { + clearInterval(pollTimer); + return; + } + const live = getActiveEmitter(req.deployment_id); + if (!live || live !== liveEmitter) { + clearInterval(pollTimer); + const latest = await table.get(req.deployment_id); + if (latest && TERMINAL_STATUSES.has(latest.status) && !liveDone) { + liveDone = true; + resolve(); + } + } + }, 500); + }); + } + // Re-read the row so the final SSE payload reflects the post-deploy state. + const finalRow = await table.get(req.deployment_id); + return stripBlob(finalRow ?? row); + } finally { + unsubscribe?.(); + } + } + return stripBlob(row); } diff --git a/components/deploymentRecorder.ts b/components/deploymentRecorder.ts index fc221e753..9d5b78bfa 100644 --- a/components/deploymentRecorder.ts +++ b/components/deploymentRecorder.ts @@ -1,12 +1,12 @@ 'use strict'; -// DeploymentRecorder — Slice A scope. +// DeploymentRecorder — lifecycle owner for one row in system.hdb_deployment. // -// Owns the lifecycle of one row in system.hdb_deployment: creates the pending row at deploy -// start, streams the upload payload into the row's payload_blob (computing sha256 + size -// alongside), and writes the terminal status at the end. Slice B will extend this with -// ProgressEmitter subscription and event_log writes; Slice C will add rollback sourcing -// from the blob. +// Creates the pending row at deploy start, persists the upload payload into the row's +// payload_blob (with sha256 + size), and writes the terminal status at the end. +// Slice B (#641): subscribes to a ProgressEmitter so phase transitions and install lines +// land in event_log as they happen — making the deploy observable by Studio polling +// get_deployment without an attached CLI. Slice C will add rollback sourcing from the blob. import { randomUUID } from 'node:crypto'; import { createHash, Hash } from 'node:crypto'; @@ -16,6 +16,26 @@ import { createBlob } from '../resources/blob.ts'; import * as terms from '../utility/hdbTerms.ts'; import { ClientError } from '../utility/errors/hdbError.ts'; import { hostname } from 'node:os'; +import { ProgressEmitter } from '../server/serverHelpers/progressEmitter.ts'; + +// Bound the event_log so a pathologically chatty install can't grow a row without limit. +// Slice B emits a handful of phase events plus aggregated install summaries; 200 entries +// comfortably covers a real deploy with headroom. When we exceed the cap, drop the middle +// rather than the front — the lifecycle spine (prepare → load → replicate → success) is +// the most valuable context for debugging, and naive front-truncation loses it under a +// chatty `npm install`. +const EVENT_LOG_MAX = 200; +const EVENT_LOG_HEAD_KEEP = 20; + +// In-memory registry of live emitters, keyed by deployment_id. Populated for the lifetime +// of an in-progress deploy on the origin node; get_deployment SSE looks here to tail live +// events after replaying event_log. Per-node, not replicated — peers don't see another +// node's in-progress emitters. Slice B1 scope; cross-node tailing is a later concern. +const activeEmitters = new Map(); + +export function getActiveEmitter(deploymentId: string): ProgressEmitter | undefined { + return activeEmitters.get(deploymentId); +} // Slice A buffers the entire payload in memory before computing the hash and persisting. // This cap prevents an OOM on accidentally-huge uploads while Slice B is in flight. Slice B @@ -40,6 +60,7 @@ interface CreateOptions { user?: string; restart_mode?: 'immediate' | 'rolling' | null; rollback_of?: string | null; + emitter?: ProgressEmitter; } export class DeploymentRecorder { @@ -48,6 +69,9 @@ export class DeploymentRecorder { private hash: Hash | null = null; private byteCount = 0; private finished = false; + private unsubscribe: (() => void) | null = null; + private pendingPut: Promise | null = null; + private dirty = false; private constructor(deploymentId: string, initial: Record) { this.deploymentId = deploymentId; @@ -78,9 +102,68 @@ export class DeploymentRecorder { }; const recorder = new DeploymentRecorder(deploymentId, record); await recorder.put(); + if (options.emitter) { + recorder.subscribeTo(options.emitter); + activeEmitters.set(deploymentId, options.emitter); + } return recorder; } + /** + * Subscribe to a ProgressEmitter. Each event is appended (bounded) to event_log; phase + * events also update the row's `status` and `phase` fields. Writes coalesce: a put is + * always pending after the first event in a burst, so chatty install output collapses + * to one row update per ~100ms instead of one per line. + */ + private subscribeTo(emitter: ProgressEmitter): void { + this.unsubscribe = emitter.subscribe((event) => { + this.appendEvent(event.event, event.data); + }); + } + + private appendEvent(event: string, data: unknown): void { + if (this.finished) return; + const log = this.record.event_log as Array>; + log.push({ t: Date.now(), event, data }); + // Keep the head (lifecycle spine) and tail (most-recent activity); drop the middle. + if (log.length > EVENT_LOG_MAX) { + const tailKeep = EVENT_LOG_MAX - EVENT_LOG_HEAD_KEEP - 1; // -1 for the truncation marker + const removedCount = log.length - EVENT_LOG_HEAD_KEEP - tailKeep; + log.splice(EVENT_LOG_HEAD_KEEP, log.length - EVENT_LOG_HEAD_KEEP - tailKeep, { + t: Date.now(), + event: 'truncated', + data: { dropped_events: removedCount }, + }); + } + // Phase events drive the canonical status/phase fields used by list_deployments. + if (event === 'phase' && data && typeof data === 'object') { + const p = data as { phase?: string; status?: string }; + if (p.phase) this.record.phase = p.phase; + if (p.status === 'start') { + const mapped = startStatusFor(p.phase); + if (mapped) this.record.status = mapped; + } + } + this.scheduleFlush(); + } + + // Coalesce writes: at most one in-flight put at a time. While a put is running, mark + // the record dirty; the chained continuation issues a follow-up put once the prior one + // settles. This keeps event_log writes O(1) puts per burst rather than O(N) per event. + private scheduleFlush(): void { + if (this.pendingPut) { + this.dirty = true; + return; + } + this.pendingPut = this.put().finally(() => { + this.pendingPut = null; + if (this.dirty) { + this.dirty = false; + this.scheduleFlush(); + } + }); + } + /** * Drain a payload source (Buffer or Readable) into the row's payload_blob attribute, * computing sha256 and byte count alongside. After this resolves the row has been @@ -144,7 +227,23 @@ export class DeploymentRecorder { async finish(status: 'success' | 'failed' | 'rolled_back', error?: unknown): Promise { if (this.finished) return; + // Send a terminal sentinel through the emitter (if any) BEFORE we unsubscribe and + // remove it from the registry, so any SSE tail subscribers can resolve their wait + // even on a code path that doesn't emit an explicit `error` event. + const emitter = activeEmitters.get(this.deploymentId); + emitter?.emit('_recorder_finished', { status }); this.finished = true; + this.unsubscribe?.(); + this.unsubscribe = null; + activeEmitters.delete(this.deploymentId); + // Wait for any in-flight coalesced put before mutating + persisting the terminal state. + if (this.pendingPut) { + try { + await this.pendingPut; + } catch { + /* the next put surfaces the error */ + } + } this.record.status = status; this.record.completed_at = Date.now(); if (error) { @@ -172,3 +271,20 @@ export class DeploymentRecorder { await table.put(this.record); } } + +function startStatusFor(phase: string | undefined): DeploymentStatus | null { + switch (phase) { + case 'extract': + return 'extracting'; + case 'install': + return 'installing'; + case 'load': + return 'loading'; + case 'replicate': + return 'replicating'; + case 'restart': + return 'restarting'; + default: + return null; + } +} diff --git a/components/operations.js b/components/operations.js index 0f5cb9540..cc43a3e1b 100644 --- a/components/operations.js +++ b/components/operations.js @@ -19,6 +19,7 @@ const { Resources } = require('../resources/Resources.ts'); const { Application, prepareApplication } = require('./Application.ts'); const { server } = require('../server/Server.ts'); const { DeploymentRecorder } = require('./deploymentRecorder.ts'); +const { ProgressEmitter } = require('../server/serverHelpers/progressEmitter.ts'); /** * Read the settings.js file and return the @@ -388,13 +389,17 @@ async function deployComponent(req) { // Slice A of issue #641: create a hdb_deployment row up front so the deploy is // observable and auditable even if the CLI disconnects. The row also holds the payload - // in a Blob attribute — Slice B will use that for peer delivery; for now it's the - // audit record and the rollback source. + // in a Blob attribute — Slice B uses it as the rollback source. // // Only the origin node records — peers receiving a replicated deploy_component skip // recording so we don't accumulate one row per node for the same deploy. The row will // reach peers via the table's replication once Slice B has them consume it. const isReplicatedExecution = typeof req._deploymentId === 'string'; + // Slice B1 of #641: an SSE-bound caller already attached a ProgressEmitter (created in + // the server handler so it can also drive the response stream). Reuse it; otherwise + // spin up a fresh emitter so the recorder still gets phase events for non-SSE deploys. + const emitter = isReplicatedExecution ? null : (req.progress ?? new ProgressEmitter()); + if (emitter && !req.progress) req.progress = emitter; const recorder = isReplicatedExecution ? null : await DeploymentRecorder.create({ @@ -402,9 +407,12 @@ async function deployComponent(req) { package_identifier: req.package ?? null, user: req.hdb_user?.username, restart_mode: req.restart === 'rolling' ? 'rolling' : req.restart ? 'immediate' : null, + emitter, }); if (recorder) req._deploymentId = recorder.deploymentId; + const emit = (event, data) => emitter?.emit(event, data); + let extractionPayload = req.payload; try { // On the origin, tee the tarball (Buffer or Readable from the multipart parser) @@ -428,7 +436,9 @@ async function deployComponent(req) { }, }); + emit('phase', { phase: 'prepare', status: 'start' }); await prepareApplication(application); + emit('phase', { phase: 'prepare', status: 'done' }); // now we attempt to actually load the component in case there is // an error we can immediately detect and report, but app code should not run on the main thread @@ -439,6 +449,7 @@ async function deployComponent(req) { const componentLoader = require('./componentLoader.ts').default || require('./componentLoader.ts'); let lastError; componentLoader.setErrorReporter((error) => (lastError = error)); + emit('phase', { phase: 'load', status: 'start' }); await componentLoader.loadComponent( application.dirPath, pseudoResources, @@ -448,23 +459,34 @@ async function deployComponent(req) { false, req.project ); + emit('phase', { phase: 'load', status: 'done' }); if (lastError) throw lastError; } const rollingRestart = req.restart === 'rolling'; // if doing a rolling restart set restart to false so that other nodes don't also restart. req.restart = rollingRestart ? false : req.restart; + // ProgressEmitter holds function listeners that can't survive the replication + // channel's serialization, and the recorder is local to origin anyway. Strip both + // before sending so peers see a clean req. + delete req.progress; + emit('phase', { phase: 'replicate', status: 'start' }); let response = await server.replication.replicateOperation(req); + emit('phase', { phase: 'replicate', status: 'done' }); if (req.restart === true) { + emit('phase', { phase: 'restart', status: 'start' }); manageThreads.restartWorkers('http'); + emit('phase', { phase: 'restart', status: 'done' }); response.message = `Successfully deployed: ${application.name}, restarting Harper`; } else if (rollingRestart) { const serverUtilities = require('../server/serverHelpers/serverUtilities.ts'); + emit('phase', { phase: 'restart', status: 'start' }); const jobResponse = await serverUtilities.executeJob({ operation: 'restart_service', service: 'http', replicated: true, }); + emit('phase', { phase: 'restart', status: 'done' }); response.restartJobId = jobResponse.job_id; response.message = `Successfully deployed: ${application.name}, restarting Harper`; @@ -472,10 +494,16 @@ async function deployComponent(req) { if (recorder) { response.deployment_id = recorder.deploymentId; + emit('phase', { phase: 'success', status: 'done' }); await recorder.finish('success'); } return response; } catch (err) { + emit('error', { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + phase: recorder?.row.phase, + }); if (recorder) await recorder.finish('failed', err); throw err; } diff --git a/integrationTests/deploy/deploy-tracking-events.test.ts b/integrationTests/deploy/deploy-tracking-events.test.ts new file mode 100644 index 000000000..54e00a4d2 --- /dev/null +++ b/integrationTests/deploy/deploy-tracking-events.test.ts @@ -0,0 +1,209 @@ +/** + * Deployment tracking — event_log and SSE replay/tail (Slice B1 of issue #641). + * + * Builds on the Slice A audit-record tests by exercising the new ProgressEmitter → + * DeploymentRecorder integration: every successful deploy should populate event_log with + * the lifecycle phases, and `get_deployment` with `Accept: text/event-stream` should + * replay those events to the client and close cleanly for a terminal deploy. + */ +import { suite, test, before, after } from 'node:test'; +import { ok, strictEqual } from 'node:assert/strict'; +import { join } from 'node:path'; +import { mkdtempSync, mkdirSync, writeFileSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { request } from 'node:http'; +import { Readable } from 'node:stream'; + +import { startHarper, teardownHarper, type ContextWithHarper } from '@harperfast/integration-testing'; + +import { streamPackagedDirectory } from '../../dist/components/packageComponent.js'; +import { buildMultipartBody } from '../../dist/bin/multipartBuilder.js'; + +function postMultipart( + url: URL, + contentType: string, + body: Readable, + auth: { username: string; password: string } +): Promise<{ status: number; body: string }> { + return new Promise((resolve, reject) => { + const req = request( + { + protocol: url.protocol, + hostname: url.hostname, + port: url.port, + method: 'POST', + path: '/', + headers: { + 'Content-Type': contentType, + 'Transfer-Encoding': 'chunked', + 'Authorization': 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64'), + }, + }, + (res) => { + res.setEncoding('utf8'); + let buf = ''; + res.on('data', (chunk) => (buf += chunk)); + res.on('end', () => resolve({ status: res.statusCode ?? 0, body: buf })); + } + ); + req.on('error', reject); + body.on('error', reject); + body.pipe(req); + }); +} + +async function callOperation( + ctx: ContextWithHarper, + op: Record, + headers: Record = {} +): Promise<{ status: number; body: any; rawText: string; contentType: string }> { + const url = new URL(ctx.harper.operationsAPIURL); + const auth = 'Basic ' + Buffer.from(`${ctx.harper.admin.username}:${ctx.harper.admin.password}`).toString('base64'); + const res = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Authorization': auth, ...headers }, + body: JSON.stringify(op), + }); + const text = await res.text(); + let parsed: any = text; + try { + parsed = JSON.parse(text); + } catch { + // SSE responses are line-oriented text; the caller will parse them. + } + return { status: res.status, body: parsed, rawText: text, contentType: res.headers.get('content-type') ?? '' }; +} + +suite('Deployment tracking — events + SSE', (ctx: ContextWithHarper) => { + let fixtureDir: string; + let deploymentId: string; + + before(async () => { + await startHarper(ctx); + fixtureDir = mkdtempSync(join(tmpdir(), 'deploy-events-fixture-')); + writeFileSync( + join(fixtureDir, 'config.yaml'), + 'static:\n files: web\ngraphqlSchema:\n files: schema.graphql\nrest: true\n' + ); + writeFileSync(join(fixtureDir, 'schema.graphql'), 'type Query { hello: String }\n'); + mkdirSync(join(fixtureDir, 'web'), { recursive: true }); + writeFileSync(join(fixtureDir, 'web', 'index.html'), '

Hello, Events!

'); + }); + + after(async () => { + try { + rmSync(fixtureDir, { recursive: true, force: true }); + } catch { + // best-effort + } + await teardownHarper(ctx); + }); + + test('verify Harper', async () => { + const response = await fetch(`${ctx.harper.operationsAPIURL}/health`); + strictEqual(response.status, 200); + }); + + test('successful deploy populates event_log with the lifecycle phases', async () => { + const project = 'events-test-application'; + const multipart = buildMultipartBody( + { operation: 'deploy_component', project, restart: false }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(fixtureDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + const response = await postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + strictEqual(response.status, 200, `expected 200, got ${response.status}: ${response.body}`); + const result = JSON.parse(response.body); + deploymentId = result.deployment_id; + ok(deploymentId, 'deploy response should include deployment_id'); + + // Coalesced writes settle on a microtask boundary; give them a beat. + await sleep(200); + + const got = await callOperation(ctx, { operation: 'get_deployment', deployment_id: deploymentId }); + strictEqual(got.status, 200); + const row = got.body; + ok(Array.isArray(row.event_log), 'event_log should be an array'); + ok(row.event_log.length >= 2, `expected at least 2 events, got ${row.event_log.length}`); + const phases = row.event_log.filter((e: any) => e.event === 'phase').map((e: any) => e.data?.phase); + // We emit prepare → (load) → replicate → success in the lifecycle. Verify the spine. + ok(phases.includes('prepare'), `event_log should include a prepare phase: ${phases.join(',')}`); + ok(phases.includes('replicate'), `event_log should include a replicate phase: ${phases.join(',')}`); + }); + + test('get_deployment with Accept: text/event-stream replays event_log and closes cleanly', async () => { + // Already terminal at this point — Slice B1's SSE branch should replay event_log + // then return the final record as the `done` event. + const got = await callOperation( + ctx, + { operation: 'get_deployment', deployment_id: deploymentId }, + { Accept: 'text/event-stream' } + ); + strictEqual(got.status, 200, `expected 200, got ${got.status}: ${got.rawText}`); + ok(got.contentType.startsWith('text/event-stream'), `expected SSE content-type, got: ${got.contentType}`); + + const text = got.rawText; + // Each SSE record is separated by a blank line. Count phase events. + const records = text.split(/\r?\n\r?\n/).filter((r) => r.includes('event:')); + ok( + records.length >= 2, + `expected at least 2 SSE records (events + done), got ${records.length}.\nraw SSE:\n${text}` + ); + ok( + records.some((r) => r.includes('event: phase')), + `expected at least one phase event in SSE replay.\nraw SSE:\n${text}` + ); + ok( + records.some((r) => r.includes('event: done')), + `expected a final done event in SSE replay.\nraw SSE:\n${text}` + ); + }); + + test('failed deploy event_log captures the error event', async () => { + const project = 'broken-events-application'; + const brokenDir = mkdtempSync(join(tmpdir(), 'broken-events-fixture-')); + try { + writeFileSync( + join(brokenDir, 'package.json'), + JSON.stringify({ name: project, version: '0.0.0', dependencies: {} }) + ); + writeFileSync(join(brokenDir, 'config.yaml'), 'rest: true\n'); + const multipart = buildMultipartBody( + { + operation: 'deploy_component', + project, + restart: false, + install_command: 'sh -c "exit 1"', + install_timeout: 30_000, + }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(brokenDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + const response = await postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + ok(response.status >= 400, `expected an error response, got ${response.status}`); + + await sleep(200); + const listed = await callOperation(ctx, { operation: 'list_deployments', project }); + const failed = listed.body.deployments.find((d: any) => d.project === project); + ok(failed, 'expected to find a failed deployment row'); + strictEqual(failed.status, 'failed'); + const errorEvents = (failed.event_log ?? []).filter((e: any) => e.event === 'error'); + ok(errorEvents.length >= 1, 'expected at least one error event in event_log'); + } finally { + try { + rmSync(brokenDir, { recursive: true, force: true }); + } catch {} + } + }); +}); diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts new file mode 100644 index 000000000..e77b60a8e --- /dev/null +++ b/server/serverHelpers/progressEmitter.ts @@ -0,0 +1,104 @@ +import { PassThrough, Readable } from 'node:stream'; + +export interface ProgressEvent { + event: string; + data: unknown; +} + +export type ProgressListener = (event: ProgressEvent) => void; + +/** + * Lightweight pub-sub used to report phase/install/replicate events from a long-running + * operation back to the HTTP layer. We deliberately don't use Node's EventEmitter here: + * we only need broadcast semantics for a small set of event types, and we want the + * `emit(event, data)` shape that matches the SSE wire format directly. + */ +export class ProgressEmitter { + private listeners: ProgressListener[] = []; + + emit(event: string, data: unknown): void { + // Snapshot before iteration so a listener that unsubscribes itself during dispatch + // doesn't shift indexes underneath us. + const snapshot = this.listeners.slice(); + for (const listener of snapshot) { + try { + listener({ event, data }); + } catch { + // A buggy listener must never break the operation. Swallow and continue. + } + } + } + + subscribe(listener: ProgressListener): () => void { + this.listeners.push(listener); + return () => { + const i = this.listeners.indexOf(listener); + if (i !== -1) this.listeners.splice(i, 1); + }; + } +} + +/** + * Wrap a long-running operation so its progress events stream back as Server-Sent Events. + * + * The returned Readable emits one SSE message per `emitter.emit(...)` call, then a final + * `done` (or `error`) event with the operation's result, then ends. The caller is + * expected to set Content-Type: text/event-stream on the response. + */ +export function createSSEResponseStream(emitter: ProgressEmitter, operation: () => Promise): Readable { + const stream = new PassThrough(); + // Prime the stream with an SSE comment line so the response body is non-empty by the time + // Fastify starts piping. Without this, Fastify buffers the PassThrough's internal queue + // until its end and only flushes the final chunk to the wire — making intermediate + // progress events invisible to the client. The comment ": ..." is a valid SSE record + // that consumers ignore, so it's safe filler. + stream.write(`: stream open\n\n`); + + let active = true; + const unsubscribe = emitter.subscribe((event) => { + if (active) writeSSE(stream, event); + }); + + const cleanup = () => { + if (active) { + active = false; + unsubscribe(); + } + }; + + // If the client disconnects (Ctrl-C, network drop) stop writing to the stream and + // release the emitter subscription so it doesn't accumulate for the operation lifetime. + stream.on('close', cleanup); + stream.on('end', cleanup); + + operation() + .then((result) => { + if (active) writeSSE(stream, { event: 'done', data: { result } }); + }) + .catch((err) => { + if (active) { + writeSSE(stream, { + event: 'error', + data: { + message: err?.message ?? String(err), + code: err?.statusCode ?? err?.code, + }, + }); + } + }) + .finally(() => { + cleanup(); + stream.end(); + }); + + return stream; +} + +function writeSSE(stream: PassThrough, event: ProgressEvent): void { + const data = typeof event.data === 'string' ? event.data : JSON.stringify(event.data); + stream.write(`event: ${event.event}\n`); + for (const line of data.split(/\r?\n/)) { + stream.write(`data: ${line}\n`); + } + stream.write('\n'); +} diff --git a/server/serverHelpers/serverHandlers.js b/server/serverHelpers/serverHandlers.js index 692c9fbc4..60160629e 100644 --- a/server/serverHelpers/serverHandlers.js +++ b/server/serverHelpers/serverHandlers.js @@ -15,6 +15,13 @@ const pAuthorize = util.promisify(auth.authorize); const serverUtilities = require('./serverUtilities.ts'); const { applyImpersonation } = require('../../security/impersonation.ts'); const { createGzip, constants } = require('zlib'); +const { ProgressEmitter, createSSEResponseStream } = require('./progressEmitter.ts'); + +// Operations that support `Accept: text/event-stream` for live progress streaming. The +// handler attaches a ProgressEmitter as req.body.progress so the operation can emit phase +// events; the response body is the SSE-encoded emitter output. Non-SSE clients see the +// historical single-response shape because progress is undefined on that path. +const SSE_PROGRESS_OPERATIONS = new Set([terms.OPERATIONS_ENUM.DEPLOY_COMPONENT, terms.OPERATIONS_ENUM.GET_DEPLOYMENT]); const NO_AUTH_OPERATIONS = [ terms.OPERATIONS_ENUM.CREATE_AUTHENTICATION_TOKENS, @@ -119,6 +126,25 @@ async function handlePostRequest(req, res, _bypassAuth = false) { if (req.body.bypass_auth) delete req.body.bypass_auth; operation_function = serverUtilities.chooseOperation(req.body); + + // SSE progress branch — when the client asks for `text/event-stream` on an operation + // that supports it, run the operation in the background and stream events back as they + // happen. The progress emitter is attached to req.body so the operation handler can + // emit without changing its return signature; non-SSE callers leave `progress` + // undefined and the handler stays on its synchronous result path. + // Optional-chained `req.headers` because unit tests dispatch through this path with + // synthetic req shapes that don't set headers; production Fastify requests always do. + // `Accept` parsing only checks for the text/event-stream token; allow comma-separated + // values like `text/event-stream, application/json` and quality params per RFC 7231. + if (req.headers?.accept?.includes('text/event-stream') && SSE_PROGRESS_OPERATIONS.has(req.body.operation)) { + const emitter = new ProgressEmitter(); + req.body.progress = emitter; + res.header('Content-Type', 'text/event-stream'); + res.header('Cache-Control', 'no-cache'); + res.header('X-Accel-Buffering', 'no'); // disable proxy buffering so events flush in real time + return createSSEResponseStream(emitter, () => serverUtilities.processLocalTransaction(req, operation_function)); + } + let result = await serverUtilities.processLocalTransaction(req, operation_function); if (result instanceof Readable && result.headers) { for (let [name, value] of result.headers) { diff --git a/unitTests/bin/deployRenderer.test.js b/unitTests/bin/deployRenderer.test.js new file mode 100644 index 000000000..9ab450b95 --- /dev/null +++ b/unitTests/bin/deployRenderer.test.js @@ -0,0 +1,108 @@ +'use strict'; + +const assert = require('node:assert'); +const { PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { DeployRenderer } = require('#src/bin/deployRenderer'); + +function makeOutput() { + const lines = []; + const stream = { write: (s) => lines.push(s), isTTY: false }; + return { stream, lines }; +} + +describe('DeployRenderer', () => { + describe('upload progress (non-TTY)', () => { + it('advances bytes as data flows through the tap and logs at 10% steps', async () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 1_000_000, output: stream }); + const source = new PassThrough(); + const tap = renderer.tapUploadStream(source); + // Drain into a sink so backpressure doesn't pause the tap. + const sink = new PassThrough(); + tap.pipe(sink); + sink.on('data', () => {}); + // Write 5×200_000 byte chunks → 1MB total, crossing each 10% threshold. + for (let i = 0; i < 5; i++) source.write(Buffer.alloc(200_000)); + source.end(); + await new Promise((resolve) => sink.on('end', resolve)); + renderer.endUpload(); + // Expect at least one per-10% progress line plus the final "Upload complete" line. + const progressLines = lines.filter((l) => /Uploaded /.test(l)); + assert.ok( + progressLines.length >= 4, + `expected multiple progress lines, got ${progressLines.length}: ${progressLines.join('|')}` + ); + assert.ok( + lines.some((l) => l.startsWith('Upload complete')), + 'should log Upload complete on endUpload' + ); + }); + + it('endUpload is idempotent', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ uploadTotal: 100, output: stream }); + renderer.tapUploadStream(new PassThrough()); + renderer.endUpload(); + renderer.endUpload(); + const completeLines = lines.filter((l) => l.startsWith('Upload complete')); + assert.strictEqual(completeLines.length, 1); + }); + }); + + describe('renderEvent', () => { + function sseMessage(event, data) { + return { event, data: JSON.stringify(data) }; + } + + it('prints a phase line on start, then on done (no duplicate start)', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'start' })); + renderer.renderEvent(sseMessage('phase', { phase: 'extract', status: 'done' })); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('forwards install stdout/stderr lines and headers the manager name once', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'start' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'added 42 packages' })); + renderer.renderEvent(sseMessage('install', { manager: 'npm', stream: 'stdout', line: 'audited 100 packages' })); + renderer.renderEvent( + sseMessage('install', { manager: 'npm', stream: 'stderr', line: 'npm warn deprecated foo' }) + ); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'done' })); + assert.ok(lines.includes('install: using npm\n'), 'expected manager header'); + assert.strictEqual( + lines.filter((l) => l === 'install: using npm\n').length, + 1, + 'manager header logged only once' + ); + assert.ok(lines.includes(' | added 42 packages\n')); + assert.ok(lines.includes(' | audited 100 packages\n')); + assert.ok(lines.includes(' ! npm warn deprecated foo\n'), 'stderr should be tagged with !'); + assert.ok( + lines.some((l) => /install done \(3 log lines\)/.test(l)), + 'install-done should count emitted lines' + ); + }); + + it('renders an error event with code suffix', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('error', { message: 'npm install failed', code: 500 })); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); + + it('renders a phase-error with the message', () => { + const { stream, lines } = makeOutput(); + const renderer = new DeployRenderer({ output: stream }); + renderer.renderEvent(sseMessage('phase', { phase: 'install', status: 'error', message: 'exit code 1' })); + assert.match(lines[0], /install ERROR: exit code 1/); + }); + }); +}); diff --git a/unitTests/bin/sseConsumer.test.js b/unitTests/bin/sseConsumer.test.js new file mode 100644 index 000000000..2bc1c25a2 --- /dev/null +++ b/unitTests/bin/sseConsumer.test.js @@ -0,0 +1,95 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable, PassThrough } = require('node:stream'); +const testUtils = require('../testUtils.js'); +testUtils.preTestPrep(); + +const { parseSSE, renderDeployProgress } = require('#src/bin/sseConsumer'); + +async function collectMessages(stream) { + const out = []; + for await (const msg of parseSSE(stream)) out.push(msg); + return out; +} + +describe('parseSSE', () => { + it('parses a single record', async () => { + const stream = Readable.from(['event: phase\ndata: {"phase":"extract"}\n\n']); + const msgs = await collectMessages(stream); + assert.deepStrictEqual(msgs, [{ event: 'phase', data: '{"phase":"extract"}', id: undefined, retry: undefined }]); + }); + + it('joins multi-line data fields with \\n per the SSE spec', async () => { + const stream = Readable.from(['event: install\ndata: line1\ndata: line2\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].data, 'line1\nline2'); + }); + + it('handles records split across multiple chunks', async () => { + const stream = new PassThrough(); + const collector = collectMessages(stream); + stream.write('event: phase\n'); + stream.write('data: {"phase":"extract","sta'); + stream.write('tus":"start"}\n\n'); + stream.write('event: phase\ndata: {"phase":"extract","status":"done"}\n\n'); + stream.end(); + const msgs = await collector; + assert.strictEqual(msgs.length, 2); + assert.strictEqual(JSON.parse(msgs[0].data).status, 'start'); + assert.strictEqual(JSON.parse(msgs[1].data).status, 'done'); + }); + + it('handles CRLF line endings', async () => { + const stream = Readable.from(['event: phase\r\ndata: {"x":1}\r\n\r\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(msgs[0].data), { x: 1 }); + }); + + it('strips a single leading space after the colon, per spec', async () => { + const stream = Readable.from(['data: hello\n\n']); + const msgs = await collectMessages(stream); + // only ONE space is stripped, the second remains + assert.strictEqual(msgs[0].data, ' hello'); + }); + + it('ignores comment lines (leading colon)', async () => { + const stream = Readable.from([': heartbeat\nevent: ping\ndata: ok\n\n']); + const msgs = await collectMessages(stream); + assert.strictEqual(msgs[0].event, 'ping'); + assert.strictEqual(msgs[0].data, 'ok'); + }); +}); + +describe('renderDeployProgress', () => { + it('prints a phase line on start and another on done', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }, state, out); + renderDeployProgress({ event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'done' }) }, state, out); + assert.deepStrictEqual(lines, ['extract…\n', 'extract done\n']); + }); + + it('does not repeat the same phase-start line', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + const state = {}; + const msg = { event: 'phase', data: JSON.stringify({ phase: 'extract', status: 'start' }) }; + renderDeployProgress(msg, state, out); + renderDeployProgress(msg, state, out); + assert.strictEqual(lines.length, 1); + }); + + it('prints an error line with the message', () => { + const lines = []; + const out = { write: (s) => lines.push(s) }; + renderDeployProgress( + { event: 'error', data: JSON.stringify({ message: 'npm install failed', code: 500 }) }, + {}, + out + ); + assert.match(lines[0], /error: npm install failed \(500\)/); + }); +}); diff --git a/unitTests/server/serverHelpers/progressEmitter.test.js b/unitTests/server/serverHelpers/progressEmitter.test.js new file mode 100644 index 000000000..774048184 --- /dev/null +++ b/unitTests/server/serverHelpers/progressEmitter.test.js @@ -0,0 +1,125 @@ +'use strict'; + +const assert = require('node:assert'); +const { Readable } = require('node:stream'); +const testUtils = require('../../testUtils.js'); +testUtils.preTestPrep(); + +const { ProgressEmitter, createSSEResponseStream } = require('#src/server/serverHelpers/progressEmitter'); + +function collect(stream) { + return new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (c) => chunks.push(c)); + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + stream.on('error', reject); + }); +} + +function parseSSEBlocks(text) { + return ( + text + .split('\n\n') + .filter((block) => block.trim().length > 0) + .map((block) => { + const out = {}; + for (const line of block.split('\n')) { + const colon = line.indexOf(':'); + if (colon === -1) continue; + const field = line.slice(0, colon); + let value = line.slice(colon + 1); + if (value.startsWith(' ')) value = value.slice(1); + out[field] = value; + } + return out; + }) + // Drop SSE comment records (lines starting with `:`) — they're protocol-level + // liveness hints that the stream is open, not application events. + .filter((rec) => 'event' in rec || 'data' in rec) + ); +} + +describe('ProgressEmitter', () => { + it('delivers events to every subscriber', () => { + const emitter = new ProgressEmitter(); + const a = []; + const b = []; + emitter.subscribe((e) => a.push(e)); + emitter.subscribe((e) => b.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.deepStrictEqual(a, [ + { event: 'phase', data: { phase: 'extract', status: 'start' } }, + { event: 'phase', data: { phase: 'extract', status: 'done' } }, + ]); + assert.deepStrictEqual(b, a); + }); + + it('unsubscribe stops further delivery', () => { + const emitter = new ProgressEmitter(); + const received = []; + const unsubscribe = emitter.subscribe((e) => received.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + unsubscribe(); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + assert.strictEqual(received.length, 1); + }); + + it('swallows listener exceptions so operations are never broken by a buggy subscriber', () => { + const emitter = new ProgressEmitter(); + emitter.subscribe(() => { + throw new Error('listener boom'); + }); + const ok = []; + emitter.subscribe((e) => ok.push(e)); + emitter.emit('phase', { phase: 'extract', status: 'start' }); + assert.strictEqual(ok.length, 1); + }); +}); + +describe('createSSEResponseStream', () => { + it('streams emitter events then a terminating `done` event with the operation result', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + await new Promise((r) => setImmediate(r)); + emitter.emit('phase', { phase: 'extract', status: 'done' }); + return { message: 'Successfully deployed: demo' }; + }); + const text = await collect(stream); + const events = parseSSEBlocks(text); + assert.strictEqual(events.length, 3); + assert.strictEqual(events[0].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[0].data), { phase: 'extract', status: 'start' }); + assert.strictEqual(events[1].event, 'phase'); + assert.deepStrictEqual(JSON.parse(events[1].data), { phase: 'extract', status: 'done' }); + assert.strictEqual(events[2].event, 'done'); + assert.deepStrictEqual(JSON.parse(events[2].data), { result: { message: 'Successfully deployed: demo' } }); + }); + + it('streams an `error` event when the operation rejects', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + emitter.emit('phase', { phase: 'extract', status: 'start' }); + const err = new Error('boom'); + err.statusCode = 500; + throw err; + }); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events[events.length - 1].event, 'error'); + const errData = JSON.parse(events[events.length - 1].data); + assert.strictEqual(errData.message, 'boom'); + assert.strictEqual(errData.code, 500); + }); + + it('still closes the stream cleanly when the operation emits nothing', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => ({ ok: true })); + const events = parseSSEBlocks(await collect(stream)); + assert.strictEqual(events.length, 1); + assert.strictEqual(events[0].event, 'done'); + }); +}); + +// keep Readable import live for any future tests that need stream sources +void Readable; diff --git a/utility/common_utils.ts b/utility/common_utils.ts index 8b9b14819..26b5e8dd1 100644 --- a/utility/common_utils.ts +++ b/utility/common_utils.ts @@ -756,8 +756,24 @@ export function httpRequest(options: any, data: any): Promise { const req = client.request(options, (response: http.IncomingMessage & { body?: string }) => { + if (streamResponse) { + // Hand the raw stream to the caller; do not setEncoding so binary-safe consumers + // (or SSE parsers that prefer Buffers) still work. + resolve(response); + return; + } response.setEncoding('utf8'); response.body = ''; response.on('data', (chunk) => { From e46ecea9c87a730ae5173e14f319aa1bec1a4d96 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 20 May 2026 22:03:28 -0600 Subject: [PATCH 05/10] fix(deploy): suppress duplicate SSE error event; add live-tail integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When operations.js emits an `error` event through the ProgressEmitter before throwing, createSSEResponseStream's .catch handler was writing a second error SSE record — dropping the phase context from the first. Fix: track whether the subscriber already forwarded an error event and skip the framework fallback if so. Adds a unit test for the dedup behavior and an integration test covering the live-tail SSE branch (liveEmitter && !TERMINAL_STATUSES) which was previously unexercised — the new test opens get_deployment SSE against an in-flight deploy using a sleep 3 install command. Co-Authored-By: Claude Sonnet 4.6 --- .../deploy/deploy-tracking-events.test.ts | 69 +++++++++++++++++++ server/serverHelpers/progressEmitter.ts | 10 ++- .../serverHelpers/progressEmitter.test.js | 15 ++++ 3 files changed, 92 insertions(+), 2 deletions(-) diff --git a/integrationTests/deploy/deploy-tracking-events.test.ts b/integrationTests/deploy/deploy-tracking-events.test.ts index 54e00a4d2..eb0446319 100644 --- a/integrationTests/deploy/deploy-tracking-events.test.ts +++ b/integrationTests/deploy/deploy-tracking-events.test.ts @@ -165,6 +165,75 @@ suite('Deployment tracking — events + SSE', (ctx: ContextWithHarper) => { ); }); + test('get_deployment SSE tails live events on an in-flight deploy', async () => { + // This test exercises the live-tail branch: subscribe before the deploy completes, + // then assert that phase events arrive from the emitter (not just the historical replay). + const project = 'live-tail-test-application'; + const liveDir = mkdtempSync(join(tmpdir(), 'live-tail-fixture-')); + try { + writeFileSync(join(liveDir, 'config.yaml'), 'rest: true\n'); + const multipart = buildMultipartBody( + { + operation: 'deploy_component', + project, + restart: false, + // A 3-second sleep gives us a window to attach the SSE tail before the + // deploy reaches a terminal status. The row exists after the multipart + // upload completes (before prepareApplication), so polling finds it quickly. + install_command: 'sleep 3', + install_timeout: 30_000, + }, + { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: streamPackagedDirectory(liveDir, { skip_node_modules: true }), + } + ); + const url = new URL(ctx.harper.operationsAPIURL); + + // Start the deploy without awaiting — the HTTP response comes back only after the + // deploy finishes, so this promise stays pending for ~3+ seconds. + const deployPromise = postMultipart(url, multipart.contentType, multipart.stream, ctx.harper.admin); + + // Poll list_deployments until we see the in-flight row. The recorder row is created + // after the multipart body is fully received (which is fast for a tiny fixture). + const TERMINAL = new Set(['success', 'failed', 'rolled_back']); + let inFlightId: string | null = null; + for (let i = 0; i < 15 && !inFlightId; i++) { + await sleep(300); + const listed = await callOperation(ctx, { operation: 'list_deployments', project }); + const rows: any[] = listed.body?.deployments ?? []; + const inFlight = rows.find((d: any) => !TERMINAL.has(d.status)); + if (inFlight) inFlightId = inFlight.deployment_id; + } + ok(inFlightId, 'expected to find an in-flight deployment row within the polling window'); + + // Open SSE tail while the deploy is still running; fetch blocks until the SSE + // stream closes (when the deploy reaches a terminal status and the server ends it). + const [deployResult, sseResult] = await Promise.all([ + deployPromise, + callOperation(ctx, { operation: 'get_deployment', deployment_id: inFlightId }, { Accept: 'text/event-stream' }), + ]); + + strictEqual(deployResult.status, 200, `deploy should succeed: ${deployResult.body}`); + strictEqual(sseResult.status, 200); + ok(sseResult.contentType.startsWith('text/event-stream')); + + const records = sseResult.rawText.split(/\r?\n\r?\n/).filter((r) => r.includes('event:')); + ok( + records.some((r) => r.includes('event: phase')), + `expected at least one phase event in SSE stream.\nraw SSE:\n${sseResult.rawText}` + ); + ok( + records.some((r) => r.includes('event: done')), + `expected a final done event in SSE stream.\nraw SSE:\n${sseResult.rawText}` + ); + } finally { + rmSync(liveDir, { recursive: true, force: true }); + } + }); + test('failed deploy event_log captures the error event', async () => { const project = 'broken-events-application'; const brokenDir = mkdtempSync(join(tmpdir(), 'broken-events-fixture-')); diff --git a/server/serverHelpers/progressEmitter.ts b/server/serverHelpers/progressEmitter.ts index e77b60a8e..7f8659b1e 100644 --- a/server/serverHelpers/progressEmitter.ts +++ b/server/serverHelpers/progressEmitter.ts @@ -55,8 +55,12 @@ export function createSSEResponseStream(emitter: ProgressEmitter, operation: () stream.write(`: stream open\n\n`); let active = true; + let errorEmitted = false; const unsubscribe = emitter.subscribe((event) => { - if (active) writeSSE(stream, event); + if (active) { + writeSSE(stream, event); + if (event.event === 'error') errorEmitted = true; + } }); const cleanup = () => { @@ -76,7 +80,9 @@ export function createSSEResponseStream(emitter: ProgressEmitter, operation: () if (active) writeSSE(stream, { event: 'done', data: { result } }); }) .catch((err) => { - if (active) { + // Only emit a framework-level error event if the operation itself didn't already + // emit one (with richer context like phase) through the emitter subscriber above. + if (active && !errorEmitted) { writeSSE(stream, { event: 'error', data: { diff --git a/unitTests/server/serverHelpers/progressEmitter.test.js b/unitTests/server/serverHelpers/progressEmitter.test.js index 774048184..d2cffbb14 100644 --- a/unitTests/server/serverHelpers/progressEmitter.test.js +++ b/unitTests/server/serverHelpers/progressEmitter.test.js @@ -112,6 +112,21 @@ describe('createSSEResponseStream', () => { assert.strictEqual(errData.code, 500); }); + it('does not emit a second error event when the operation already emitted one before throwing', async () => { + const emitter = new ProgressEmitter(); + const stream = createSSEResponseStream(emitter, async () => { + // Simulates operations.js: emit a rich error event (with phase context) then throw. + emitter.emit('error', { message: 'install failed', code: 500, phase: 'install' }); + const err = new Error('install failed'); + err.statusCode = 500; + throw err; + }); + const events = parseSSEBlocks(await collect(stream)); + const errorEvents = events.filter((e) => e.event === 'error'); + assert.strictEqual(errorEvents.length, 1, `expected exactly 1 error event, got ${errorEvents.length}`); + assert.deepStrictEqual(JSON.parse(errorEvents[0].data), { message: 'install failed', code: 500, phase: 'install' }); + }); + it('still closes the stream cleanly when the operation emits nothing', async () => { const emitter = new ProgressEmitter(); const stream = createSSEResponseStream(emitter, async () => ({ ok: true })); From 054ae0aa15d56d683365112748b276bd1318dcc2 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 21 May 2026 14:46:19 -0600 Subject: [PATCH 06/10] fix(test): add package.json to live-tail fixture so install_command fires Without a package.json Harper skips install entirely ("no package.json; skipping install"), so the deploy completes in <100ms and the polling loop never catches an in-flight row. The other install-command tests in this file already include package.json for the same reason. Co-Authored-By: Claude Sonnet 4.6 --- integrationTests/deploy/deploy-tracking-events.test.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/integrationTests/deploy/deploy-tracking-events.test.ts b/integrationTests/deploy/deploy-tracking-events.test.ts index eb0446319..8225b60e8 100644 --- a/integrationTests/deploy/deploy-tracking-events.test.ts +++ b/integrationTests/deploy/deploy-tracking-events.test.ts @@ -171,6 +171,13 @@ suite('Deployment tracking — events + SSE', (ctx: ContextWithHarper) => { const project = 'live-tail-test-application'; const liveDir = mkdtempSync(join(tmpdir(), 'live-tail-fixture-')); try { + // package.json is required so installApplication actually runs install_command + // (without it Harper logs "no package.json; skipping install" and the deploy + // completes in <100ms, before the first poll can catch the in-flight row). + writeFileSync( + join(liveDir, 'package.json'), + JSON.stringify({ name: project, version: '0.0.0', dependencies: {} }) + ); writeFileSync(join(liveDir, 'config.yaml'), 'rest: true\n'); const multipart = buildMultipartBody( { From 7475dd4cbe7e8c7c0dda457b2843803f228a9a7c Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 22 May 2026 20:01:18 -0600 Subject: [PATCH 07/10] fix(deploy): fix upload progress bar total and remove noisy non-TTY log lines Two bugs in the upload progress display: 1. DeployRenderer was instantiated without uploadTotal, so the bar initialised with total=1 and immediately showed 100% on the first chunk. Fix: pre-walk the project directory with getPackagedDirectorySize() and pass the uncompressed size as the total. The bar moves as gzipped bytes are sent and endUpload() snaps it to 100% on completion. 2. The non-TTY path emitted a log line every 5 MiB ("Uploaded 5.0 MiB", "Uploaded 10.0 MiB", ...) during the upload, cluttering CI logs. Fix: remove intermediate lines entirely; endUpload() now prints one "Uploaded X MiB" line on completion. Bar format updated to show human-readable bytes via cli-progress payload tokens ({value_fmt} / ~{total_fmt}) instead of raw byte counts. Co-Authored-By: Claude Sonnet 4.6 --- bin/cliOperations.ts | 16 +++++++---- bin/deployRenderer.ts | 40 ++++++++++++-------------- components/packageComponent.ts | 42 ++++++++++++++++++++++++++++ unitTests/bin/deployRenderer.test.js | 19 ++++--------- 4 files changed, 76 insertions(+), 41 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index a13b1fc30..f71539104 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -9,7 +9,7 @@ import { httpRequest } from '../utility/common_utils.ts'; import * as path from 'path'; import * as fs from 'fs-extra'; import * as YAML from 'yaml'; -import { streamPackagedDirectory } from '../components/packageComponent.ts'; +import { streamPackagedDirectory, getPackagedDirectorySize } from '../components/packageComponent.ts'; import { buildMultipartBody } from './multipartBuilder.ts'; import { parseSSE } from './sseConsumer.ts'; import { DeployRenderer } from './deployRenderer.ts'; @@ -46,13 +46,17 @@ const PREPARE_OPERATION: any = { const projectPath = process.cwd(); if (!req.project) req.project = path.basename(projectPath); + const packageOptions = { + skip_node_modules: req.skip_node_modules !== false, + skip_symlinks: req.skip_symlinks === true, + }; // Stream the tar+gzip directly to the server as the file part of a multipart body. // This bypasses the Node Buffer 2 GB cap that the previous CBOR-encoded path was // subject to, so large components can deploy without materializing in memory. - req._packageStream = streamPackagedDirectory(projectPath, { - skip_node_modules: req.skip_node_modules !== false, - skip_symlinks: req.skip_symlinks === true, - }); + req._packageStream = streamPackagedDirectory(projectPath, packageOptions); + // Pre-walk the directory for an uncompressed-size estimate. The gzipped wire size + // will be smaller, so the bar won't reach 100% on its own — endUpload() snaps it. + req._uploadSizeEstimate = await getPackagedDirectorySize(projectPath, packageOptions); req._multipart = true; }, }; @@ -208,7 +212,7 @@ async function cliOperations(req: any, skipResponseLog = false) { // One renderer owns the (future) upload bar and the SSE event rendering for a // multipart deploy. Created here so the upload-stream tap and the SSE consumer // below share the same instance. - const renderer = req._multipart ? new DeployRenderer({}) : null; + const renderer = req._multipart ? new DeployRenderer({ uploadTotal: req._uploadSizeEstimate ?? 0 }) : null; let body; if (req._multipart) { const packageStream = req._packageStream; diff --git a/bin/deployRenderer.ts b/bin/deployRenderer.ts index cdaf5ca59..e505802c6 100644 --- a/bin/deployRenderer.ts +++ b/bin/deployRenderer.ts @@ -12,7 +12,6 @@ interface RendererOptions { interface UploadState { bar: cliProgress.SingleBar | null; sent: number; - textLastLogged: number; finished: boolean; } @@ -27,8 +26,8 @@ interface PhaseState { * * 1. Local upload — driven by `tapUploadStream`, which wraps the multipart body so we * can update a `cli-progress` bar against the precomputed uncompressed source-tree - * total. In a non-TTY environment (CI logs, redirected output) we fall back to - * periodic text lines so logs stay grep-able. + * total. The bar moves as gzipped bytes are sent and snaps to 100% on completion. + * In a non-TTY environment a single "Uploaded X MiB" line is printed on completion. * * 2. Server-side phases — driven by `renderEvent`, called for each SSE message the * CLI receives from the operations API. Phase events print one-liners; live @@ -39,7 +38,7 @@ interface PhaseState { * it doesn't compete with subsequent prints) before any SSE events render. */ export class DeployRenderer { - private upload: UploadState = { bar: null, sent: 0, textLastLogged: 0, finished: false }; + private upload: UploadState = { bar: null, sent: 0, finished: false }; private phase: PhaseState = { installLineCount: 0 }; private output: NodeJS.WritableStream; private isTty: boolean; @@ -61,17 +60,22 @@ export class DeployRenderer { this.upload.bar = this.isTty ? new cliProgress.SingleBar( { - format: 'Uploading [{bar}] {percentage}% | {value}/{total} bytes', + // {value_fmt} and {total_fmt} are payload tokens updated in tickUpload/endUpload. + // uploadTotal is the uncompressed source size; gzip output is smaller so the + // bar won't naturally reach 100% — endUpload() snaps it on completion. + format: 'Uploading [{bar}] {percentage}% | {value_fmt} / ~{total_fmt}', barCompleteChar: '█', barIncompleteChar: '░', hideCursor: true, stream: this.output, - etaBuffer: 50, }, cliProgress.Presets.shades_classic ) : null; - this.upload.bar?.start(this.uploadTotal || 1, 0); + this.upload.bar?.start(this.uploadTotal || 1, 0, { + value_fmt: formatBytes(0), + total_fmt: formatBytes(this.uploadTotal), + }); const counter = new Transform({ transform: (chunk, _enc, cb) => { @@ -95,30 +99,22 @@ export class DeployRenderer { if (this.upload.bar) { // Snap to total so the bar shows 100% even when our uncompressed-total estimate // is slightly off (gzip output is usually smaller than the source tree). - if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal); + const finalPayload = { value_fmt: formatBytes(this.upload.sent), total_fmt: formatBytes(this.uploadTotal) }; + if (this.uploadTotal > 0) this.upload.bar.update(this.uploadTotal, finalPayload); this.upload.bar.stop(); this.upload.bar = null; } else { - this.output.write(`Upload complete (${formatBytes(this.upload.sent)})\n`); + // Non-TTY: single completion line, no intermediate chatter. + this.output.write(`Uploaded ${formatBytes(this.upload.sent)}\n`); } } private tickUpload(): void { + if (this.upload.finished) return; if (this.upload.bar) { - this.upload.bar.update(this.upload.sent); - return; - } - // Non-TTY: log a line every 10% of the total (or every 5MB if total unknown). - const step = this.uploadTotal > 0 ? this.uploadTotal / 10 : 5 * 1024 * 1024; - if (this.upload.sent - this.upload.textLastLogged >= step) { - this.upload.textLastLogged = this.upload.sent; - const pct = this.uploadTotal > 0 ? Math.min(100, Math.floor((this.upload.sent / this.uploadTotal) * 100)) : null; - this.output.write( - pct !== null - ? `Uploaded ${formatBytes(this.upload.sent)} / ~${formatBytes(this.uploadTotal)} (${pct}%)\n` - : `Uploaded ${formatBytes(this.upload.sent)}\n` - ); + this.upload.bar.update(this.upload.sent, { value_fmt: formatBytes(this.upload.sent) }); } + // Non-TTY: no intermediate lines — endUpload() prints the final size on completion. } renderEvent(message: SSEMessage): void { diff --git a/components/packageComponent.ts b/components/packageComponent.ts index 6075e8432..c1b8a0cbc 100644 --- a/components/packageComponent.ts +++ b/components/packageComponent.ts @@ -1,4 +1,5 @@ import { join } from 'path'; +import { stat, readdir } from 'node:fs/promises'; import { Readable } from 'node:stream'; import tar from 'tar-fs'; import { createGzip } from 'node:zlib'; @@ -36,6 +37,47 @@ export function streamPackagedDirectory(directory: string, options: PackageOptio return packStream.pipe(gzip); } +/** + * Walk `directory` and return the total uncompressed size of all files that + * `streamPackagedDirectory` would include with the same options. Used by the + * CLI to give the upload progress bar a realistic total. The uncompressed size + * won't equal the gzipped wire size, but it gives the bar a steady trajectory: + * the bar moves as bytes are sent and snaps to 100% when the upload finishes. + */ +export async function getPackagedDirectorySize( + directory: string, + options: PackageOptions = DEFAULT_OPTIONS +): Promise { + let total = 0; + const walk = async (dir: string): Promise => { + let entries; + try { + entries = await readdir(dir, { withFileTypes: true }); + } catch { + return; // unreadable directory — skip + } + for (const entry of entries) { + const fullPath = join(dir, entry.name); + if (options.skip_node_modules && (entry.name === 'node_modules' || fullPath.includes(join('cache', 'webpack')))) { + continue; + } + if (entry.isDirectory()) { + await walk(fullPath); + } else { + if (options.skip_symlinks && entry.isSymbolicLink()) continue; + try { + const s = await stat(fullPath); // follows symlinks, matching tar dereference + total += s.size; + } catch { + // inaccessible file — skip + } + } + } + }; + await walk(directory); + return total; +} + /** * Package a directory into a tar+gzip buffer. Retained for callers that need * an in-memory payload (small deploys, tests). For large directories prefer diff --git a/unitTests/bin/deployRenderer.test.js b/unitTests/bin/deployRenderer.test.js index 9ab450b95..3fc5a9c57 100644 --- a/unitTests/bin/deployRenderer.test.js +++ b/unitTests/bin/deployRenderer.test.js @@ -15,7 +15,7 @@ function makeOutput() { describe('DeployRenderer', () => { describe('upload progress (non-TTY)', () => { - it('advances bytes as data flows through the tap and logs at 10% steps', async () => { + it('counts bytes as data flows through the tap and prints a single line on completion', async () => { const { stream, lines } = makeOutput(); const renderer = new DeployRenderer({ uploadTotal: 1_000_000, output: stream }); const source = new PassThrough(); @@ -24,21 +24,14 @@ describe('DeployRenderer', () => { const sink = new PassThrough(); tap.pipe(sink); sink.on('data', () => {}); - // Write 5×200_000 byte chunks → 1MB total, crossing each 10% threshold. + // Write 5×200_000 byte chunks → 1MB total. for (let i = 0; i < 5; i++) source.write(Buffer.alloc(200_000)); source.end(); await new Promise((resolve) => sink.on('end', resolve)); renderer.endUpload(); - // Expect at least one per-10% progress line plus the final "Upload complete" line. - const progressLines = lines.filter((l) => /Uploaded /.test(l)); - assert.ok( - progressLines.length >= 4, - `expected multiple progress lines, got ${progressLines.length}: ${progressLines.join('|')}` - ); - assert.ok( - lines.some((l) => l.startsWith('Upload complete')), - 'should log Upload complete on endUpload' - ); + // No intermediate progress lines — only the final completion line. + assert.strictEqual(lines.length, 1, `expected 1 line, got ${lines.length}: ${lines.join('|')}`); + assert.ok(lines[0].startsWith('Uploaded '), `expected Uploaded line, got: ${lines[0]}`); }); it('endUpload is idempotent', () => { @@ -47,7 +40,7 @@ describe('DeployRenderer', () => { renderer.tapUploadStream(new PassThrough()); renderer.endUpload(); renderer.endUpload(); - const completeLines = lines.filter((l) => l.startsWith('Upload complete')); + const completeLines = lines.filter((l) => l.startsWith('Uploaded ')); assert.strictEqual(completeLines.length, 1); }); }); From 85b681f6b1e15fa7d7c34c491b4146a987255298 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 22 May 2026 20:14:39 -0600 Subject: [PATCH 08/10] fix: remove premature endUpload() that snapped bar to 100% before upload finished MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For SSE deploys, httpRequest resolves (streamResponse: true) when response headers arrive — which is shortly after the multipart file header is seen by busboy, not after all file data is sent. Calling endUpload() at that point snapped the bar to 100% with only ~50 KiB counted. The counter Transform's flush callback in tapUploadStream already calls endUpload() at the correct moment: when every tar.gz chunk has flowed through the counter and into the HTTP request body. Co-Authored-By: Claude Sonnet 4.6 --- bin/cliOperations.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index f71539104..3a468f2e8 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -239,9 +239,10 @@ async function cliOperations(req: any, skipResponseLog = false) { } let response: any = await httpRequest(options, body); - // Upload is done by the time we get the response; tear the bar down before any SSE - // rendering so the bar and event lines don't fight for the same terminal row. - renderer?.endUpload(); + // endUpload() is called from the counter Transform's flush callback in tapUploadStream + // once all multipart bytes have flowed through. For SSE deploys, httpRequest resolves + // when response headers arrive (streamResponse: true), which happens before the full + // upload completes — calling endUpload() here would snap the bar prematurely. let responseData; if (useSse && response.headers['content-type']?.startsWith('text/event-stream')) { From 8be0e692600ba76f198966be5ef63cd4a42e4161 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 22 May 2026 21:01:51 -0600 Subject: [PATCH 09/10] fix: count pre-gzip bytes for upload progress so bar tracks accurately to 100% MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the bar counted gzip-compressed wire bytes against an uncompressed directory-size total, so for a 100 MiB component the bar would only reach ~30% before endUpload() snapped it to 100%. - `streamPackagedDirectory` now accepts an optional `onBytes` callback invoked for each raw tar chunk before gzip; both counter and total are uncompressed. - `DeployRenderer.countUploadBytes(n)` is the new public method for this; the counter Transform's `transform` callback no longer counts (it only exists for the `flush` → `endUpload()` signal). - `cliOperations` defers `streamPackagedDirectory` until after the renderer is created, so the `onBytes` callback can be wired directly without a lazy closure. Co-Authored-By: Claude Sonnet 4.6 --- bin/cliOperations.ts | 33 +++++++++++++++++----------- bin/deployRenderer.ts | 17 ++++++++++++-- components/packageComponent.ts | 15 ++++++++++++- unitTests/bin/deployRenderer.test.js | 18 ++++++++++----- 4 files changed, 61 insertions(+), 22 deletions(-) diff --git a/bin/cliOperations.ts b/bin/cliOperations.ts index 3a468f2e8..676d7e2b7 100644 --- a/bin/cliOperations.ts +++ b/bin/cliOperations.ts @@ -50,12 +50,12 @@ const PREPARE_OPERATION: any = { skip_node_modules: req.skip_node_modules !== false, skip_symlinks: req.skip_symlinks === true, }; - // Stream the tar+gzip directly to the server as the file part of a multipart body. - // This bypasses the Node Buffer 2 GB cap that the previous CBOR-encoded path was - // subject to, so large components can deploy without materializing in memory. - req._packageStream = streamPackagedDirectory(projectPath, packageOptions); - // Pre-walk the directory for an uncompressed-size estimate. The gzipped wire size - // will be smaller, so the bar won't reach 100% on its own — endUpload() snaps it. + // Store path + options for deferred stream creation after the renderer is set up, + // so the pre-gzip onBytes callback can be wired directly to renderer.countUploadBytes. + req._projectPath = projectPath; + req._packageOptions = packageOptions; + // Pre-walk the directory for an uncompressed-size estimate. Both the progress counter + // and this total are in uncompressed units so the bar tracks to 100% naturally. req._uploadSizeEstimate = await getPackagedDirectorySize(projectPath, packageOptions); req._multipart = true; }, @@ -215,18 +215,25 @@ async function cliOperations(req: any, skipResponseLog = false) { const renderer = req._multipart ? new DeployRenderer({ uploadTotal: req._uploadSizeEstimate ?? 0 }) : null; let body; if (req._multipart) { - const packageStream = req._packageStream; + // Create the package stream here — after the renderer exists — so we can pass + // renderer.countUploadBytes as the onBytes callback. Both progress and total are + // uncompressed bytes, so the bar tracks accurately to 100% without premature snapping. + const packageStream = streamPackagedDirectory( + req._projectPath, + req._packageOptions, + renderer ? (n) => renderer.countUploadBytes(n) : undefined + ); const fields = {}; for (const [key, value] of Object.entries(req)) { if (key.startsWith('_') || TRANSPORT_ONLY_FIELDS.has(key)) continue; fields[key] = value; } - const multipart = buildMultipartBody( - fields, - packageStream - ? { name: 'payload', filename: 'package.tar.gz', contentType: 'application/gzip', stream: packageStream } - : undefined - ); + const multipart = buildMultipartBody(fields, { + name: 'payload', + filename: 'package.tar.gz', + contentType: 'application/gzip', + stream: packageStream, + }); options.headers['Content-Type'] = multipart.contentType; // Use chunked transfer-encoding: we don't know the total size up front because the // payload is streamed from `tar.pack` and never fully buffered. diff --git a/bin/deployRenderer.ts b/bin/deployRenderer.ts index e505802c6..91677faf7 100644 --- a/bin/deployRenderer.ts +++ b/bin/deployRenderer.ts @@ -79,8 +79,9 @@ export class DeployRenderer { const counter = new Transform({ transform: (chunk, _enc, cb) => { - this.upload.sent += chunk.length; - this.tickUpload(); + // Bytes are counted externally via countUploadBytes() on the pre-gzip tar + // stream so progress and total are both in uncompressed units. The Transform + // is kept here solely to get the flush callback that signals upload completion. cb(null, chunk); }, flush: (cb) => { @@ -93,6 +94,18 @@ export class DeployRenderer { return counter; } + /** + * Record `n` pre-gzip bytes read from the tar pack stream. Called for each + * raw tar chunk by the `onBytes` callback passed to `streamPackagedDirectory`, + * keeping progress and total in the same (uncompressed) unit so the bar + * tracks smoothly and doesn't terminate far short of 100%. + */ + countUploadBytes(n: number): void { + if (this.upload.finished) return; + this.upload.sent += n; + this.tickUpload(); + } + endUpload(): void { if (this.upload.finished) return; this.upload.finished = true; diff --git a/components/packageComponent.ts b/components/packageComponent.ts index c1b8a0cbc..b481c512f 100644 --- a/components/packageComponent.ts +++ b/components/packageComponent.ts @@ -15,8 +15,16 @@ const DEFAULT_OPTIONS: PackageOptions = { skip_node_modules: false, skip_symlink * Package a directory into a tar+gzip stream. The returned Readable can be * piped directly into an HTTP request body, avoiding the Node.js 2GB Buffer * cap that the buffered variant runs into for large components. + * + * @param onBytes - Optional callback invoked with the byte length of each raw + * tar chunk *before* gzip compression. Useful for tracking upload progress + * against an uncompressed-size total (e.g. from `getPackagedDirectorySize`). */ -export function streamPackagedDirectory(directory: string, options: PackageOptions = DEFAULT_OPTIONS): Readable { +export function streamPackagedDirectory( + directory: string, + options: PackageOptions = DEFAULT_OPTIONS, + onBytes?: (n: number) => void +): Readable { const packStream = tar.pack(directory, { dereference: !options.skip_symlinks, ignore: options.skip_node_modules @@ -34,6 +42,11 @@ export function streamPackagedDirectory(directory: string, options: PackageOptio const gzip = createGzip(); // Propagate pack errors onto the gzip stream so a single consumer can listen packStream.on('error', (err) => gzip.destroy(err)); + if (onBytes) { + // Attaching a 'data' listener after pipe() is safe — the stream is already + // in flowing mode and Node's EventEmitter supports multiple listeners. + packStream.on('data', (chunk: Buffer) => onBytes(chunk.length)); + } return packStream.pipe(gzip); } diff --git a/unitTests/bin/deployRenderer.test.js b/unitTests/bin/deployRenderer.test.js index 3fc5a9c57..0e438f506 100644 --- a/unitTests/bin/deployRenderer.test.js +++ b/unitTests/bin/deployRenderer.test.js @@ -15,23 +15,29 @@ function makeOutput() { describe('DeployRenderer', () => { describe('upload progress (non-TTY)', () => { - it('counts bytes as data flows through the tap and prints a single line on completion', async () => { + it('counts pre-gzip bytes via countUploadBytes and prints a single completion line', async () => { const { stream, lines } = makeOutput(); - const renderer = new DeployRenderer({ uploadTotal: 1_000_000, output: stream }); + // 4 × 262_144 = 1_048_576 bytes = exactly 1 MiB (clean binary unit). + const CHUNK = 262_144; + const renderer = new DeployRenderer({ uploadTotal: 4 * CHUNK, output: stream }); const source = new PassThrough(); const tap = renderer.tapUploadStream(source); // Drain into a sink so backpressure doesn't pause the tap. const sink = new PassThrough(); tap.pipe(sink); sink.on('data', () => {}); - // Write 5×200_000 byte chunks → 1MB total. - for (let i = 0; i < 5; i++) source.write(Buffer.alloc(200_000)); + // Simulate pre-gzip counting via countUploadBytes — in production this is driven + // by the tar pack stream's onBytes callback; the tap Transform no longer counts bytes. + for (let i = 0; i < 4; i++) { + const chunk = Buffer.alloc(CHUNK); + source.write(chunk); + renderer.countUploadBytes(chunk.length); + } source.end(); await new Promise((resolve) => sink.on('end', resolve)); - renderer.endUpload(); // No intermediate progress lines — only the final completion line. assert.strictEqual(lines.length, 1, `expected 1 line, got ${lines.length}: ${lines.join('|')}`); - assert.ok(lines[0].startsWith('Uploaded '), `expected Uploaded line, got: ${lines[0]}`); + assert.match(lines[0], /^Uploaded 1\.0 MiB/, `expected Uploaded 1.0 MiB, got: ${lines[0]}`); }); it('endUpload is idempotent', () => { From 6b040a93736da4d2241e722cc37436473b94954b Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 22 May 2026 21:13:58 -0600 Subject: [PATCH 10/10] Update components/deploymentOperations.ts Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com> --- components/deploymentOperations.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/deploymentOperations.ts b/components/deploymentOperations.ts index 418d1ad33..43f33475f 100644 --- a/components/deploymentOperations.ts +++ b/components/deploymentOperations.ts @@ -86,7 +86,7 @@ export async function handleGetDeployment(req: GetRequest): Promise { // our return as the operation's final SSE event. We replay event_log on connect, then // tail the deployment's live emitter (if it's still running on this node) until it // reaches a terminal status. The final return value becomes the SSE `done` event. - if (req.progress) { + if (req.progress && typeof (req.progress as any).emit === 'function') { const sse = req.progress; const liveEmitter = getActiveEmitter(req.deployment_id);