diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index cf0917df6..e28e11883 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -65,6 +65,7 @@ export { type PeerHealth, } from './dkg-agent.js'; export type { CclPublishedEvaluationRecord, CclPublishedResultEntry } from './dkg-agent.js'; +export type { JsonLdContent, JsonLdDocument } from './dkg-agent-utils.js'; export { bindRandomSampling, type RandomSamplingBindOptions, diff --git a/packages/cli/src/api-client.ts b/packages/cli/src/api-client.ts index d51715b38..36840f6f6 100644 --- a/packages/cli/src/api-client.ts +++ b/packages/cli/src/api-client.ts @@ -556,9 +556,11 @@ export class ApiClient { broker: string; topic: string; messageFormat: string; + private?: boolean; }): Promise<{ uri: string; contextGraphId: string; + private: boolean; }> { return this.post('/api/kafka/endpoint', request); } diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 0e55ea5de..4d59c6166 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -1732,6 +1732,7 @@ kafkaEndpointCmd .requiredOption('--broker <broker>', 'Kafka broker host:port') .requiredOption('--topic <topic>', 'Kafka topic name') .option('--format <format>', 'Kafka message format MIME type', 'application/json') + .option('--public', 'Publish the endpoint as a public KA (default: private)') .action(async (opts: ActionOpts) => { try { const client = await ApiClient.connect(); @@ -1740,10 +1741,12 @@ kafkaEndpointCmd broker: opts.broker, topic: opts.topic, messageFormat: opts.format, + ...(opts.public ? 
{ private: false } : {}), }); console.log('Kafka endpoint registered:'); console.log(` URI: ${result.uri}`); console.log(` Context graph: ${result.contextGraphId}`); + console.log(` Private: ${result.private}`); } catch (err) { console.error(toErrorMessage(err)); process.exit(1); diff --git a/packages/cli/src/daemon/http-utils.ts b/packages/cli/src/daemon/http-utils.ts index 462c71a57..dd59e115e 100644 --- a/packages/cli/src/daemon/http-utils.ts +++ b/packages/cli/src/daemon/http-utils.ts @@ -260,6 +260,22 @@ export function validateOptionalSubGraphName( return true; } +/** Type guard: non-empty trimmed string. Does not write a response — caller handles the 400. */ +export function isNonEmptyString(value: unknown): value is string { + return typeof value === 'string' && value.trim().length > 0; +} + +/** Optional-boolean validator. Returns false (and writes a 400) if the field is present but not a boolean. */ +export function validateOptionalBoolean( + value: unknown, + fieldName: string, + res: ServerResponse, +): boolean { + if (value === undefined || typeof value === 'boolean') return true; + jsonResponse(res, 400, { error: `"${fieldName}" must be a boolean` }); + return false; +} + export function validateRequiredContextGraphId( contextGraphId: unknown, res: ServerResponse, diff --git a/packages/cli/src/daemon/json-ld-envelope.ts b/packages/cli/src/daemon/json-ld-envelope.ts new file mode 100644 index 000000000..a57395a70 --- /dev/null +++ b/packages/cli/src/daemon/json-ld-envelope.ts @@ -0,0 +1,9 @@ +import type { JsonLdContent, JsonLdDocument } from '@origintrail-official/dkg-agent'; + +/** Wrap a JSON-LD document in the `{ public }` or `{ private }` envelope `DKGAgent.publish` expects. */ +export function wrapJsonLdContent( + content: JsonLdDocument, + options: { private: boolean }, +): JsonLdContent { + return options.private ? 
{ private: content } : { public: content }; +} diff --git a/packages/cli/src/daemon/routes/kafka.ts b/packages/cli/src/daemon/routes/kafka.ts index 550f6f05e..be91edd71 100644 --- a/packages/cli/src/daemon/routes/kafka.ts +++ b/packages/cli/src/daemon/routes/kafka.ts @@ -1,14 +1,17 @@ -import { jsonResponse, readBody, validateRequiredContextGraphId } from '../http-utils.js'; +import { + isNonEmptyString, + jsonResponse, + readBody, + validateOptionalBoolean, + validateRequiredContextGraphId, +} from '../http-utils.js'; +import { wrapJsonLdContent } from '../json-ld-envelope.js'; import type { RequestContext } from './context.js'; import { registerKafkaEndpoint, type KafkaEndpointPublisher, } from '@origintrail-official/dkg-kafka'; -function isNonEmptyString(value: unknown): value is string { - return typeof value === 'string' && value.trim().length > 0; -} - export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> { const { req, @@ -32,6 +35,7 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> { broker, topic, messageFormat, + private: privateField, } = parsed as Record<string, unknown>; if (!validateRequiredContextGraphId(contextGraphId, res)) { @@ -48,12 +52,15 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> { return jsonResponse(res, 400, { error: '"messageFormat" must be a non-empty string' }); } + if (!validateOptionalBoolean(privateField, 'private', res)) return; + + // `!== false` is intentional: only literal `false` opts in to public; omitted/undefined defaults to private. + // Do NOT tighten to `=== true` — that would silently break the omitted-defaults-to-private contract. 
+ const isPrivate = privateField !== false; + const publisher: KafkaEndpointPublisher = { async publish(cgId, content) { - await agent.publish( - cgId, - { public: content } as Record<string, unknown>, - ); + await agent.publish(cgId, wrapJsonLdContent(content, { private: isPrivate })); }, }; @@ -66,6 +73,6 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> { publisher, }); - return jsonResponse(res, 200, result); + return jsonResponse(res, 200, { ...result, private: isPrivate }); } } diff --git a/packages/cli/test/api-client.test.ts b/packages/cli/test/api-client.test.ts index 7a6cfff20..04dcef3e3 100644 --- a/packages/cli/test/api-client.test.ts +++ b/packages/cli/test/api-client.test.ts @@ -201,6 +201,10 @@ describe('ApiClient', () => { body: { uri: 'urn:dkg:kafka-endpoint:0xabc:hash', contextGraphId: 'devnet-test', + // Mirror the real route's response shape: it always echoes the + // resolved `private` flag (default-private when omitted from the + // request body, as is the case here). + private: true, }, }); globalThis.fetch = fetch; diff --git a/packages/cli/test/daemon-json-ld-envelope.test.ts b/packages/cli/test/daemon-json-ld-envelope.test.ts new file mode 100644 index 000000000..e18663c19 --- /dev/null +++ b/packages/cli/test/daemon-json-ld-envelope.test.ts @@ -0,0 +1,55 @@ +/** + * Unit tests for `wrapJsonLdContent` — the small helper that produces the + * `{ public }` / `{ private }` envelope shape DKGAgent.publish() expects. + * + * The helper is the privacy boundary for any route that publishes a + * JSON-LD KA. Keeping it tiny and well-tested means slice-02/07 (and any + * future route that publishes a KA) can adopt it without re-deriving + * envelope semantics. + */ + +import { describe, expect, it } from 'vitest'; +import { wrapJsonLdContent } from '../src/daemon/json-ld-envelope.js'; + +describe('wrapJsonLdContent', () => { + it('wraps the document in { private: ... 
} when options.private is true', () => { + const doc = { '@id': 'urn:test:1', 'foo:bar': 'baz' }; + + const envelope = wrapJsonLdContent(doc, { private: true }); + + expect(envelope).toEqual({ private: doc }); + expect(envelope).not.toHaveProperty('public'); + }); + + it('wraps the document in { public: ... } when options.private is false', () => { + const doc = { '@id': 'urn:test:2', 'foo:bar': 'qux' }; + + const envelope = wrapJsonLdContent(doc, { private: false }); + + expect(envelope).toEqual({ public: doc }); + expect(envelope).not.toHaveProperty('private'); + }); + + it('preserves the document by reference (no copy)', () => { + // The helper is a thin wrapper — it must NOT clone the document. A + // copy would silently break callers that rely on identity (e.g. for + // post-publish hooks that mutate the original). + const doc = { '@id': 'urn:test:3' }; + + const privateEnv = wrapJsonLdContent(doc, { private: true }) as { private: object }; + const publicEnv = wrapJsonLdContent(doc, { private: false }) as { public: object }; + + expect(privateEnv.private).toBe(doc); + expect(publicEnv.public).toBe(doc); + }); + + it('accepts an array of JSON-LD documents (graph form)', () => { + // JsonLdDocument is `Record<string, unknown> | Record<string, unknown>[]` + // so an array of objects must round-trip through the envelope unchanged. + const docs = [{ '@id': 'urn:a' }, { '@id': 'urn:b' }]; + + const envelope = wrapJsonLdContent(docs, { private: true }) as { private: unknown }; + + expect(envelope.private).toBe(docs); + }); +}); diff --git a/packages/cli/test/daemon-routes-kafka.test.ts b/packages/cli/test/daemon-routes-kafka.test.ts new file mode 100644 index 000000000..28bab1893 --- /dev/null +++ b/packages/cli/test/daemon-routes-kafka.test.ts @@ -0,0 +1,131 @@ +/** + * Unit tests for the Kafka route adapter's privacy-envelope logic. 
+ * + * The route adapter (packages/cli/src/daemon/routes/kafka.ts) is responsible + * for wrapping the bare KA in either `{ private: KA }` or `{ public: KA }` + * before passing it to agent.publish(). The kafka package itself stays agnostic. + * + * These tests invoke handleKafkaRoutes directly with a minimal RequestContext + * mock — no real daemon, no network, no chain. The fakes live in + * test/helpers/route-test-utils.ts and are reused by future route tests. + */ + +import { describe, it, expect } from 'vitest'; +import { handleKafkaRoutes } from '../src/daemon/routes/kafka.js'; +import { + makeFakeRequest, + makeFakeResponse, + makeRequestContext, +} from './helpers/route-test-utils.js'; + +const KAFKA_ENDPOINT_URL = '/api/kafka/endpoint'; + +const VALID_BASE_BODY = { + contextGraphId: 'devnet-test', + broker: 'kafka.example.com:9092', + topic: 'orders.created', + messageFormat: 'application/json', +}; + +describe('Kafka route adapter — privacy envelope', () => { + it('wraps with { private: KA } when private: true is in request body', async () => { + const req = makeFakeRequest({ ...VALID_BASE_BODY, private: true }, { url: KAFKA_ENDPOINT_URL }); + const { res, getResult } = makeFakeResponse(); + const { ctx, publishCalls } = makeRequestContext(req, res); + + await handleKafkaRoutes(ctx); + + const { status, body } = getResult(); + expect(status).toBe(200); + + expect(publishCalls).toHaveLength(1); + const { envelope } = publishCalls[0]!; + expect(envelope).toHaveProperty('private'); + expect(envelope).not.toHaveProperty('public'); + + // Response must echo the resolved private flag + expect((body as Record<string, unknown>).private).toBe(true); + }); + + it('wraps with { public: KA } when private: false is in request body', async () => { + const req = makeFakeRequest({ ...VALID_BASE_BODY, private: false }, { url: KAFKA_ENDPOINT_URL }); + const { res, getResult } = makeFakeResponse(); + const { ctx, publishCalls } = makeRequestContext(req, res); + + await 
handleKafkaRoutes(ctx); + + const { status, body } = getResult(); + expect(status).toBe(200); + + expect(publishCalls).toHaveLength(1); + const { envelope } = publishCalls[0]!; + expect(envelope).toHaveProperty('public'); + expect(envelope).not.toHaveProperty('private'); + + // Response must echo the resolved private flag + expect((body as Record<string, unknown>).private).toBe(false); + }); + + it('defaults to { private: KA } when private field is omitted from request body', async () => { + // No `private` field — route defaults to private: true + const req = makeFakeRequest(VALID_BASE_BODY, { url: KAFKA_ENDPOINT_URL }); + const { res, getResult } = makeFakeResponse(); + const { ctx, publishCalls } = makeRequestContext(req, res); + + await handleKafkaRoutes(ctx); + + const { status, body } = getResult(); + expect(status).toBe(200); + + expect(publishCalls).toHaveLength(1); + const { envelope } = publishCalls[0]!; + expect(envelope).toHaveProperty('private'); + expect(envelope).not.toHaveProperty('public'); + + // Response echoes resolved private flag = true (default) + expect((body as Record<string, unknown>).private).toBe(true); + }); + + it('returns 400 when contextGraphId is missing', async () => { + const req = makeFakeRequest( + { broker: 'x:9092', topic: 't', messageFormat: 'application/json' }, + { url: KAFKA_ENDPOINT_URL }, + ); + const { res, getResult } = makeFakeResponse(); + const { ctx } = makeRequestContext(req, res); + + await handleKafkaRoutes(ctx); + + expect(getResult().status).toBe(400); + }); + + it('returns 400 when broker is missing', async () => { + const req = makeFakeRequest( + { contextGraphId: 'devnet-test', topic: 't', messageFormat: 'application/json' }, + { url: KAFKA_ENDPOINT_URL }, + ); + const { res, getResult } = makeFakeResponse(); + const { ctx } = makeRequestContext(req, res); + + await handleKafkaRoutes(ctx); + + expect(getResult().status).toBe(400); + }); + + it('returns 400 when "private" is a non-boolean value (e.g. 
string "false")', async () => { + // The route enforces a strict boolean for `private` to keep the privacy + // contract unambiguous. Truthy/falsy coercion would create an unsafe + // ambiguity at a privacy boundary. + const req = makeFakeRequest({ ...VALID_BASE_BODY, private: 'false' }, { url: KAFKA_ENDPOINT_URL }); + const { res, getResult } = makeFakeResponse(); + const { ctx, publishCalls } = makeRequestContext(req, res); + + await handleKafkaRoutes(ctx); + + const { status, body } = getResult(); + expect(status).toBe(400); + expect((body as Record<string, unknown>).error).toMatch(/"private" must be a boolean/); + // No publish should have happened + expect(publishCalls).toHaveLength(0); + }); +}); diff --git a/packages/cli/test/helpers/route-test-utils.ts b/packages/cli/test/helpers/route-test-utils.ts new file mode 100644 index 000000000..ce8445561 --- /dev/null +++ b/packages/cli/test/helpers/route-test-utils.ts @@ -0,0 +1,96 @@ +/** Shared in-process mocks for unit-testing daemon route handlers. No real daemon, network, chain, or Hardhat. */ + +import { EventEmitter } from 'node:events'; +import type { IncomingMessage, ServerResponse } from 'node:http'; +import type { RequestContext } from '../../src/daemon/routes/context.js'; + +const DEFAULT_AGENT_ADDRESS = '0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef'; + +/** Fake IncomingMessage; body is JSON-encoded and emitted via data/end events on the next tick. */ +export function makeFakeRequest( + body: object | null, + overrides: { method?: string; url?: string } = {}, +): IncomingMessage { + const emitter = new EventEmitter() as unknown as IncomingMessage; + emitter.method = overrides.method ?? 'POST'; + emitter.url = overrides.url ?? '/'; + + // Defer emit so listeners attach first. 
+ setImmediate(() => { + if (body !== null) { + emitter.emit('data', Buffer.from(JSON.stringify(body))); + } + emitter.emit('end'); + }); + + return emitter; +} + +export interface FakeResponse { + res: ServerResponse; + getResult: () => { status: number; body: unknown }; +} + +/** Fake ServerResponse capturing status + JSON body. Supports the writeHead(status, headers?) overload jsonResponse uses. */ +export function makeFakeResponse(): FakeResponse { + let capturedStatus = 0; + let capturedBody: unknown = null; + + const res = new EventEmitter() as unknown as ServerResponse; + (res as any).writeHead = (status: number, _headers?: unknown) => { + capturedStatus = status; + return res; + }; + (res as any).end = (data?: string) => { + try { + capturedBody = data ? JSON.parse(data) : null; + } catch { + capturedBody = data; + } + return res; + }; + + return { + res, + getResult: () => ({ status: capturedStatus, body: capturedBody }), + }; +} + +export interface PublishCall { + cgId: string; + envelope: unknown; +} + +export type PublishHook = (cgId: string, envelope: unknown) => Promise<unknown>; + +/** + * Build a minimal RequestContext; the mock agent.publish appends every call to `publishCalls`. + * `overrides.onPublish` overrides the publish return value; `publishCalls` is still populated. 
*/ +export function makeRequestContext( + req: IncomingMessage, + res: ServerResponse, + overrides: Partial<RequestContext> & { onPublish?: PublishHook } = {}, +): { ctx: RequestContext; publishCalls: PublishCall[] } { + const publishCalls: PublishCall[] = []; + const { onPublish, ...ctxOverrides } = overrides; + + const mockAgent = { + async publish(cgId: string, envelope: unknown) { + publishCalls.push({ cgId, envelope }); + if (onPublish) return onPublish(cgId, envelope); + return { ual: 'did:dkg:test/1', kcId: '1', status: 'confirmed' as const }; + }, + }; + + const ctx: Partial<RequestContext> = { + req, + res, + agent: mockAgent as unknown as RequestContext['agent'], + requestAgentAddress: DEFAULT_AGENT_ADDRESS, + path: req.url ?? '/', + ...ctxOverrides, + }; + + return { ctx: ctx as RequestContext, publishCalls }; +} diff --git a/packages/cli/test/kafka-cli-smoke.test.ts b/packages/cli/test/kafka-cli-smoke.test.ts index f80ec60b2..6ce38a88a 100644 --- a/packages/cli/test/kafka-cli-smoke.test.ts +++ b/packages/cli/test/kafka-cli-smoke.test.ts @@ -18,6 +18,8 @@ describe.sequential('kafka CLI smoke', () => { let smokeApiPort: string; let lastBody = ''; let lastAuthHeader = ''; + // Track what privacy flag the server should echo back in its response + let serverPrivate = true; beforeAll(async () => { dkgHome = await mkdtemp(join(tmpdir(), 'dkg-kafka-cli-')); @@ -38,10 +40,14 @@ describe.sequential('kafka CLI smoke', () => { chunks.push(Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk)); } lastBody = Buffer.concat(chunks).toString('utf8'); + const requestedPrivate = (JSON.parse(lastBody) as Record<string, unknown>).private; + // Mirror what the real route does: omitted → private, false → public + serverPrivate = requestedPrivate !== false; res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ uri: 'urn:dkg:kafka-endpoint:0xabc:hash', contextGraphId: 'devnet-test', + private: serverPrivate, })); return; } @@ -65,7 +71,7 @@ describe.sequential('kafka CLI smoke', () => { await rm(dkgHome, { recursive: true, force: true }); }); - it('registers a Kafka endpoint through the CLI', async () => { + it('registers a Kafka endpoint (default: private) through the CLI', async () => { const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort }; const result = await execFileAsync('node', [ @@ -84,12 +90,49 @@ describe.sequential('kafka CLI smoke', () => { expect(result.stdout).toContain('Kafka endpoint registered:'); expect(result.stdout).toContain('urn:dkg:kafka-endpoint:0xabc:hash'); expect(result.stdout).toContain('devnet-test'); + // Default (no --public flag) → private: true displayed in stdout + expect(result.stdout).toContain('Private: true'); expect(lastAuthHeader).toBe('Bearer smoke-token'); - expect(JSON.parse(lastBody)).toEqual({ + + // No --public flag: `private` field is OMITTED from the request body + // (route defaults to private when the field is absent). 
+ const parsedBody = JSON.parse(lastBody) as Record<string, unknown>; + expect(parsedBody).toEqual({ contextGraphId: 'devnet-test', broker: 'kafka.example.com:9092', topic: 'orders.created', messageFormat: 'application/json', }); + expect(parsedBody).not.toHaveProperty('private'); + }, 15000); + + it('registers a public Kafka endpoint when --public flag is provided', async () => { + const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort }; + + const result = await execFileAsync('node', [ + CLI_ENTRY, + 'kafka', + 'endpoint', + 'register', + '--cg', + 'devnet-test', + '--broker', + 'kafka.example.com:9092', + '--topic', + 'public-orders', + '--public', + ], { env }); + + expect(result.stdout).toContain('Kafka endpoint registered:'); + // --public flag → private: false displayed in stdout + expect(result.stdout).toContain('Private: false'); + expect(lastAuthHeader).toBe('Bearer smoke-token'); + + // --public flag: `private: false` sent in request body + const parsedBody = JSON.parse(lastBody) as Record<string, unknown>; + expect(parsedBody.private).toBe(false); + expect(parsedBody.contextGraphId).toBe('devnet-test'); + expect(parsedBody.broker).toBe('kafka.example.com:9092'); + expect(parsedBody.topic).toBe('public-orders'); }, 15000); }); diff --git a/packages/kafka/test/e2e/walking-skeleton.test.ts b/packages/kafka/test/e2e/walking-skeleton.test.ts index 9cca516be..5b4e09fa1 100644 --- a/packages/kafka/test/e2e/walking-skeleton.test.ts +++ b/packages/kafka/test/e2e/walking-skeleton.test.ts @@ -141,7 +141,9 @@ describe('kafka walking skeleton e2e', () => { if (!RUN_E2E || !devnetReachable) skip(); }); - it('registers a Kafka endpoint into the named context graph and discovers it via SPARQL', async () => { + it('registers a public Kafka endpoint into the named context graph and discovers it via SPARQL', async () => { + // Registers with --public so the KA is wrapped in { public: KA } and + // cleartext quads are stored. SPARQL query should return the endpoint row. 
const broker = 'kafka.e2e.local:9092'; const topic = `walking-skeleton.${Date.now()}`; const messageFormat = 'application/cloudevents+json'; @@ -162,6 +164,7 @@ describe('kafka walking skeleton e2e', () => { topic, '--format', messageFormat, + '--public', ], { cwd: REPO_ROOT, @@ -176,6 +179,8 @@ describe('kafka walking skeleton e2e', () => { expect(result.stdout).toContain('Kafka endpoint registered:'); expect(result.stdout).toContain(expectedUri); expect(result.stdout).toContain(CONTEXT_GRAPH_ID); + // --public flag → response echoes Private: false + expect(result.stdout).toContain('Private: false'); const row = await waitForEndpointRow(client, CONTEXT_GRAPH_ID, expectedUri); @@ -186,4 +191,58 @@ describe('kafka walking skeleton e2e', () => { expect(stripIriDelimiters(row.endpointUrl ?? '')).toBe(`kafka://${broker}/${topic}`); expect(Number.isNaN(Date.parse(stripQuotedLiteral(row.issued ?? '')))).toBe(false); }, 90_000); + + it('registers a private (default) Kafka endpoint and confirms CLI reports Private: true', async () => { + // Registers WITHOUT --public so the KA is wrapped in { private: KA } and + // stored as encrypted data for CG participants. + // + // V10 private-KA semantics: on a single-node devnet the local participant + // IS the publisher, so the node may decrypt the KA for itself and surface + // it via SPARQL. On a multi-node network, other participants would need + // decryption keys. We do NOT assert SPARQL content here because: + // 1. Verifying encryption-to-CG-participants requires a full multi-node + // devnet with participant key management — beyond this single-node e2e. + // 2. The route-adapter unit tests in daemon-routes-kafka.test.ts already + // confirm { private: KA } is the envelope sent to agent.publish(). 
+ // This e2e test focuses on: + // (a) CLI stdout reports Private: true + // (b) the request was accepted (HTTP 200 → execFileAsync doesn't throw) + const broker = 'kafka.e2e.private:9092'; + const topic = `walking-skeleton-private.${Date.now()}`; + const messageFormat = 'application/cloudevents+json'; + + const result = await execFileAsync( + 'node', + [ + CLI_ENTRY, + 'kafka', + 'endpoint', + 'register', + '--cg', + CONTEXT_GRAPH_ID, + '--broker', + broker, + '--topic', + topic, + '--format', + messageFormat, + // NO --public flag: default is private + ], + { + cwd: REPO_ROOT, + env: { + ...process.env, + DKG_HOME: DEVNET_NODE1_HOME, + DKG_API_PORT: String(port), + }, + }, + ); + + expect(result.stdout).toContain('Kafka endpoint registered:'); + expect(result.stdout).toContain(CONTEXT_GRAPH_ID); + // Default (no --public) → response echoes Private: true + expect(result.stdout).toContain('Private: true'); + // stdout does NOT say "Private: false" + expect(result.stdout).not.toContain('Private: false'); + }, 90_000); });