Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2f8334d
feat(kafka): add kafka-probe deep module + kafkajs runtime dep
May 4, 2026
87626ea
feat(kafka): wire opportunistic verification into endpoint.register
May 4, 2026
9322a42
feat(kafka): wire opportunistic probe through route + CLI + api-client
May 4, 2026
d5d3e47
test(kafka): add testcontainers helper + integration tests for kafka-…
May 4, 2026
2fd4cfc
test(kafka): live-probe e2e scenario + raised coverage ratchet
May 4, 2026
d52874c
test(kafka): cover force=true on probe success and add logger regress…
May 4, 2026
bad5198
fix(kafka): scope coverage config to src/** and reset ratchet to actuals
May 4, 2026
b8186a9
test(kafka): drive remaining src branches and ratchet coverage to 100…
May 4, 2026
150ea05
fix(kafka): correct probe() contract docs and translate input errors …
May 4, 2026
2233b29
refactor(cli): drop gratuitous `result as any` casts on kafka registe…
May 4, 2026
4a4870e
fix(kafka): carry probe error string through KafkaEndpointProbeFailed…
May 4, 2026
fa83487
fix(cli): tighten kafka route parsers to reject empty credential strings
May 4, 2026
c6b881a
refactor(kafka): lift shared Kafka auth types out of probe-specific n…
May 4, 2026
3e7d3c0
refactor(cli): extract Kafka request parsers into dedicated module
May 4, 2026
0fbca30
refactor(kafka): export ProbeResult→KafkaEndpointProbeOutcome adapter
May 4, 2026
fceb6d8
docs(kafka): add JSDoc summaries to public types and @internal markers
May 4, 2026
aad9b8e
docs(kafka): explain probe timeout values and their relationship
May 4, 2026
3230d00
refactor(kafka): drop conditional spreads on optional fields
May 4, 2026
f76382d
refactor(kafka): inline single-use RawProbeOutcome internal type
May 4, 2026
8bce92f
docs(kafka): drop JSDoc summaries that restate the symbol name
May 4, 2026
6f76df7
fix(kafka): allow one-way TLS for SSL/SASL_SSL probes (no forced mTLS)
May 4, 2026
4f2fe83
fix(cli): reject explicitly invalid Kafka auth payloads with HTTP 400
May 4, 2026
1886bcb
feat(cli): support --sasl-mechanism for SCRAM-backed brokers
May 4, 2026
a876f0c
feat(cli): support --password-stdin and DKG_KAFKA_PASSWORD for non-ar…
May 5, 2026
78cd48e
fix(cli): fail fast on partial or misplaced SASL credentials
May 5, 2026
a19287d
fix(cli): probe SSL endpoints regardless of client cert/key (consiste…
May 5, 2026
554e8e0
fix(cli): reject protocol/credential mismatch on Kafka register HTTP …
May 5, 2026
8b66ea8
fix(kafka): map admin.connect SASL auth failures to "failed" status, …
May 5, 2026
05d8313
fix(kafka): reject SSL with mismatched client cert/key as input error
May 5, 2026
859b66f
feat(cli): surface probe status and error on Kafka register 422 respo…
May 5, 2026
3e51bae
fix(cli): reject sasl/ssl without securityProtocol on Kafka register …
May 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion packages/cli/src/api-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { readFile } from 'node:fs/promises';
import { basename } from 'node:path';
import type { SecurityProtocol } from '@origintrail-official/dkg-kafka';
import { readApiPort, readPid, isProcessRunning } from './config.js';
import { loadTokens } from './auth.js';

Expand Down Expand Up @@ -556,11 +557,35 @@ export class ApiClient {
broker: string;
topic: string;
messageFormat: string;
// Opportunistic verification fields (slice 04). All optional; when omitted
// the daemon skips the probe and the KA records `verificationStatus:
// "unattempted"`.
securityProtocol?: SecurityProtocol;
sasl?: {
mechanism?: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
password: string;
};
ssl?: {
ca?: string;
cert?: string;
key?: string;
caPath?: string;
certPath?: string;
keyPath?: string;
rejectUnauthorized?: boolean;
};
/** When true, KA is registered even if the probe fails. Sent as ?force=true. */
force?: boolean;
}): Promise<{
uri: string;
contextGraphId: string;
verificationStatus?: 'unattempted' | 'verified' | 'failed';
verifiedAt?: string;
}> {
return this.post('/api/kafka/endpoint', request);
const { force, ...body } = request;
const path = force ? '/api/kafka/endpoint?force=true' : '/api/kafka/endpoint';
return this.post(path, body);
}

async signJoinRequest(contextGraphId: string): Promise<{
Expand Down
184 changes: 181 additions & 3 deletions packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import { spawn, execSync } from 'node:child_process';
import { createReadStream } from 'node:fs';
import { fileURLToPath } from 'node:url';
import { join } from 'node:path';
import { writeFile, unlink } from 'node:fs/promises';
import { readFile, writeFile, unlink } from 'node:fs/promises';
import { ethers } from 'ethers';
import type { SecurityProtocol } from '@origintrail-official/dkg-kafka';
import { dkgAuthTokenPath, requestFaucetFunding, toErrorMessage, hasErrorCode } from '@origintrail-official/dkg-core';
import yaml from 'js-yaml';
import {
Expand Down Expand Up @@ -1717,6 +1718,62 @@ assertionCmd

// ─── dkg kafka ──────────────────────────────────────────────────────

/**
 * Determine which SASL password (if any) applies to
 * `dkg kafka endpoint register`. Sources are consulted in this order:
 *
 * 1. `--password-stdin` — read the piped value from stdin (mutually
 *    exclusive with `--password`)
 * 2. `--password <pass>` — the CLI flag value
 * 3. the `DKG_KAFKA_PASSWORD` environment variable
 * 4. nothing — resolves to `undefined`
 *
 * Passing the secret via `--password` leaks it into shell history and the
 * process table (`ps -ef`); the stdin and env-var routes exist precisely so
 * callers can avoid that. Only the FIRST line of stdin is used, with the
 * trailing CR/LF stripped — anything after a newline is never part of the
 * password.
 *
 * When `--password-stdin` is requested, stdin must be a pipe (non-TTY). An
 * interactive, echo-suppressed prompt is deliberately not implemented here;
 * a TTY-attached stdin fails fast with a message pointing at the supported
 * alternatives.
 */
async function resolveKafkaPassword(opts: {
  password?: string;
  passwordStdin?: boolean;
}): Promise<string | undefined> {
  const wantsStdin = Boolean(opts.passwordStdin);
  const flagValue = opts.password;

  // Refuse ambiguous input up front: two explicit sources means the caller's
  // intent is unclear, so neither wins.
  if (wantsStdin && flagValue) {
    throw new Error(
      '--password and --password-stdin are mutually exclusive (pick one)',
    );
  }

  if (wantsStdin) {
    if (process.stdin.isTTY) {
      // A masked interactive prompt is out of scope; require a pipe, e.g.
      // `printf %s "$PW" | dkg ... --password-stdin`.
      throw new Error(
        '--password-stdin requires piped stdin; pipe the password (or use DKG_KAFKA_PASSWORD)',
      );
    }
    const buffers: Buffer[] = [];
    for await (const piece of process.stdin) {
      buffers.push(Buffer.isBuffer(piece) ? piece : Buffer.from(piece));
    }
    const text = Buffer.concat(buffers).toString('utf8');
    // Keep only the first line: secrets containing interior whitespace stay
    // intact, while the newline appended by `printf '%s\n' "$PW" | ...` and
    // by most shells is discarded.
    const [candidate = ''] = text.split(/\r?\n/, 1);
    return candidate === '' ? undefined : candidate;
  }

  if (typeof flagValue === 'string' && flagValue.length > 0) {
    return flagValue;
  }

  const fromEnv = process.env.DKG_KAFKA_PASSWORD;
  return typeof fromEnv === 'string' && fromEnv.length > 0 ? fromEnv : undefined;
}

const kafkaCmd = program
.command('kafka')
.description('Kafka metadata registration commands');
Expand All @@ -1732,20 +1789,141 @@ kafkaEndpointCmd
.requiredOption('--broker <host:port>', 'Kafka broker host:port')
.requiredOption('--topic <name>', 'Kafka topic name')
.option('--format <mime>', 'Kafka message format MIME type', 'application/json')
// ── opportunistic verification flags (slice 04) ─────────────────────
// Without `--security-protocol` no probe runs and the KA records
// `verificationStatus: "unattempted"`. With it, the daemon attempts a
// one-shot probe and rejects the registration on failure unless
// `--force` is passed.
.option('--security-protocol <proto>', 'PLAINTEXT | SASL_PLAINTEXT | SASL_SSL | SSL')
.option('--username <user>', 'SASL username (SASL_PLAINTEXT or SASL_SSL)')
.option(
'--password <pass>',
'SASL password (NOT recommended — exposes secret in shell history; prefer --password-stdin or DKG_KAFKA_PASSWORD)',
)
.option(
'--password-stdin',
'Read SASL password from stdin (recommended; prevents shell-history exposure)',
)
.option(
'--sasl-mechanism <mechanism>',
'SASL mechanism: plain (default), scram-sha-256, scram-sha-512',
'plain',
)
.option('--ca-pem-path <path>', 'Filesystem path to a CA PEM bundle (SASL_SSL or SSL)')
.option('--cert-pem-path <path>', 'Filesystem path to an mTLS client cert PEM (SSL)')
.option('--key-pem-path <path>', 'Filesystem path to an mTLS client key PEM (SSL)')
.option('--force', 'Register the KA even if the broker probe fails (verificationStatus="failed")')
.action(async (opts: ActionOpts) => {
try {
// Validate `--sasl-mechanism` here (rather than via commander's
// `Option.choices()`) so the error message matches the rest of this
// file's style and so the daemon and CLI produce identical wording for
// the same misconfig. The valid set mirrors `KafkaSaslCredentials`.
const VALID_SASL_MECHANISMS = ['plain', 'scram-sha-256', 'scram-sha-512'] as const;
type SaslMechanism = (typeof VALID_SASL_MECHANISMS)[number];
const saslMechanism = String(opts.saslMechanism ?? 'plain').toLowerCase();
if (!(VALID_SASL_MECHANISMS as readonly string[]).includes(saslMechanism)) {
throw new Error(
`--sasl-mechanism must be one of ${VALID_SASL_MECHANISMS.join(', ')}`,
);
}

// Resolve filesystem PEMs at the CLI layer so the request body carries
// inline PEM strings — the daemon's "filesystem path" mode is a
// separate escape hatch for callers that prefer the daemon to read
// them, but the CLI prefers explicit transport.
const ssl: Record<string, string> = {};
if (opts.caPemPath) ssl.ca = await readFile(String(opts.caPemPath), 'utf8');
if (opts.certPemPath) ssl.cert = await readFile(String(opts.certPemPath), 'utf8');
if (opts.keyPemPath) ssl.key = await readFile(String(opts.keyPemPath), 'utf8');

// Resolve the SASL password from --password / --password-stdin /
// DKG_KAFKA_PASSWORD before composing the request body so that all
// downstream SASL-credential logic reads from a single resolved value.
const resolvedPassword = await resolveKafkaPassword({
password: typeof opts.password === 'string' ? opts.password : undefined,
passwordStdin: Boolean(opts.passwordStdin),
});

// ── SASL credential validation ────────────────────────────────────
// The previous shape silently dropped a half-supplied SASL block when
// exactly one of --username / --password was present. That left the
// caller with `verificationStatus: "unattempted"` even though they
// clearly intended to authenticate — a confusing footgun. Fail fast
// instead, with messages that name every input that could have set
// the credential (including --password-stdin / DKG_KAFKA_PASSWORD).
const username = typeof opts.username === 'string' && opts.username.length > 0
? opts.username
: undefined;
const securityProtocol = opts.securityProtocol
? (String(opts.securityProtocol).toUpperCase() as SecurityProtocol)
: undefined;
const isSaslProtocol =
securityProtocol === 'SASL_PLAINTEXT' || securityProtocol === 'SASL_SSL';
const isNonSaslProtocol =
securityProtocol === 'PLAINTEXT' || securityProtocol === 'SSL';

if ((username && !resolvedPassword) || (!username && resolvedPassword)) {
throw new Error(
'--username and --password (or --password-stdin / DKG_KAFKA_PASSWORD) must be supplied together',
);
}
if (isSaslProtocol && (!username || !resolvedPassword)) {
throw new Error(
'SASL_PLAINTEXT/SASL_SSL requires --username and --password (or --password-stdin / DKG_KAFKA_PASSWORD)',
);
}
if (isNonSaslProtocol && (username || resolvedPassword)) {
throw new Error(
'--username/--password is only valid with SASL_PLAINTEXT or SASL_SSL',
);
}

const client = await ApiClient.connect();
const result = await client.registerKafkaEndpoint({
contextGraphId: opts.cg,
broker: opts.broker,
topic: opts.topic,
messageFormat: opts.format,
...(securityProtocol ? { securityProtocol } : {}),
...(username && resolvedPassword
? {
sasl: {
mechanism: saslMechanism as SaslMechanism,
username,
password: resolvedPassword,
},
}
: {}),
...(Object.keys(ssl).length > 0 ? { ssl } : {}),
...(opts.force ? { force: true } : {}),
});
console.log('Kafka endpoint registered:');
console.log(` URI: ${result.uri}`);
console.log(` Context graph: ${result.contextGraphId}`);
console.log(` URI: ${result.uri}`);
console.log(` Context graph: ${result.contextGraphId}`);
if (result.verificationStatus) {
console.log(` Verification status: ${result.verificationStatus}`);
}
if (result.verifiedAt) {
console.log(` Verified at: ${result.verifiedAt}`);
}
} catch (err) {
console.error(toErrorMessage(err));
Comment thread
zsculac marked this conversation as resolved.
// The route's 422 response carries `probeStatus` (e.g. "failed",
// "unreachable") and `probeError` (kafkajs error class name) as
// top-level fields on the response body. Render them here so users
// debugging an auth or topic failure see the actual cause instead of
// just the generic "pass force=true" message.
const body = (err as { responseBody?: unknown }).responseBody;
if (body && typeof body === 'object') {
const r = body as Record<string, unknown>;
if (typeof r.probeStatus === 'string') {
console.error(` Probe status: ${r.probeStatus}`);
}
if (typeof r.probeError === 'string') {
console.error(` Probe error: ${r.probeError}`);
}
}
process.exit(1);
}
});
Expand Down
Loading
Loading