diff --git a/core b/core
index 274e011a1..e190a4838 160000
--- a/core
+++ b/core
@@ -1 +1 @@
-Subproject commit 274e011a14e63a2e5125891fa49ce496a1a4f24c
+Subproject commit e190a48382df83573db88ef131b1ff3f39957cc6
diff --git a/replication/deployRelay.ts b/replication/deployRelay.ts
new file mode 100644
index 000000000..665a9940c
--- /dev/null
+++ b/replication/deployRelay.ts
@@ -0,0 +1,297 @@
+import { createReadStream } from 'node:fs';
+import { stat } from 'node:fs/promises';
+import { request as httpsRequest } from 'node:https';
+import { request as httpRequest } from 'node:http';
+import { URL } from 'node:url';
+import { buildMultipartBody } from '../core/bin/multipartBuilder.ts';
+import { get } from '../core/utility/environment/environmentManager.js';
+import { CONFIG_PARAMS } from '../core/utility/hdbTerms.ts';
+import * as logger from '../core/utility/logging/harper_logger.js';
+
+/** The subset of a replication node's config that the relay reads to address the peer. */
+interface NodeLike {
+	name?: string;
+	url?: string;
+	host?: string;
+	port?: number;
+	verify_tls?: boolean;
+	rejectUnauthorized?: boolean;
+	/** Test/proxy override for the operations API base URL. */
+	operationsApiUrl?: string;
+}
+
+/** Per-peer relay outcome; a successful result also carries the peer's response fields. */
+interface NodeRelayResult {
+	node: string | undefined;
+	status: 'success' | 'failed';
+	message?: string;
+	reason?: string;
+	statusCode?: number;
+	[key: string]: unknown;
+}
+
+/**
+ * Injectable dependencies — keeps relayDeployToNode unit-testable without mocking ESM
+ * modules. Production callers use the default (real `create_authentication_tokens` over
+ * the replication WS).
+ */
+export interface RelayDeps {
+	mintToken: (node: NodeLike) => Promise<string>;
+}
+
+const defaultDeps: RelayDeps = {
+	mintToken: mintOperationToken,
+};
+
+// CLI/transport-only fields that must never be replayed to a peer. The streaming-deploy
+// origin's `req` carries a few internal flags (e.g. the staged payload path, progress
+// emitter) that have no meaning on the peer side and would only confuse its validation.
+const NON_FORWARDABLE_FIELDS = new Set([
+	'payload', // the Readable is exhausted on the origin; peers receive the file part
+	'progress', // ProgressEmitter is local to the origin
+	'hdb_user', // peer authenticates the request itself; don't forward our identity
+	'fastifyResponse',
+	'baseRequest',
+	'baseResponse',
+]);
+
+/**
+ * Relay a streamed `deploy_component` request to a single peer over direct HTTPS,
+ * bypassing the WebSocket replication frame which can't carry multi-GB payloads.
+ *
+ * Flow:
+ * 1. Mint a short-lived operation token via the existing replication WS connection
+ *    (`create_authentication_tokens` runs against the peer's auth context, so the token
+ *    it returns is scoped to the replication user the peer already trusts).
+ * 2. Re-stream the staged payload file as the file part of a multipart/form-data POST
+ *    to the peer's operations API. The payload is read from disk fresh per relay attempt
+ *    so retries (handled by the caller) get a usable stream.
+ * 3. Parse the JSON response and return per-peer status.
+ *
+ * The peer processes the request as a normal local deploy (with `replicated: false` so it
+ * doesn't fan out further). Failures here are returned as a `failed` result; the caller
+ * decides whether one peer failing aborts the whole deploy (see HarperFast/harper#524's
+ * "per-peer status with retry" semantics).
+ */
+export async function relayDeployToNode(
+	node: NodeLike,
+	req: Record<string, unknown>,
+	payloadPath: string,
+	deps: RelayDeps = defaultDeps
+): Promise<NodeRelayResult> {
+	const fields = buildForwardableFields(req);
+	let payloadSize: number;
+	try {
+		payloadSize = (await stat(payloadPath)).size;
+	} catch (err) {
+		return {
+			node: node.name,
+			status: 'failed',
+			reason: `staged payload missing: ${(err as Error).message}`,
+		};
+	}
+
+	let token: string;
+	try {
+		token = await deps.mintToken(node);
+	} catch (err) {
+		return {
+			node: node.name,
+			status: 'failed',
+			reason: `token mint failed: ${(err as Error).message ?? String(err)}`,
+		};
+	}
+
+	// Resolve the target and open the payload stream inside the try so that every failure
+	// mode (unresolvable hostname, malformed override URL, transport error) is reported as
+	// a per-peer `failed` result instead of a rejected promise.
+	let payloadStream: ReturnType<typeof createReadStream> | undefined;
+	try {
+		const target = resolveOperationsApiUrl(node);
+		payloadStream = createReadStream(payloadPath);
+		const multipart = buildMultipartBody(fields, {
+			name: 'payload',
+			filename: 'package.tar.gz',
+			contentType: 'application/gzip',
+			stream: payloadStream,
+		});
+		const response = await sendMultipart(target, token, multipart, node, payloadSize);
+		// Spread the peer's body first: `node` and `status` are owned by the relay and must
+		// not be clobbered by same-named keys in the peer's JSON.
+		return { ...response, node: node.name, status: 'success' };
+	} catch (err: any) {
+		return {
+			node: node.name,
+			status: 'failed',
+			reason: err?.message ?? String(err),
+			statusCode: err?.statusCode,
+		};
+	} finally {
+		// `pipe()` does not destroy its source when the destination fails, so close the
+		// file handle explicitly. Destroying an already-finished stream is a no-op.
+		payloadStream?.destroy();
+	}
+}
+
+/** Default `mintToken`: asks the peer for an operation token via `sendOperationToNode`. */
+async function mintOperationToken(node: NodeLike): Promise<string> {
+	// `create_authentication_tokens` against an already-authenticated peer connection
+	// returns a token tied to the connection's user (no username/password needed).
+	// Dynamic-import to avoid pulling the full replicator (and its transitive deps) into
+	// every consumer of this module — production code that calls relayDeployToNode has the
+	// replicator loaded anyway, so the cost is paid once and cached.
+	const { sendOperationToNode } = await import('./replicator.ts');
+	const response: any = await sendOperationToNode(node, { operation: 'create_authentication_tokens' }, undefined);
+	const token = response?.operation_token ?? response?.results?.operation_token;
+	if (!token || typeof token !== 'string') {
+		throw new Error('peer did not return an operation_token');
+	}
+	return token;
+}
+
+/** Copy `req` minus `_`-prefixed and non-forwardable keys, forcing `replicated: false`. */
+function buildForwardableFields(req: Record<string, unknown>): Record<string, unknown> {
+	const out: Record<string, unknown> = {};
+	for (const [key, value] of Object.entries(req)) {
+		if (key.startsWith('_') || NON_FORWARDABLE_FIELDS.has(key)) continue;
+		out[key] = value;
+	}
+	// Critical: the peer must NOT re-replicate. Without this the deploy would fan out from
+	// each peer back to every other node, which would either loop or storm depending on the
+	// replication implementation.
+	out.replicated = false;
+	return out;
+}
+
+/** Build the peer's operations API base URL from the node config and local port settings. */
+function resolveOperationsApiUrl(node: NodeLike): URL {
+	// A node config can override the operations API URL directly (used by tests and by
+	// deployments that put the ops API behind a proxy). Otherwise the port is `node.port`,
+	// then the local secure/insecure ops API port; the scheme follows the local secure port.
+	if (node.operationsApiUrl) return new URL(node.operationsApiUrl);
+	const securePort = get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_SECUREPORT);
+	const insecurePort = get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_PORT);
+	const port = node.port ?? securePort ?? insecurePort ?? 9925;
+	const protocol = securePort ? 'https:' : 'http:';
+	const hostname = extractHostname(node);
+	return new URL(`${protocol}//${hostname}:${port}/`);
+}
+
+/** Pick the peer's hostname from `host`, then `url`, then `name`; throws if none is set. */
+function extractHostname(node: NodeLike): string {
+	if (node.host) return node.host;
+	if (node.url) {
+		try {
+			return new URL(node.url).hostname;
+		} catch {
+			// fall through
+		}
+	}
+	if (node.name) {
+		// node.name is sometimes "host" and sometimes "host:port" — strip the port.
+		const colon = node.name.lastIndexOf(':');
+		return colon > 0 && /^\d+$/.test(node.name.slice(colon + 1)) ? node.name.slice(0, colon) : node.name;
+	}
+	throw new Error('node has no hostname (missing name/url/host)');
+}
+
+/** Shape consumed from `buildMultipartBody`: the Content-Type header value and body stream. */
+interface MultipartBody {
+	contentType: string;
+	stream: NodeJS.ReadableStream;
+}
+
+/**
+ * POST the multipart stream to the peer's operations API with the minted bearer token.
+ * Resolves with the parsed 2xx body; rejects with an Error (carrying `statusCode` for
+ * non-2xx replies) otherwise.
+ */
+function sendMultipart(
+	target: URL,
+	token: string,
+	multipart: MultipartBody,
+	node: NodeLike,
+	contentLengthHint: number
+): Promise<Record<string, unknown>> {
+	return new Promise((resolve, reject) => {
+		const request = target.protocol === 'https:' ? httpsRequest : httpRequest;
+		// Per-node TLS verification flag, mirroring how setNode reads it (`verify_tls` from
+		// the node config maps to `rejectUnauthorized`). Default: verify, matching WS
+		// replication's default posture.
+		const verifyTls = node.rejectUnauthorized ?? node.verify_tls ?? true;
+		const req = request(
+			{
+				protocol: target.protocol,
+				hostname: target.hostname,
+				port: target.port || (target.protocol === 'https:' ? 443 : 80),
+				method: 'POST',
+				path: '/',
+				headers: {
+					'Content-Type': multipart.contentType,
+					'Transfer-Encoding': 'chunked',
+					'Authorization': `Bearer ${token}`,
+					// Include the port, as the Host header requires for non-default ports; `URL.host`
+					// drops it only when it is the scheme default. SNI is handled by `servername` below.
+					'Host': target.host,
+				},
+				// Reuse the cluster's TLS trust posture: verify peer cert against the cluster
+				// CAs when verifyTls is enabled (default). The same flag governs replication WS.
+				rejectUnauthorized: verifyTls,
+				servername: target.hostname,
+			},
+			(res) => {
+				let body = '';
+				res.setEncoding('utf8');
+				res.on('data', (chunk) => (body += chunk));
+				// A connection dropped mid-response surfaces as 'error' on `res` and 'end' never
+				// fires; without this listener the promise would stay pending.
+				res.on('error', reject);
+				res.on('end', () => {
+					const statusCode = res.statusCode ?? 0;
+					if (statusCode >= 200 && statusCode < 300) {
+						try {
+							resolve(JSON.parse(body));
+						} catch {
+							resolve({ message: body });
+						}
+					} else {
+						const err = new Error(extractErrorMessage(body) || `HTTP ${statusCode}`);
+						(err as any).statusCode = statusCode;
+						reject(err);
+					}
+				});
+			}
+		);
+		req.on('error', reject);
+		multipart.stream.on('error', (err) => {
+			req.destroy(err);
+			reject(err);
+		});
+		logger.debug?.(
+			`Relaying deploy to ${node.name ?? target.hostname} via ${target.href} (~${formatBytes(contentLengthHint)})`
+		);
+		multipart.stream.pipe(req);
+	});
+}
+
+/** Best-effort error text from a response body: JSON `error`/`message`, else the first 200 chars. */
+function extractErrorMessage(body: string): string | undefined {
+	if (!body) return undefined;
+	try {
+		const parsed = JSON.parse(body);
+		const detail = parsed?.error ?? parsed?.message;
+		if (typeof detail === 'string') return detail;
+		// Structured error payloads are re-serialized rather than handed to `new Error`,
+		// which would otherwise stringify them as "[object Object]".
+		return detail == null ? body.slice(0, 200) : JSON.stringify(detail).slice(0, 200);
+	} catch {
+		return body.slice(0, 200);
+	}
+}
+
+/** Human-readable size in binary units (B, KiB, MiB, GiB) for log output. */
+function formatBytes(bytes: number): string {
+	if (bytes < 1024) return `${bytes} B`;
+	if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KiB`;
+	if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`;
+	return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GiB`;
+}
diff --git a/replication/replicator.ts b/replication/replicator.ts
index 876c0527d..e3644ed20 100644
--- a/replication/replicator.ts
+++ b/replication/replicator.ts
@@ -642,6 +642,34 @@ export async function replicateOperation(req) {
 		'to nodes',
 		server.nodes.map((node) => node.name)
 	);
+
+	// `deploy_component` with a staged payload is replicated via direct-HTTPS multipart
+	// streaming rather than the default WS sendOperation path — the latter buffers the
+	// whole operation into a single WS frame, which can't carry payloads larger than the
+	// 2 GB Buffer cap. The staged path is set by core's deployComponent when there's a
+	// streaming payload AND peers to replicate to. See HarperFast/harper#524.
+	const useDeployRelay = req.operation === 'deploy_component' && typeof req._stagedPayloadPath === 'string';
+	if (useDeployRelay) {
+		const { relayDeployToNode } = await import('./deployRelay.ts');
+		const payloadPath = req._stagedPayloadPath;
+		const replicatedResults = await Promise.allSettled(
+			server.nodes.map((node) => relayDeployToNode(node, req, payloadPath))
+		);
+		(response as any).replicated = replicatedResults.map((settledResult, index) => {
+			if (settledResult.status === 'rejected') {
+				return {
+					node: server.nodes[index]?.name,
+					status: 'failed',
+					reason: settledResult.reason?.toString?.() ?? String(settledResult.reason),
+				};
+			}
+			const result = settledResult.value as { node?: string; [key: string]: unknown };
+			if (!result.node) result.node = server.nodes[index]?.name;
+			return result;
+		});
+		return response;
+	}
+
 	const replicatedResults = await Promise.allSettled(
 		server.nodes.map((node) => {
 			// do all the nodes in parallel
diff --git a/unitTests/replication/deployRelay.test.mjs b/unitTests/replication/deployRelay.test.mjs
new file mode 100644
index 000000000..5dde8a0d1
--- /dev/null
+++ b/unitTests/replication/deployRelay.test.mjs
@@ -0,0 +1,231 @@
+/**
+ * Unit test for the direct-HTTPS deploy relay.
+ *
+ * Spins up a local HTTP server playing the role of a peer's operations API, stubs the
+ * JWT mint function, and verifies that `relayDeployToNode` posts a multipart/form-data
+ * body containing the expected fields and the staged payload file, with the right
+ * Authorization header, and parses the response correctly.
+ *
+ * Run: `node --test unitTests/replication/deployRelay.test.mjs`
+ */
+import { test, describe } from 'node:test';
+import { strict as assert } from 'node:assert';
+import { createServer } from 'node:http';
+import { mkdtemp, writeFile, rm } from 'node:fs/promises';
+import { join } from 'node:path';
+import { tmpdir } from 'node:os';
+import { relayDeployToNode } from '#src/replication/deployRelay';
+
+/**
+ * Minimal multipart parser sufficient for tests — extracts field-name → string-value pairs
+ * and the single file part's bytes/filename/mime. Doesn't depend on busboy (which would
+ * mean adding a harper-pro dep just for tests). Assumes the request body fits in memory
+ * (true for the test-sized fixtures here).
+ */
+function parseMultipart(body, boundary) {
+	const out = { fields: {}, fileBytes: Buffer.alloc(0), fileFilename: undefined, fileMimeType: undefined };
+	const sep = Buffer.from('\r\n--' + boundary);
+	const parts = splitBuffer(Buffer.concat([Buffer.from('\r\n'), body]), sep);
+	for (const part of parts) {
+		// Drop the leading empty segment and the trailing `--` closing marker.
+		if (part.length === 0 || part.equals(Buffer.from('--\r\n'))) continue;
+		// Each part: \r\nheader-line\r\nheader-line\r\n\r\nbody\r\n
+		const split = indexOfDouble(part);
+		if (split === -1) continue;
+		const headers = part.slice(0, split).toString('utf8');
+		let value = part.slice(split + 4);
+		if (value.length >= 2 && value[value.length - 2] === 0x0d && value[value.length - 1] === 0x0a) {
+			value = value.slice(0, -2);
+		}
+		const nameMatch = /name="([^"]+)"/.exec(headers);
+		const filenameMatch = /filename="([^"]*)"/.exec(headers);
+		const typeMatch = /Content-Type:\s*([^\r\n]+)/i.exec(headers);
+		if (!nameMatch) continue;
+		if (filenameMatch) {
+			out.fileBytes = value;
+			out.fileFilename = filenameMatch[1];
+			out.fileMimeType = typeMatch?.[1];
+		} else {
+			out.fields[nameMatch[1]] = value.toString('utf8');
+		}
+	}
+	return out;
+}
+
+function splitBuffer(buf, sep) {
+	const out = [];
+	let start = 0;
+	while (start <= buf.length) {
+		const idx = buf.indexOf(sep, start);
+		if (idx === -1) {
+			out.push(buf.slice(start));
+			break;
+		}
+		out.push(buf.slice(start, idx));
+		start = idx + sep.length;
+	}
+	return out;
+}
+
+function indexOfDouble(buf) {
+	for (let i = 0; i < buf.length - 3; i++) {
+		if (buf[i] === 0x0d && buf[i + 1] === 0x0a && buf[i + 2] === 0x0d && buf[i + 3] === 0x0a) return i;
+	}
+	return -1;
+}
+
+async function withPeerServer(handler) {
+	const received = {
+		method: '',
+		url: '',
+		headers: {},
+		fields: {},
+		fileBytes: Buffer.alloc(0),
+		fileFilename: undefined,
+		fileMimeType: undefined,
+	};
+	const server = createServer((req, res) => {
+		received.method = req.method ?? '';
+		received.url = req.url ?? '';
+		received.headers = req.headers;
+		const contentType = req.headers['content-type'] || '';
+		if (typeof contentType === 'string' && contentType.startsWith('multipart/form-data')) {
+			const boundary = /boundary=([^;]+)/.exec(contentType)?.[1];
+			if (!boundary) {
+				res.statusCode = 400;
+				res.end('no boundary');
+				return;
+			}
+			const chunks = [];
+			req.on('data', (c) => chunks.push(c));
+			req.on('end', () => {
+				const parsed = parseMultipart(Buffer.concat(chunks), boundary);
+				Object.assign(received, parsed);
+				const out = handler(received);
+				res.statusCode = out.status;
+				res.setHeader('content-type', 'application/json');
+				res.end(typeof out.body === 'string' ? out.body : JSON.stringify(out.body));
+			});
+		} else {
+			res.statusCode = 415;
+			res.end('expected multipart');
+		}
+	});
+	await new Promise((resolve) => server.listen(0, '127.0.0.1', () => resolve()));
+	const addr = server.address();
+	if (!addr || typeof addr === 'string') throw new Error('no server address');
+	const port = addr.port;
+	return {
+		port,
+		received,
+		close: () => new Promise((resolve) => server.close(() => resolve())),
+	};
+}
+
+describe('relayDeployToNode', () => {
+	test('streams a multipart deploy and parses the JSON response', async () => {
+		const tmp = await mkdtemp(join(tmpdir(), 'relay-test-'));
+		const payloadPath = join(tmp, 'payload.tar.gz');
+		const payloadBytes = Buffer.alloc(100 * 1024).fill(0xab);
+		await writeFile(payloadPath, payloadBytes);
+		const server = await withPeerServer(() => ({
+			status: 200,
+			body: { message: 'Successfully deployed: demo' },
+		}));
+		try {
+			const node = {
+				name: 'peer-1',
+				host: '127.0.0.1',
+				port: server.port,
+				rejectUnauthorized: false,
+			};
+			const result = await relayDeployToNode(
+				node,
+				{ operation: 'deploy_component', project: 'demo', restart: true, payload: 'should-be-stripped' },
+				payloadPath,
+				{ mintToken: async () => 'test-jwt-token' }
+			);
+			assert.equal(result.status, 'success');
+			assert.equal(result.node, 'peer-1');
+			assert.equal(result.message, 'Successfully deployed: demo');
+			assert.equal(server.received.method, 'POST');
+			assert.equal(server.received.headers.authorization, 'Bearer test-jwt-token');
+			assert.equal(server.received.fields.operation, 'deploy_component');
+			assert.equal(server.received.fields.project, 'demo');
+			assert.equal(server.received.fields.restart, 'true', 'JSON-encoded booleans on the wire');
+			assert.equal(server.received.fields.replicated, 'false', 'peer must NOT re-replicate');
+			assert.equal(server.received.fields.payload, undefined, 'CLI/internal fields are stripped');
+			assert.equal(server.received.fileBytes.length, payloadBytes.length, 'file part is intact');
+			assert.deepEqual(server.received.fileBytes, payloadBytes);
+			assert.equal(server.received.fileFilename, 'package.tar.gz');
+			assert.equal(server.received.fileMimeType, 'application/gzip');
+		} finally {
+			await server.close();
+			await rm(tmp, { recursive: true, force: true });
+		}
+	});
+
+	test('returns a failed result when the peer responds with a 4xx/5xx', async () => {
+		const tmp = await mkdtemp(join(tmpdir(), 'relay-test-'));
+		const payloadPath = join(tmp, 'payload.tar.gz');
+		await writeFile(payloadPath, 'data');
+		const server = await withPeerServer(() => ({
+			status: 500,
+			body: { error: 'Failed to install dependencies for demo' },
+		}));
+		try {
+			const result = await relayDeployToNode(
+				{ name: 'peer-1', host: '127.0.0.1', port: server.port, rejectUnauthorized: false },
+				{ operation: 'deploy_component', project: 'demo' },
+				payloadPath,
+				{ mintToken: async () => 'token' }
+			);
+			assert.equal(result.status, 'failed');
+			assert.equal(result.statusCode, 500);
+			assert.match(String(result.reason), /Failed to install dependencies/);
+		} finally {
+			await server.close();
+			await rm(tmp, { recursive: true, force: true });
+		}
+	});
+
+	test('returns a failed result when the token mint fails (no HTTP call attempted)', async () => {
+		const tmp = await mkdtemp(join(tmpdir(), 'relay-test-'));
+		const payloadPath = join(tmp, 'payload.tar.gz');
+		await writeFile(payloadPath, 'data');
+		let httpHit = false;
+		const server = await withPeerServer(() => {
+			httpHit = true;
+			return { status: 200, body: {} };
+		});
+		try {
+			const result = await relayDeployToNode(
+				{ name: 'peer-1', host: '127.0.0.1', port: server.port, rejectUnauthorized: false },
+				{ operation: 'deploy_component', project: 'demo' },
+				payloadPath,
+				{
+					mintToken: async () => {
+						throw new Error('peer rejected token request');
+					},
+				}
+			);
+			assert.equal(result.status, 'failed');
+			assert.match(String(result.reason), /token mint failed: peer rejected token request/);
+			assert.equal(httpHit, false, 'no HTTPS call should be made when token mint fails');
+		} finally {
+			await server.close();
+			await rm(tmp, { recursive: true, force: true });
+		}
+	});
+
+	test('returns a failed result when the staged payload file is missing', async () => {
+		const result = await relayDeployToNode(
+			{ name: 'peer-1', host: '127.0.0.1', port: 1, rejectUnauthorized: false },
+			{ operation: 'deploy_component', project: 'demo' },
+			'/nonexistent/path/payload.tar.gz',
+			{ mintToken: async () => 'token' }
+		);
+		assert.equal(result.status, 'failed');
+		assert.match(String(result.reason), /staged payload missing/);
+	});
+});