diff --git a/packages/agent/src/dkg-agent.ts b/packages/agent/src/dkg-agent.ts index 5dc9e60b2..3d1aa7b8f 100644 --- a/packages/agent/src/dkg-agent.ts +++ b/packages/agent/src/dkg-agent.ts @@ -6251,7 +6251,13 @@ export class DKGAgent { }); } - private async isPrivateContextGraph(contextGraphId: string): Promise { + /** + * Returns true when the CG carries a `"private"` access-policy triple in + * the ontology or `_meta` graph, OR when its `_meta` graph contains any + * allowlist predicate (`DKG_ALLOWED_PEER`, `DKG_ALLOWED_AGENT`, or + * `DKG_PARTICIPANT_AGENT`). System paranets always return false. + */ + async isPrivateContextGraph(contextGraphId: string): Promise { if ((Object.values(SYSTEM_PARANETS) as string[]).includes(contextGraphId)) { return false; } diff --git a/packages/cli/src/api-client.ts b/packages/cli/src/api-client.ts index d51715b38..a91c47b14 100644 --- a/packages/cli/src/api-client.ts +++ b/packages/cli/src/api-client.ts @@ -551,14 +551,24 @@ export class ApiClient { return this.get(`/api/context-graph/${encodeURIComponent(contextGraphId)}/participants`); } - async registerKafkaEndpoint(request: { - contextGraphId: string; - broker: string; - topic: string; - messageFormat: string; - }): Promise<{ + async registerKafkaEndpoint( + request: + | { + contextGraphId: string; + broker: string; + topic: string; + messageFormat: string; + } + | { + useLocalCg: true; + broker: string; + topic: string; + messageFormat: string; + }, + ): Promise<{ uri: string; contextGraphId: string; + cgScope: 'local' | 'shared'; }> { return this.post('/api/kafka/endpoint', request); } diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 0e55ea5de..7590d2416 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -1,6 +1,6 @@ #!/usr/bin/env node -import { Command } from 'commander'; +import { Command, Option } from 'commander'; import { readFileSync, existsSync } from 'node:fs'; import { createInterface } from 'node:readline'; import { spawn, execSync } from 'node:child_process'; @@ -1725,25 +1725,69 @@ const kafkaEndpointCmd = kafkaCmd .command('endpoint') .description('Kafka topic endpoint operations'); +// ADR-0004: explicit local-vs-shared CG choice. The caller must pick +// `--cg ` (publish into a named shared CG) or `--local` (publish into the +// node-local "kafka-local" free CG, lazy-created on first use). Passing +// neither — or both — is rejected by commander before any network call so +// the user gets a clean error pre-network. kafkaEndpointCmd .command('register') - .description('Register a Kafka topic endpoint as a knowledge asset in a named context graph') - .requiredOption('--cg ', 'Target context graph') + .description('Register a Kafka topic endpoint as a knowledge asset in a context graph (named or kafka-local)') + // Mutual exclusion is wired declaratively via `Option#conflicts` so the + // pairing lives next to the option definition. commander throws before + // the action runs if both --cg and --local are passed. + .addOption( + new Option('--cg ', 'Target named context graph (mutually exclusive with --local)') + .conflicts('local'), + ) + .addOption( + new Option('--local', 'Publish into the node-local "kafka-local" free CG (mutually exclusive with --cg)') + .conflicts('cg'), + ) .requiredOption('--broker ', 'Kafka broker host:port') .requiredOption('--topic ', 'Kafka topic name') .option('--format ', 'Kafka message format MIME type', 'application/json') + .addHelpText( + 'after', + '\nExactly one of --cg or --local must be passed. There is no implicit default.', + ) .action(async (opts: ActionOpts) => { + const cgId = typeof opts.cg === 'string' ? opts.cg : undefined; + const useLocal = opts.local === true; + + // Mutual-exclusion is enforced at the parser level via Option#conflicts + // on the addOption() declarations above (commander throws before this + // action runs). The "neither" case is the only thing left to guard here: + // commander treats both options as optional, so we check that exactly + // one is set. + if (!cgId && !useLocal) { + console.error( + 'Pass exactly one of "--cg " (publish into a named shared CG) ' + + 'or "--local" (publish into the local "kafka-local" free CG).', + ); + process.exit(1); + } + try { const client = await ApiClient.connect(); - const result = await client.registerKafkaEndpoint({ - contextGraphId: opts.cg, - broker: opts.broker, - topic: opts.topic, - messageFormat: opts.format, - }); + const request = useLocal + ? { + useLocalCg: true as const, + broker: opts.broker, + topic: opts.topic, + messageFormat: opts.format, + } + : { + contextGraphId: cgId!, + broker: opts.broker, + topic: opts.topic, + messageFormat: opts.format, + }; + const result = await client.registerKafkaEndpoint(request); console.log('Kafka endpoint registered:'); console.log(` URI: ${result.uri}`); console.log(` Context graph: ${result.contextGraphId}`); + console.log(` CG scope: ${result.cgScope}`); } catch (err) { console.error(toErrorMessage(err)); process.exit(1); diff --git a/packages/cli/src/daemon/routes/kafka-adapters.ts b/packages/cli/src/daemon/routes/kafka-adapters.ts new file mode 100644 index 000000000..58136986a --- /dev/null +++ b/packages/cli/src/daemon/routes/kafka-adapters.ts @@ -0,0 +1,54 @@ +// Adapters that bind the daemon's `DKGAgent` to the small interfaces the +// `@origintrail-official/dkg-kafka` package depends on. Kept on the CLI side +// because the agent type lives here; the kafka package stays agent-agnostic. +// +// Slice 02 only consumes these from `routes/kafka.ts`; later kafka routes +// (discover, revoke) will reuse them — one home, one responsibility each. + +import type { DKGAgent } from '@origintrail-official/dkg-agent'; +import type { + KafkaEndpointPublisher, + LocalCgPrimitive, +} from '@origintrail-official/dkg-kafka'; + +/** + * Wraps `agent.publish` with the daemon's `{ public: content }` envelope so + * the kafka package can stay envelope-agnostic. + */ +export function kafkaPublisherFromAgent(agent: DKGAgent): KafkaEndpointPublisher { + return { + async publish(contextGraphId, knowledgeAsset) { + await agent.publish( + contextGraphId, + { public: knowledgeAsset } as Record, + ); + }, + }; +} + +/** + * Adapts the agent's free-CG surface to the `LocalCgPrimitive` shape the + * kafka-local ensurer consumes. `callerAgentAddress` is threaded into + * `createContextGraph` so the create runs under the requesting agent. + * + * The `private: true` flag is hardcoded HERE — the kafka package never sees + * the boolean, so a future refactor cannot accidentally drop it. Without + * `private: true`, the agent auto-subscribes to the CG's gossip topic and + * broadcasts the CG definition (`dkg-agent.ts:3837`); the slice-02 spec + * requires kafka-local to be truly node-local, which means no gossip. + */ +export function kafkaLocalCgFromAgent( + agent: DKGAgent, + callerAgentAddress: string, +): LocalCgPrimitive { + return { + contextGraphExists: (id) => agent.contextGraphExists(id), + isPrivateContextGraph: (id) => agent.isPrivateContextGraph(id), + createPrivateContextGraph: (opts) => + agent.createContextGraph({ + ...opts, + private: true, + callerAgentAddress, + }), + }; +} diff --git a/packages/cli/src/daemon/routes/kafka.ts b/packages/cli/src/daemon/routes/kafka.ts index 550f6f05e..4590629bd 100644 --- a/packages/cli/src/daemon/routes/kafka.ts +++ b/packages/cli/src/daemon/routes/kafka.ts @@ -1,9 +1,14 @@ -import { jsonResponse, readBody, validateRequiredContextGraphId } from '../http-utils.js'; +import { jsonResponse, readBody, isValidContextGraphId } from '../http-utils.js'; import type { RequestContext } from './context.js'; import { + createKafkaLocalCgEnsurer, registerKafkaEndpoint, - type KafkaEndpointPublisher, + validateContextGraphSelection, } from '@origintrail-official/dkg-kafka'; +import { + kafkaLocalCgFromAgent, + kafkaPublisherFromAgent, +} from './kafka-adapters.js'; function isNonEmptyString(value: unknown): value is string { return typeof value === 'string' && value.trim().length > 0; @@ -29,15 +34,26 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise { const { contextGraphId, + useLocalCg, broker, topic, messageFormat, } = parsed as Record; - if (!validateRequiredContextGraphId(contextGraphId, res)) { - return; + // ADR-0004: explicit local-vs-shared CG choice. We surface the pure + // validator's error message via the daemon's standard 400 envelope. + let selection; + try { + selection = validateContextGraphSelection({ contextGraphId, useLocalCg }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + return jsonResponse(res, 400, { error: message }); + } + + if (selection.kind === 'shared' && !isValidContextGraphId(selection.contextGraphId)) { + return jsonResponse(res, 400, { error: 'Invalid "contextGraphId"' }); } - const targetContextGraphId = contextGraphId as string; + if (!isNonEmptyString(broker)) { return jsonResponse(res, 400, { error: '"broker" must be a non-empty string' }); } @@ -48,22 +64,21 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise { return jsonResponse(res, 400, { error: '"messageFormat" must be a non-empty string' }); } - const publisher: KafkaEndpointPublisher = { - async publish(cgId, content) { - await agent.publish( - cgId, - { public: content } as Record, - ); - }, - }; - const result = await registerKafkaEndpoint({ - contextGraphId: targetContextGraphId, + selection, owner: requestAgentAddress.toLowerCase(), broker, topic, messageFormat, - publisher, + publisher: kafkaPublisherFromAgent(agent), + // Fresh ensurer per request; hoisting per-agent is a deferred optimization + // (the agent's "already exists" guard plus the exists-check make repeats cheap). + // The peer-id scopes the kafka-local CG id per node — `kafka-local-{peerId}` — + // so two nodes cannot collide on the literal "kafka-local" id. + ensureLocalCg: createKafkaLocalCgEnsurer( + kafkaLocalCgFromAgent(agent, requestAgentAddress), + agent.peerId, + ), }); return jsonResponse(res, 200, result); diff --git a/packages/cli/test/kafka-cli-smoke.test.ts b/packages/cli/test/kafka-cli-smoke.test.ts index f80ec60b2..704d5733a 100644 --- a/packages/cli/test/kafka-cli-smoke.test.ts +++ b/packages/cli/test/kafka-cli-smoke.test.ts @@ -1,4 +1,4 @@ -import { beforeAll, afterAll, describe, expect, it } from 'vitest'; +import { beforeAll, afterAll, beforeEach, describe, expect, it } from 'vitest'; import { createServer } from 'node:http'; import { execFile } from 'node:child_process'; import { promisify } from 'node:util'; @@ -18,6 +18,7 @@ describe.sequential('kafka CLI smoke', () => { let smokeApiPort: string; let lastBody = ''; let lastAuthHeader = ''; + let nextResponse: { status: number; body: unknown } = { status: 200, body: {} }; beforeAll(async () => { dkgHome = await mkdtemp(join(tmpdir(), 'dkg-kafka-cli-')); @@ -38,11 +39,8 @@ describe.sequential('kafka CLI smoke', () => { chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); } lastBody = Buffer.concat(chunks).toString('utf8'); - res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ - uri: 'urn:dkg:kafka-endpoint:0xabc:hash', - contextGraphId: 'devnet-test', - })); + res.writeHead(nextResponse.status, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(nextResponse.body)); return; } @@ -60,12 +58,25 @@ describe.sequential('kafka CLI smoke', () => { }); }); + beforeEach(() => { + lastBody = ''; + lastAuthHeader = ''; + nextResponse = { + status: 200, + body: { + uri: 'urn:dkg:kafka-endpoint:0xabc:hash', + contextGraphId: 'devnet-test', + cgScope: 'shared', + }, + }; + }); + afterAll(async () => { await new Promise((resolve) => server.close(() => resolve())); await rm(dkgHome, { recursive: true, force: true }); }); - it('registers a Kafka endpoint through the CLI', async () => { + it('registers a Kafka endpoint through the CLI with --cg', async () => { const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort }; const result = await execFileAsync('node', [ @@ -84,6 +95,7 @@ describe.sequential('kafka CLI smoke', () => { expect(result.stdout).toContain('Kafka endpoint registered:'); expect(result.stdout).toContain('urn:dkg:kafka-endpoint:0xabc:hash'); expect(result.stdout).toContain('devnet-test'); + expect(result.stdout).toContain('CG scope: shared'); expect(lastAuthHeader).toBe('Bearer smoke-token'); expect(JSON.parse(lastBody)).toEqual({ contextGraphId: 'devnet-test', @@ -92,4 +104,93 @@ describe.sequential('kafka CLI smoke', () => { messageFormat: 'application/json', }); }, 15000); + + it('registers a Kafka endpoint through the CLI with --local', async () => { + // The daemon scopes the kafka-local CG id per-node as + // `kafka-local-{peerId}`. The smoke test mocks the daemon, so we choose + // a realistic prefixed value here and assert the CLI prints it back. + const stubPeerId = '12D3KooWStubPeerIdForSmokeTest'; + const stubLocalId = `kafka-local-${stubPeerId}`; + nextResponse = { + status: 200, + body: { + uri: 'urn:dkg:kafka-endpoint:0xabc:hash', + contextGraphId: stubLocalId, + cgScope: 'local', + }, + }; + const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort }; + + const result = await execFileAsync('node', [ + CLI_ENTRY, + 'kafka', + 'endpoint', + 'register', + '--local', + '--broker', + 'kafka.example.com:9092', + '--topic', + 'orders.created', + ], { env }); + + expect(result.stdout).toContain('Kafka endpoint registered:'); + expect(result.stdout).toContain('urn:dkg:kafka-endpoint:0xabc:hash'); + expect(result.stdout).toContain(stubLocalId); + expect(result.stdout).toContain('CG scope: local'); + expect(JSON.parse(lastBody)).toEqual({ + useLocalCg: true, + broker: 'kafka.example.com:9092', + topic: 'orders.created', + messageFormat: 'application/json', + }); + }, 15000); + + it('rejects --cg and --local together at the parser level (no network call)', async () => { + const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort }; + lastBody = '__no_request__'; + + await expect( + execFileAsync('node', [ + CLI_ENTRY, + 'kafka', + 'endpoint', + 'register', + '--cg', + 'devnet-test', + '--local', + '--broker', + 'kafka.example.com:9092', + '--topic', + 'orders.created', + ], { env }), + ).rejects.toMatchObject({ + code: 1, + stderr: expect.stringMatching(/--cg.*--local|--local.*--cg/i), + }); + + expect(lastBody).toBe('__no_request__'); + }, 15000); + + it('rejects neither --cg nor --local at the action level (no network call)', async () => { + const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort }; + lastBody = '__no_request__'; + + await expect( + execFileAsync('node', [ + CLI_ENTRY, + 'kafka', + 'endpoint', + 'register', + '--broker', + 'kafka.example.com:9092', + '--topic', + 'orders.created', + ], { env }), + ).rejects.toMatchObject({ + code: 1, + stderr: expect.stringMatching(/--cg.*--local|--local.*--cg/), + }); + + expect(lastBody).toBe('__no_request__'); + }, 15000); }); diff --git a/packages/kafka/src/endpoint.ts b/packages/kafka/src/endpoint.ts index 22888c8d2..602eea153 100644 --- a/packages/kafka/src/endpoint.ts +++ b/packages/kafka/src/endpoint.ts @@ -1,38 +1,73 @@ -import { buildKafkaEndpointKnowledgeAsset } from './ka-builder.js'; +import { + buildKafkaEndpointKnowledgeAsset, + type KafkaEndpointKnowledgeAsset, +} from './ka-builder.js'; import { buildKafkaEndpointUri } from './uri.js'; +import type { KafkaContextGraphSelection } from './validation.js'; /** * Dependency-inversion boundary: the kafka package needs something that can * publish a JSON-LD knowledge asset. The package hands the bare KA across this * interface; envelope wrapping (e.g. `{ public: ... }`) belongs to the caller. */ -export type KafkaEndpointKnowledgeAsset = ReturnType; - export interface KafkaEndpointPublisher { publish( contextGraphId: string, knowledgeAsset: KafkaEndpointKnowledgeAsset, - ): Promise; + ): Promise; } +export type CgScope = 'local' | 'shared'; + export interface RegisterKafkaEndpointInput { - contextGraphId: string; + /** Pre-validated selection produced by `validateContextGraphSelection`. */ + selection: KafkaContextGraphSelection; owner: string; broker: string; topic: string; messageFormat: string; issuedAt?: string; publisher: KafkaEndpointPublisher; + /** + * Required when `selection.kind === 'local'`. Resolves the destination CG + * id by lazily creating `kafka-local` if needed. The orchestrator stays + * agent-agnostic by depending on this thunk rather than importing the V10 + * primitive directly — see `local-cg.ts` for the concrete implementation + * the route handler binds. + */ + ensureLocalCg?: () => Promise; } export interface RegisterKafkaEndpointResult { uri: string; contextGraphId: string; + cgScope: CgScope; +} + +async function resolveSelection( + selection: KafkaContextGraphSelection, + ensureLocalCg?: () => Promise, +): Promise<{ contextGraphId: string; cgScope: CgScope }> { + if (selection.kind === 'shared') { + return { contextGraphId: selection.contextGraphId, cgScope: 'shared' }; + } + if (!ensureLocalCg) { + throw new Error( + '"ensureLocalCg" is required when selection.kind is "local". ' + + 'The caller must bind a thunk that lazy-creates the kafka-local free CG.', + ); + } + return { contextGraphId: await ensureLocalCg(), cgScope: 'local' }; } export async function registerKafkaEndpoint( input: RegisterKafkaEndpointInput, ): Promise { + const { contextGraphId, cgScope } = await resolveSelection( + input.selection, + input.ensureLocalCg, + ); + const issuedAt = input.issuedAt ?? new Date().toISOString(); const uri = buildKafkaEndpointUri(input); const knowledgeAsset = buildKafkaEndpointKnowledgeAsset({ @@ -43,10 +78,7 @@ export async function registerKafkaEndpoint( issuedAt, }); - await input.publisher.publish(input.contextGraphId, knowledgeAsset); + await input.publisher.publish(contextGraphId, knowledgeAsset); - return { - uri, - contextGraphId: input.contextGraphId, - }; + return { uri, contextGraphId, cgScope }; } diff --git a/packages/kafka/src/index.ts b/packages/kafka/src/index.ts index 9e74b9f44..a69d492d0 100644 --- a/packages/kafka/src/index.ts +++ b/packages/kafka/src/index.ts @@ -1,3 +1,5 @@ export * from './uri.js'; export * from './ka-builder.js'; export * from './endpoint.js'; +export * from './validation.js'; +export * from './local-cg.js'; diff --git a/packages/kafka/src/ka-builder.ts b/packages/kafka/src/ka-builder.ts index 42fe40c30..24a512db0 100644 --- a/packages/kafka/src/ka-builder.ts +++ b/packages/kafka/src/ka-builder.ts @@ -15,7 +15,21 @@ export interface BuildKafkaEndpointKnowledgeAssetInput { issuedAt: string; } -export function buildKafkaEndpointKnowledgeAsset(input: BuildKafkaEndpointKnowledgeAssetInput) { +export interface KafkaEndpointKnowledgeAsset { + '@context': typeof KAFKA_ENDPOINT_CONTEXT; + '@id': string; + '@type': readonly [string, string]; + 'dcat:endpointURL': { '@id': string }; + 'dkg:broker': string; + 'dkg:topic': string; + 'dkg:messageFormat': string; + 'dct:publisher': { '@id': string }; + 'dct:issued': { '@value': string; '@type': 'xsd:dateTime' }; +} + +export function buildKafkaEndpointKnowledgeAsset( + input: BuildKafkaEndpointKnowledgeAssetInput, +): KafkaEndpointKnowledgeAsset { const owner = input.owner.toLowerCase(); return { diff --git a/packages/kafka/src/local-cg.ts b/packages/kafka/src/local-cg.ts new file mode 100644 index 000000000..1c5255601 --- /dev/null +++ b/packages/kafka/src/local-cg.ts @@ -0,0 +1,119 @@ +// Lazy-creation orchestrator for the node-local kafka-local free CG. +// +// "Free CG" = local-only context graph, never registered on-chain. Created +// via `agent.createContextGraph({ private: true })` so the V10 agent skips +// gossip subscription/broadcast (`dkg-agent.ts:3837` flips `subscribed` off +// when `private: true`). Hardcoding `private: true` at the route adapter is +// what makes the CG truly node-local — without it, the agent would still +// auto-subscribe to its own gossip topics and leak the CG definition. +// +// Per-node uniqueness: the CG id is `kafka-local-{peerId}` rather than a bare +// `kafka-local`. This makes a cross-node id collision impossible by +// construction — two nodes literally cannot pick the same id — and removes +// the need for a post-create privacy probe. We pick the libp2p peer-id (not +// a wallet address) because peer-id is the canonical, stable node identity: +// `DKG_CREATOR` already records `did:dkg:agent:{peerId}` and a node may run +// multiple operational wallets that rotate independently. +// +// Promotion path (informational): a node-local kafka-local CG can be promoted +// in place via the existing V10 primitives — `dkg assertion promote` moves +// quads from the assertion graph (LWM) into the shared-memory graph (SWM) +// inside the SAME CG, and `dkg context-graph register` anchors the CG +// on-chain and flips `subscribed: false → true` (`dkg-agent.ts:4227-4236`), +// at which point SWM data starts flowing to peers. Slice 02 ships no +// promotion code of its own — the platform already covers it. + +export const KAFKA_LOCAL_CG_ID_PREFIX = 'kafka-local-'; +export const KAFKA_LOCAL_CG_BARE_ID = 'kafka-local'; + +/** + * Builds the per-node kafka-local CG id from a libp2p peer-id. Two nodes + * cannot collide on this id by construction. + */ +export function kafkaLocalCgIdFor(peerId: string): string { + return `${KAFKA_LOCAL_CG_ID_PREFIX}${peerId}`; +} + +/** + * Minimal V10 free-CG creation surface this module needs. Injected so unit + * tests run without a real DKG agent and the same code path serves both. The + * `createPrivateContextGraph` name is deliberate — the privacy guarantee is + * encoded in the method name rather than a boolean flag, so a future caller + * cannot accidentally drop `private: true` at the boundary. + * + * `isPrivateContextGraph` is required (not optional) so the ensurer can verify + * defence-in-depth that a pre-existing CG with our reserved id is actually + * private — protects against a non-private CG having been created out-of-band + * via `/api/context-graph/create` directly with a colliding id. + */ +export interface LocalCgPrimitive { + contextGraphExists(id: string): Promise; + isPrivateContextGraph(id: string): Promise; + createPrivateContextGraph(opts: { id: string; name: string }): Promise; +} + +/** + * Returns a thunk that ensures the per-node kafka-local CG exists and resolves + * to its id. Concurrent callers in a burst share one in-flight create; the + * gate clears in `finally` so a later call after deletion re-runs the + * exists-check. The gate lives in this closure so two ensurers built from + * different primitives don't accidentally share state keyed only on the id. + */ +export function createKafkaLocalCgEnsurer( + cg: LocalCgPrimitive, + peerId: string, +): () => Promise { + const id = kafkaLocalCgIdFor(peerId); + // Truncate the peer-id in the human-facing CG name so operator UIs don't + // get a 50-char id glued onto every label. The id itself stays full-length. + const displayName = `Kafka Local (${peerId.slice(0, 12)}…)`; + let inFlight: Promise | null = null; + + return async function ensureKafkaLocalCg(): Promise { + if (inFlight) { + return inFlight; + } + inFlight = (async () => { + // Defence-in-depth: `contextGraphExists` only proves an id is in the + // local store, not that it's the private kafka-local CG we expect. A + // pre-existing non-private CG with a colliding id (created via + // `/api/context-graph/create` directly, or stale data from before this + // fix) would otherwise be reused — publishing into a non-local graph + // while the API still reports `cgScope: 'local'`. Verify privacy. + if (await cg.contextGraphExists(id)) { + if (await cg.isPrivateContextGraph(id)) { + return id; + } + throw new Error( + `Context graph "${id}" exists locally but is not private. ` + + `The "${KAFKA_LOCAL_CG_BARE_ID}" prefix is reserved for private node-local CGs. ` + + `Either rename or delete the conflicting CG, or report a bug if you did not create it.`, + ); + } + try { + await cg.createPrivateContextGraph({ id, name: displayName }); + } catch (err: unknown) { + // Race tolerance: another path (parallel CLI, restart-replay) may + // have created the CG between our exists-check and our create. + // The agent surfaces this with an "already exists" message. + const msg = err instanceof Error ? err.message : String(err); + if (!/already exists/i.test(msg)) { + throw err; + } + // Re-verify privacy: a parallel creator may have used the reserved + // id for a NON-private CG. Same threat model as the exists-branch + // check above — fail loudly rather than silently leak. + if (!(await cg.isPrivateContextGraph(id))) { + throw new Error( + `Context graph "${id}" was created concurrently but is not private. ` + + `Reserved id was used for a non-private CG — investigate.`, + ); + } + } + return id; + })().finally(() => { + inFlight = null; + }); + return inFlight; + }; +} diff --git a/packages/kafka/src/validation.ts b/packages/kafka/src/validation.ts new file mode 100644 index 000000000..63f241f08 --- /dev/null +++ b/packages/kafka/src/validation.ts @@ -0,0 +1,83 @@ +// Pure validation for the context-graph selection on a Kafka endpoint +// registration. Caller must pass exactly one of `contextGraphId` (named +// shared CG) or `useLocalCg: true` (the node-local kafka-local free CG). +// Neither and both are hard errors. No DKG client dependency: the route +// handler maps a thrown error to a 400 response. The local-cg constant +// imports below are just string literals — no I/O. See ADR-0004. + +import { + KAFKA_LOCAL_CG_BARE_ID, + KAFKA_LOCAL_CG_ID_PREFIX, +} from './local-cg.js'; + +const BOTH_OPTIONS_HINT = + 'Pass exactly one of "contextGraphId" (publish into a named shared CG) ' + + 'or "useLocalCg": true (publish into the local kafka-local free CG).'; + +export interface KafkaContextGraphSelectionInput { + contextGraphId?: unknown; + useLocalCg?: unknown; +} + +export type KafkaContextGraphSelection = + | { kind: 'shared'; contextGraphId: string } + | { kind: 'local' }; + +export function validateContextGraphSelection( + input: KafkaContextGraphSelectionInput, +): KafkaContextGraphSelection { + const hasCg = input.contextGraphId != null; + const localValue = input.useLocalCg; + + // Type-check `useLocalCg` first so that explicit non-boolean values like `0` + // or `'true'` always fail with a type error, regardless of whether a + // `contextGraphId` is also present. Coercion would silently mask caller bugs. + if ( + localValue !== undefined && + localValue !== null && + typeof localValue !== 'boolean' + ) { + throw new Error('"useLocalCg" must be a boolean (true).'); + } + + // Treat `useLocalCg: false` as equivalent to omission (matches typical + // JSON-default serialization patterns where a client emits all fields with + // their defaults). Only `useLocalCg === true` is a positive selection. + const wantsLocal = localValue === true; + + if (hasCg && wantsLocal) { + throw new Error( + `"contextGraphId" and "useLocalCg" are mutually exclusive. ${BOTH_OPTIONS_HINT}`, + ); + } + + if (wantsLocal) { + return { kind: 'local' }; + } + + if (hasCg) { + if (typeof input.contextGraphId !== 'string') { + throw new Error('"contextGraphId" must be a string.'); + } + const trimmed = input.contextGraphId.trim(); + if (trimmed.length === 0) { + throw new Error('"contextGraphId" must be a non-empty string.'); + } + // Reject both the bare reserved id and any per-node prefixed form + // (`kafka-local-{peerId}`). The prefix space is owned by the local-CG + // ensurer; letting a caller smuggle one of those ids through the shared + // path would bypass the lazy-create gate and risk publishing into another + // node's local CG. + if ( + trimmed === KAFKA_LOCAL_CG_BARE_ID || + trimmed.startsWith(KAFKA_LOCAL_CG_ID_PREFIX) + ) { + throw new Error( + `"contextGraphId" cannot be "${trimmed}" — the "${KAFKA_LOCAL_CG_BARE_ID}" prefix is reserved for per-node local free CGs. Use "useLocalCg": true instead.`, + ); + } + return { kind: 'shared', contextGraphId: trimmed }; + } + + throw new Error(`Missing context-graph selection. ${BOTH_OPTIONS_HINT}`); +} diff --git a/packages/kafka/test/e2e/walking-skeleton.test.ts b/packages/kafka/test/e2e/walking-skeleton.test.ts index 9cca516be..a0b28b147 100644 --- a/packages/kafka/test/e2e/walking-skeleton.test.ts +++ b/packages/kafka/test/e2e/walking-skeleton.test.ts @@ -176,6 +176,7 @@ describe('kafka walking skeleton e2e', () => { expect(result.stdout).toContain('Kafka endpoint registered:'); expect(result.stdout).toContain(expectedUri); expect(result.stdout).toContain(CONTEXT_GRAPH_ID); + expect(result.stdout).toContain('CG scope: shared'); const row = await waitForEndpointRow(client, CONTEXT_GRAPH_ID, expectedUri); @@ -186,4 +187,104 @@ describe('kafka walking skeleton e2e', () => { expect(stripIriDelimiters(row.endpointUrl ?? '')).toBe(`kafka://${broker}/${topic}`); expect(Number.isNaN(Date.parse(stripQuotedLiteral(row.issued ?? '')))).toBe(false); }, 90_000); + + it('registers a Kafka endpoint into kafka-local-{peerId} with --local and discovers it via SPARQL', async () => { + const broker = 'kafka.e2e.local:9092'; + const topic = `walking-skeleton-local.${Date.now()}`; + const messageFormat = 'application/cloudevents+json'; + const expectedUri = buildKafkaEndpointUri({ owner, broker, topic }); + + const result = await execFileAsync( + 'node', + [ + CLI_ENTRY, + 'kafka', + 'endpoint', + 'register', + '--local', + '--broker', + broker, + '--topic', + topic, + '--format', + messageFormat, + ], + { + cwd: REPO_ROOT, + env: { + ...process.env, + DKG_HOME: DEVNET_NODE1_HOME, + DKG_API_PORT: String(port), + }, + }, + ); + + expect(result.stdout).toContain('Kafka endpoint registered:'); + expect(result.stdout).toContain(expectedUri); + expect(result.stdout).toContain('kafka-local'); + expect(result.stdout).toContain('CG scope: local'); + + // The daemon scopes the kafka-local CG id per-node as + // `kafka-local-{peerId}`. Parse the resolved id from the CLI output and + // SPARQL-query against THAT id so the test stays correct on any node. + const cgLineMatch = result.stdout.match(/Context graph:\s+(\S+)/); + expect(cgLineMatch?.[1]).toBeDefined(); + const resolvedCgId = cgLineMatch![1]!; + expect(resolvedCgId.startsWith('kafka-local-')).toBe(true); + + const row = await waitForEndpointRow(client, resolvedCgId, expectedUri); + + expect(stripQuotedLiteral(row.broker ?? '')).toBe(broker); + expect(stripQuotedLiteral(row.topic ?? '')).toBe(topic); + expect(stripQuotedLiteral(row.messageFormat ?? '')).toBe(messageFormat); + expect(stripIriDelimiters(row.publisher ?? '')).toBe(`urn:dkg:agent:${owner}`); + expect(stripIriDelimiters(row.endpointUrl ?? '')).toBe(`kafka://${broker}/${topic}`); + expect(Number.isNaN(Date.parse(stripQuotedLiteral(row.issued ?? '')))).toBe(false); + }, 90_000); + + it('rejects a request with neither contextGraphId nor useLocalCg with a 4xx', async () => { + const broker = 'kafka.e2e.local:9092'; + const topic = `walking-skeleton-bad-${Date.now()}`; + const messageFormat = 'application/json'; + + const response = await fetch(`http://127.0.0.1:${port}/api/kafka/endpoint`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ broker, topic, messageFormat }), + }); + + expect(response.status).toBe(400); + const payload = (await response.json()) as { error?: string }; + expect(payload.error ?? '').toMatch(/contextGraphId/); + expect(payload.error ?? '').toMatch(/useLocalCg/); + }, 30_000); + + it('rejects a request with both contextGraphId and useLocalCg with a 4xx', async () => { + const broker = 'kafka.e2e.local:9092'; + const topic = `walking-skeleton-bad-${Date.now()}`; + const messageFormat = 'application/json'; + + const response = await fetch(`http://127.0.0.1:${port}/api/kafka/endpoint`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ + contextGraphId: CONTEXT_GRAPH_ID, + useLocalCg: true, + broker, + topic, + messageFormat, + }), + }); + + expect(response.status).toBe(400); + const payload = (await response.json()) as { error?: string }; + expect(payload.error ?? '').toMatch(/contextGraphId/); + expect(payload.error ?? '').toMatch(/useLocalCg/); + }, 30_000); }); diff --git a/packages/kafka/test/endpoint.register.test.ts b/packages/kafka/test/endpoint.register.test.ts index e22f06fcb..00ec9aefa 100644 --- a/packages/kafka/test/endpoint.register.test.ts +++ b/packages/kafka/test/endpoint.register.test.ts @@ -2,7 +2,7 @@ import { describe, expect, it } from 'vitest'; import { registerKafkaEndpoint } from '../src/endpoint.js'; describe('registerKafkaEndpoint', () => { - it('publishes the Kafka endpoint KA into the named context graph', async () => { + it('publishes the Kafka endpoint KA into the named context graph (shared scope)', async () => { const calls: Array<{ contextGraphId: string; content: unknown }> = []; const publisher = { async publish(contextGraphId: string, content: unknown) { @@ -12,7 +12,7 @@ describe('registerKafkaEndpoint', () => { }; const result = await registerKafkaEndpoint({ - contextGraphId: 'devnet-test', + selection: { kind: 'shared', contextGraphId: 'devnet-test' }, owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD', broker: 'kafka.example.com:9092', topic: 'orders.created', @@ -25,6 +25,7 @@ describe('registerKafkaEndpoint', () => { uri: 'urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:' + '33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652', contextGraphId: 'devnet-test', + cgScope: 'shared', }); expect(calls).toHaveLength(1); @@ -55,4 +56,91 @@ describe('registerKafkaEndpoint', () => { }, }); }); + + it('publishes into the resolved local CG when selection.kind is "local"', async () => { + const calls: Array<{ contextGraphId: string; content: unknown }> = []; + const publisher = { + async publish(contextGraphId: string, content: unknown) { + calls.push({ contextGraphId, content }); + return { ual: 'did:dkg:test/1', kcId: '1', status: 'confirmed' as const }; + }, + }; + let ensureCalls = 0; + const ensureLocalCg = async (): Promise => { + ensureCalls += 1; + return 'kafka-local'; + }; + + const result = await registerKafkaEndpoint({ + selection: { kind: 'local' }, + owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD', + broker: 'kafka.example.com:9092', + topic: 'orders.created', + messageFormat: 'application/json', + issuedAt: '2026-05-04T12:34:56.000Z', + publisher, + ensureLocalCg, + }); + + expect(ensureCalls).toBe(1); + expect(result).toEqual({ + uri: 'urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:' + + '33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652', + contextGraphId: 'kafka-local', + cgScope: 'local', + }); + expect(calls).toHaveLength(1); + expect(calls[0]?.contextGraphId).toBe('kafka-local'); + }); + + it('throws when selection.kind is "local" but ensureLocalCg is not provided', async () => { + const publisher = { + async publish() { + return { ual: 'did:dkg:test/1', kcId: '1', status: 'confirmed' as const }; + }, + }; + + await expect( + registerKafkaEndpoint({ + selection: { kind: 'local' }, + owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD', + broker: 'kafka.example.com:9092', + topic: 'orders.created', + messageFormat: 'application/json', + publisher, + }), + ).rejects.toThrow(/ensureLocalCg/); + }); + + it('defaults issuedAt to "now" when caller omits it', async () => { + const calls: Array<{ contextGraphId: string; content: unknown }> = []; + const publisher = { + async publish(contextGraphId: string, content: unknown) { + calls.push({ contextGraphId, content }); + return { ual: 'did:dkg:test/1', kcId: '1', status: 'confirmed' as const }; + }, + }; + + const before = Date.now(); + await registerKafkaEndpoint({ + selection: { kind: 'shared', contextGraphId: 'devnet-test' }, + owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD', + broker: 'kafka.example.com:9092', + topic: 'orders.created', + messageFormat: 'application/json', + publisher, + }); + const after = Date.now(); + + // Verify the default-issuedAt branch by reading the timestamp the + // builder actually stamped onto the published KA, not by re-asserting + // wall-clock monotonicity. The KA carries it as a typed xsd:dateTime + // literal at `dct:issued.@value` — see ka-builder.ts. + expect(calls).toHaveLength(1); + const content = calls[0]!.content as { 'dct:issued': { '@value': string } }; + const issuedMs = Date.parse(content['dct:issued']['@value']); + expect(Number.isNaN(issuedMs)).toBe(false); + expect(issuedMs).toBeGreaterThanOrEqual(before); + expect(issuedMs).toBeLessThanOrEqual(after); + }); }); diff --git a/packages/kafka/test/local-cg.test.ts b/packages/kafka/test/local-cg.test.ts new file mode 100644 index 000000000..82cb47000 --- /dev/null +++ b/packages/kafka/test/local-cg.test.ts @@ -0,0 +1,243 @@ +import { describe, expect, it } from 'vitest'; +import { + KAFKA_LOCAL_CG_BARE_ID, + KAFKA_LOCAL_CG_ID_PREFIX, + createKafkaLocalCgEnsurer, + kafkaLocalCgIdFor, +} from '../src/local-cg.js'; + +const TEST_PEER_ID = '12D3KooWAbcDEFghiJKLmnoPQRstuVWxyZ'; +const EXPECTED_ID = `${KAFKA_LOCAL_CG_ID_PREFIX}${TEST_PEER_ID}`; + +interface FakeCgStore { + exists: Set; + // Tracks privacy per-id. Default-true on create unless the test seeds a + // non-private id explicitly. `isPrivateContextGraph` reads from this set. + privateIds: Set; + createCalls: Array<{ id: string; name: string }>; +} + +function makeFakeCg(initial: { withId?: string; nonPrivate?: boolean } = {}) { + const store: FakeCgStore = { + exists: new Set(initial.withId ? [initial.withId] : []), + privateIds: new Set( + initial.withId && !initial.nonPrivate ? [initial.withId] : [], + ), + createCalls: [], + }; + + // The dependency injected into createKafkaLocalCgEnsurer models the V10 + // free-CG primitive. The method is `createPrivateContextGraph` — the + // privacy guarantee is encoded in the method name so the kafka package + // never sees the boolean. Both calls await — close enough to the real V10 + // surface that idempotency proofs translate. + const cg = { + contextGraphExists: async (id: string): Promise => { + // microtask hop so two parallel calls actually interleave their checks + await Promise.resolve(); + return store.exists.has(id); + }, + isPrivateContextGraph: async (id: string): Promise => { + await Promise.resolve(); + return store.privateIds.has(id); + }, + createPrivateContextGraph: async (opts: { id: string; name: string }): Promise => { + // microtask hop, then atomic check-and-set so a real backing store + // cannot double-create. Mirrors `agent.createContextGraph`'s own + // "already exists" guard. + await Promise.resolve(); + if (store.exists.has(opts.id)) { + throw new Error(`Context graph "${opts.id}" already exists`); + } + store.exists.add(opts.id); + // The real adapter hardcodes `private: true`, so any successful create + // through this primitive yields a private CG. + store.privateIds.add(opts.id); + store.createCalls.push({ id: opts.id, name: opts.name }); + }, + }; + + return { store, cg }; +} + +describe('kafkaLocalCgIdFor', () => { + it('builds a per-node id by prefixing the peer-id', () => { + expect(kafkaLocalCgIdFor(TEST_PEER_ID)).toBe(EXPECTED_ID); + }); + + it('exposes the prefix and bare id as separate constants', () => { + expect(KAFKA_LOCAL_CG_ID_PREFIX).toBe('kafka-local-'); + expect(KAFKA_LOCAL_CG_BARE_ID).toBe('kafka-local'); + }); +}); + +describe('createKafkaLocalCgEnsurer', () => { + it('creates the per-node kafka-local CG on first call and returns its id', async () => { + const { store, cg } = makeFakeCg(); + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + const id = await ensure(); + + expect(id).toBe(EXPECTED_ID); + expect(store.exists.has(EXPECTED_ID)).toBe(true); + expect(store.createCalls).toEqual([ + { id: EXPECTED_ID, name: expect.any(String) }, + ]); + }); + + it('skips creation when the per-node CG already exists (idempotent on subsequent calls)', async () => { + const { store, cg } = makeFakeCg({ withId: EXPECTED_ID }); + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + const id = await ensure(); + + expect(id).toBe(EXPECTED_ID); + expect(store.createCalls).toEqual([]); + }); + + it('serializes parallel callers (same ensurer) so the per-node CG is created exactly once', async () => { + const { store, cg } = makeFakeCg(); + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + const ids = await Promise.all([ + ensure(), + ensure(), + ensure(), + ensure(), + ensure(), + ]); + + expect(ids).toEqual([ + EXPECTED_ID, + EXPECTED_ID, + EXPECTED_ID, + EXPECTED_ID, + EXPECTED_ID, + ]); + expect(store.createCalls).toHaveLength(1); + expect(store.createCalls[0]?.id).toBe(EXPECTED_ID); + }); + + // The agent's `createPrivateContextGraph` adapter hardcodes `private: true` + // at the route boundary; the kafka package depends on that name. Asserting + // that the ensurer calls THIS method (not a generic create) locks the + // privacy guarantee at the type-system level — a future refactor that + // renames the method back to a generic create would break this test before + // it could leak data. + it('invokes createPrivateContextGraph (not a generic create) on the injected primitive', async () => { + let createPrivateCalls = 0; + const cg = { + contextGraphExists: async (_id: string): Promise => false, + isPrivateContextGraph: async (_id: string): Promise => true, + createPrivateContextGraph: async (_opts: { + id: string; + name: string; + }): Promise => { + createPrivateCalls += 1; + }, + }; + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + await ensure(); + + expect(createPrivateCalls).toBe(1); + }); + + // Defence-in-depth path: the in-flight gate already serializes concurrent + // callers in-process, but if an external creator wins the race between our + // exists-check and our create-call, the underlying store will throw + // "already exists". The ensurer must swallow that specific error and + // return the id anyway, while non-"already exists" errors must still + // bubble up. + it('treats a concurrent "already exists" create error as success', async () => { + const cg = { + contextGraphExists: async (_id: string): Promise => false, + // The parallel creator was OUR adapter (private: true), so the + // resulting CG is private and the post-race verification passes. + isPrivateContextGraph: async (_id: string): Promise => true, + createPrivateContextGraph: async (_opts: { id: string; name: string }): Promise => { + throw new Error(`Context graph "${EXPECTED_ID}" already exists`); + }, + }; + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + const id = await ensure(); + + expect(id).toBe(EXPECTED_ID); + }); + + it('rethrows non-"already exists" create errors', async () => { + const cg = { + contextGraphExists: async (_id: string): Promise => false, + isPrivateContextGraph: async (_id: string): Promise => true, + createPrivateContextGraph: async (_opts: { id: string; name: string }): Promise => { + throw new Error('storage offline'); + }, + }; + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + await expect(ensure()).rejects.toThrow(/storage offline/); + }); + + // Defence-in-depth: `contextGraphExists` only proves an id is in the local + // store, not that it's the private kafka-local CG we expect. A pre-existing + // non-private CG with a colliding id (created via /api/context-graph/create + // directly, or stale data from before this fix) must be REJECTED — not + // silently reused, which would publish into a non-local graph while the API + // still reports `cgScope: 'local'`. + it('throws when a CG with the reserved id exists locally but is not private', async () => { + const { cg } = makeFakeCg({ withId: EXPECTED_ID, nonPrivate: true }); + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + await expect(ensure()).rejects.toThrow( + new RegExp(`${EXPECTED_ID}.*not private`, 'i'), + ); + }); + + // Race-tolerance must not silently accept a non-private CG. If a parallel + // creator beat us with `private: false`, the post-race re-verification has + // to fail loudly with a distinct error that points at the race. + it('throws when a parallel "already exists" create yields a non-private CG', async () => { + const cg = { + contextGraphExists: async (_id: string): Promise => false, + isPrivateContextGraph: async (_id: string): Promise => false, + createPrivateContextGraph: async (_opts: { id: string; name: string }): Promise => { + throw new Error(`Context graph "${EXPECTED_ID}" already exists`); + }, + }; + const ensure = createKafkaLocalCgEnsurer(cg, TEST_PEER_ID); + + await expect(ensure()).rejects.toThrow( + /created concurrently but is not private/i, + ); + }); + + // Hidden-coupling regression test: two ensurers built from two different + // primitive instances AND different peer-ids must NOT share their gate. If + // they did, a parallel burst across both would only create the CG in one + // store, silently ignoring the other primitive — see the factory docstring. + // Different peer-ids also exercises the per-node uniqueness guarantee: + // each ensurer resolves to its own `kafka-local-{peerId}`. + it('two ensurers with different primitives and peer-ids each create their own CG independently', async () => { + const peerA = '12D3KooWPeerAAAAAAAAAAAAAAAAAAAAAAAAA'; + const peerB = '12D3KooWPeerBBBBBBBBBBBBBBBBBBBBBBBBB'; + const a = makeFakeCg(); + const b = makeFakeCg(); + const ensureA = createKafkaLocalCgEnsurer(a.cg, peerA); + const ensureB = createKafkaLocalCgEnsurer(b.cg, peerB); + + const [idA, idB] = await Promise.all([ensureA(), ensureB()]); + + expect(idA).toBe(kafkaLocalCgIdFor(peerA)); + expect(idB).toBe(kafkaLocalCgIdFor(peerB)); + expect(idA).not.toBe(idB); + // Each store saw exactly one create — neither piggy-backed on the other. + expect(a.store.createCalls).toHaveLength(1); + expect(b.store.createCalls).toHaveLength(1); + expect(a.store.exists.has(idA)).toBe(true); + expect(b.store.exists.has(idB)).toBe(true); + // Cross-isolation: peer-A's id never lands in peer-B's store, and vice versa. + expect(a.store.exists.has(idB)).toBe(false); + expect(b.store.exists.has(idA)).toBe(false); + }); +}); diff --git a/packages/kafka/test/validation.test.ts b/packages/kafka/test/validation.test.ts new file mode 100644 index 000000000..0346d3e52 --- /dev/null +++ b/packages/kafka/test/validation.test.ts @@ -0,0 +1,129 @@ +import { describe, expect, it } from 'vitest'; +import { validateContextGraphSelection } from '../src/validation.js'; + +describe('validateContextGraphSelection', () => { + it('accepts a non-empty contextGraphId alone and resolves to shared scope', () => { + expect( + validateContextGraphSelection({ contextGraphId: 'devnet-test' }), + ).toEqual({ kind: 'shared', contextGraphId: 'devnet-test' }); + }); + + it('normalizes a whitespace-padded contextGraphId by trimming it', () => { + expect( + validateContextGraphSelection({ contextGraphId: ' devnet-test ' }), + ).toEqual({ kind: 'shared', contextGraphId: 'devnet-test' }); + }); + + it('accepts useLocalCg: true alone and resolves to local scope', () => { + expect(validateContextGraphSelection({ useLocalCg: true })).toEqual({ + kind: 'local', + }); + }); + + it('rejects neither field with an error mentioning both options', () => { + expect(() => validateContextGraphSelection({})).toThrow( + /contextGraphId.*useLocalCg|useLocalCg.*contextGraphId/, + ); + }); + + it('rejects both fields with an error mentioning both options', () => { + expect(() => + validateContextGraphSelection({ + contextGraphId: 'devnet-test', + useLocalCg: true, + }), + ).toThrow(/contextGraphId.*useLocalCg|useLocalCg.*contextGraphId/); + }); + + it('rejects useLocalCg: false alone (treated as neither field set)', () => { + expect(() => + validateContextGraphSelection({ useLocalCg: false }), + ).toThrow(/contextGraphId.*useLocalCg|useLocalCg.*contextGraphId/); + }); + + it('rejects an empty-string contextGraphId', () => { + expect(() => + validateContextGraphSelection({ contextGraphId: '' }), + ).toThrow(/contextGraphId/); + }); + + it('rejects a whitespace-only contextGraphId', () => { + expect(() => + validateContextGraphSelection({ contextGraphId: ' ' }), + ).toThrow(/contextGraphId/); + }); + + it('rejects a non-string contextGraphId', () => { + expect(() => + validateContextGraphSelection({ contextGraphId: 42 as unknown as string }), + ).toThrow(/contextGraphId/); + }); + + it('rejects a non-boolean useLocalCg', () => { + expect(() => + validateContextGraphSelection({ useLocalCg: 'yes' as unknown as boolean }), + ).toThrow(/useLocalCg/); + }); + + // `useLocalCg: false` is treated as equivalent to omission, which matches + // typical JSON-default serialization patterns where a client emits every + // field with its default. Non-boolean values like `0` / `'true'` / `1` are + // still rejected as type errors at the API boundary — coercing them would + // silently mask caller bugs. These tests lock that behavior in. + it('treats useLocalCg: false as "missing selection" (boolean false collapses)', () => { + expect(() => + validateContextGraphSelection({ useLocalCg: false }), + ).toThrow(/Missing context-graph selection/); + }); + + it('treats useLocalCg: 0 as a type error ("must be a boolean")', () => { + expect(() => + validateContextGraphSelection({ useLocalCg: 0 as unknown as boolean }), + ).toThrow(/"useLocalCg" must be a boolean/); + }); + + // Bug 1 regression: a client that auto-emits boolean defaults (e.g. JSON + // serializing `useLocalCg: false`) alongside a real `contextGraphId` must + // be accepted as a shared selection, not rejected as "mutually exclusive". + it('accepts contextGraphId paired with useLocalCg: false as shared (trimmed)', () => { + expect( + validateContextGraphSelection({ + contextGraphId: ' devnet-test ', + useLocalCg: false, + }), + ).toEqual({ kind: 'shared', contextGraphId: 'devnet-test' }); + }); + + // Bug 2 regression: the `kafka-local` namespace is reserved at the package + // level for node-local free CGs (see local-cg.ts). The reservation covers + // both the bare id and any `kafka-local-{peerId}` prefix form — the prefix + // space is owned by the local-CG ensurer. + it('rejects contextGraphId: "kafka-local" with a hint to use useLocalCg: true', () => { + expect(() => + validateContextGraphSelection({ contextGraphId: 'kafka-local' }), + ).toThrow(/"useLocalCg":\s*true/); + }); + + it('rejects whitespace-padded "kafka-local" (trim happens before the reserved-id check)', () => { + expect(() => + validateContextGraphSelection({ contextGraphId: ' kafka-local ' }), + ).toThrow(/kafka-local/); + }); + + it('rejects the per-node prefixed form "kafka-local-{peerId}" with the same hint', () => { + expect(() => + validateContextGraphSelection({ + contextGraphId: 'kafka-local-12D3KooWAbcDEFghiJKLmnoPQRstuVWxyZ', + }), + ).toThrow(/"useLocalCg":\s*true/); + }); + + // Boundary: only the literal `kafka-local-` prefix is reserved. Ids that + // happen to start with the substring `kafkalocal` (no dash) are perfectly + // valid shared ids and must NOT be swept up by the prefix check. + it('accepts a shared id that merely contains "kafkalocal" without the reserved prefix', () => { + expect( + validateContextGraphSelection({ contextGraphId: 'kafkalocal' }), + ).toEqual({ kind: 'shared', contextGraphId: 'kafkalocal' }); + }); +}); diff --git a/vitest.coverage.ts b/vitest.coverage.ts index 7123b7e22..847783141 100644 --- a/vitest.coverage.ts +++ b/vitest.coverage.ts @@ -162,7 +162,7 @@ export const kosavaEpcisCoverage: CoverageThresholds = { export const kosavaKafkaCoverage: CoverageThresholds = { lines: 100, functions: 100, - branches: 50, + branches: 97, statements: 100, };