Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export {
type PeerHealth,
} from './dkg-agent.js';
export type { CclPublishedEvaluationRecord, CclPublishedResultEntry } from './dkg-agent.js';
export type { JsonLdContent, JsonLdDocument } from './dkg-agent-utils.js';
export {
bindRandomSampling,
type RandomSamplingBindOptions,
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,11 @@ export class ApiClient {
broker: string;
topic: string;
messageFormat: string;
private?: boolean;
}): Promise<{
uri: string;
contextGraphId: string;
private: boolean;
Comment thread
zsculac marked this conversation as resolved.
}> {
return this.post('/api/kafka/endpoint', request);
}
Expand Down
3 changes: 3 additions & 0 deletions packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,7 @@ kafkaEndpointCmd
.requiredOption('--broker <host:port>', 'Kafka broker host:port')
.requiredOption('--topic <name>', 'Kafka topic name')
.option('--format <mime>', 'Kafka message format MIME type', 'application/json')
.option('--public', 'Publish the endpoint as a public KA (default: private)')
.action(async (opts: ActionOpts) => {
try {
const client = await ApiClient.connect();
Expand All @@ -1740,10 +1741,12 @@ kafkaEndpointCmd
broker: opts.broker,
topic: opts.topic,
messageFormat: opts.format,
...(opts.public ? { private: false } : {}),
});
console.log('Kafka endpoint registered:');
console.log(` URI: ${result.uri}`);
console.log(` Context graph: ${result.contextGraphId}`);
console.log(` Private: ${result.private}`);
} catch (err) {
console.error(toErrorMessage(err));
process.exit(1);
Expand Down
16 changes: 16 additions & 0 deletions packages/cli/src/daemon/http-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,22 @@ export function validateOptionalSubGraphName(
return true;
}

/** Type guard: non-empty trimmed string. Does not write a response — caller handles the 400. */
export function isNonEmptyString(value: unknown): value is string {
return typeof value === 'string' && value.trim().length > 0;
}

/** Optional-boolean validator. Returns false (and writes a 400) if the field is present but not a boolean. */
export function validateOptionalBoolean(
value: unknown,
fieldName: string,
res: ServerResponse,
): boolean {
if (value === undefined || typeof value === 'boolean') return true;
jsonResponse(res, 400, { error: `"${fieldName}" must be a boolean` });
return false;
}

export function validateRequiredContextGraphId(
contextGraphId: unknown,
res: ServerResponse,
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/daemon/json-ld-envelope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import type { JsonLdContent, JsonLdDocument } from '@origintrail-official/dkg-agent';

/** Wrap a JSON-LD document in the `{ public }` or `{ private }` envelope `DKGAgent.publish` expects. */
export function wrapJsonLdContent(
content: JsonLdDocument,
options: { private: boolean },
): JsonLdContent {
return options.private ? { private: content } : { public: content };
}
27 changes: 17 additions & 10 deletions packages/cli/src/daemon/routes/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import { jsonResponse, readBody, validateRequiredContextGraphId } from '../http-utils.js';
import {
isNonEmptyString,
jsonResponse,
readBody,
validateOptionalBoolean,
validateRequiredContextGraphId,
} from '../http-utils.js';
import { wrapJsonLdContent } from '../json-ld-envelope.js';
import type { RequestContext } from './context.js';
import {
registerKafkaEndpoint,
type KafkaEndpointPublisher,
} from '@origintrail-official/dkg-kafka';

function isNonEmptyString(value: unknown): value is string {
return typeof value === 'string' && value.trim().length > 0;
}

export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
const {
req,
Expand All @@ -32,6 +35,7 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
broker,
topic,
messageFormat,
private: privateField,
} = parsed as Record<string, unknown>;

if (!validateRequiredContextGraphId(contextGraphId, res)) {
Expand All @@ -48,12 +52,15 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
return jsonResponse(res, 400, { error: '"messageFormat" must be a non-empty string' });
}

if (!validateOptionalBoolean(privateField, 'private', res)) return;

// `!== false` is intentional: only literal `false` opts in to public; omitted/undefined defaults to private.
// Do NOT tighten to `=== true` — that would silently break the omitted-defaults-to-private contract.
const isPrivate = privateField !== false;
Comment thread
zsculac marked this conversation as resolved.
Comment thread
zsculac marked this conversation as resolved.

const publisher: KafkaEndpointPublisher = {
async publish(cgId, content) {
await agent.publish(
cgId,
{ public: content } as Record<string, unknown>,
);
await agent.publish(cgId, wrapJsonLdContent(content, { private: isPrivate }));
},
};

Expand All @@ -66,6 +73,6 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
publisher,
});

return jsonResponse(res, 200, result);
return jsonResponse(res, 200, { ...result, private: isPrivate });
}
}
4 changes: 4 additions & 0 deletions packages/cli/test/api-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ describe('ApiClient', () => {
body: {
uri: 'urn:dkg:kafka-endpoint:0xabc:hash',
contextGraphId: 'devnet-test',
// Mirror the real route's response shape: it always echoes the
// resolved `private` flag (default-private when omitted from the
// request body, as is the case here).
private: true,
},
});
globalThis.fetch = fetch;
Expand Down
55 changes: 55 additions & 0 deletions packages/cli/test/daemon-json-ld-envelope.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Unit tests for `wrapJsonLdContent` — the small helper that produces the
* `{ public }` / `{ private }` envelope shape DKGAgent.publish() expects.
*
* The helper is the privacy boundary for any route that publishes a
* JSON-LD KA. Keeping it tiny and well-tested means slice-02/07 (and any
* future route that publishes a KA) can adopt it without re-deriving
* envelope semantics.
*/

import { describe, expect, it } from 'vitest';
import { wrapJsonLdContent } from '../src/daemon/json-ld-envelope.js';

describe('wrapJsonLdContent', () => {
it('wraps the document in { private: ... } when options.private is true', () => {
const doc = { '@id': 'urn:test:1', 'foo:bar': 'baz' };

const envelope = wrapJsonLdContent(doc, { private: true });

expect(envelope).toEqual({ private: doc });
expect(envelope).not.toHaveProperty('public');
});

it('wraps the document in { public: ... } when options.private is false', () => {
const doc = { '@id': 'urn:test:2', 'foo:bar': 'qux' };

const envelope = wrapJsonLdContent(doc, { private: false });

expect(envelope).toEqual({ public: doc });
expect(envelope).not.toHaveProperty('private');
});

it('preserves the document by reference (no copy)', () => {
// The helper is a thin wrapper — it must NOT clone the document. A
// copy would silently break callers that rely on identity (e.g. for
// post-publish hooks that mutate the original).
const doc = { '@id': 'urn:test:3' };

const privateEnv = wrapJsonLdContent(doc, { private: true }) as { private: object };
const publicEnv = wrapJsonLdContent(doc, { private: false }) as { public: object };

expect(privateEnv.private).toBe(doc);
expect(publicEnv.public).toBe(doc);
});

it('accepts an array of JSON-LD documents (graph form)', () => {
// JsonLdDocument is `Record<string, unknown> | Record<string, unknown>[]`
// so an array of objects must round-trip through the envelope unchanged.
const docs = [{ '@id': 'urn:a' }, { '@id': 'urn:b' }];

const envelope = wrapJsonLdContent(docs, { private: true }) as { private: unknown };

expect(envelope.private).toBe(docs);
});
});
131 changes: 131 additions & 0 deletions packages/cli/test/daemon-routes-kafka.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Unit tests for the Kafka route adapter's privacy-envelope logic.
*
* The route adapter (packages/cli/src/daemon/routes/kafka.ts) is responsible
* for wrapping the bare KA in either `{ private: KA }` or `{ public: KA }`
* before passing it to agent.publish(). The kafka package itself stays agnostic.
*
* These tests invoke handleKafkaRoutes directly with a minimal RequestContext
* mock — no real daemon, no network, no chain. The fakes live in
* test/helpers/route-test-utils.ts and are reused by future route tests.
*/

import { describe, it, expect } from 'vitest';
import { handleKafkaRoutes } from '../src/daemon/routes/kafka.js';
import {
makeFakeRequest,
makeFakeResponse,
makeRequestContext,
} from './helpers/route-test-utils.js';

const KAFKA_ENDPOINT_URL = '/api/kafka/endpoint';

const VALID_BASE_BODY = {
contextGraphId: 'devnet-test',
broker: 'kafka.example.com:9092',
topic: 'orders.created',
messageFormat: 'application/json',
};

describe('Kafka route adapter — privacy envelope', () => {
it('wraps with { private: KA } when private: true is in request body', async () => {
const req = makeFakeRequest({ ...VALID_BASE_BODY, private: true }, { url: KAFKA_ENDPOINT_URL });
const { res, getResult } = makeFakeResponse();
const { ctx, publishCalls } = makeRequestContext(req, res);

await handleKafkaRoutes(ctx);

const { status, body } = getResult();
expect(status).toBe(200);

expect(publishCalls).toHaveLength(1);
const { envelope } = publishCalls[0]!;
expect(envelope).toHaveProperty('private');
expect(envelope).not.toHaveProperty('public');

// Response must echo the resolved private flag
expect((body as Record<string, unknown>).private).toBe(true);
});

it('wraps with { public: KA } when private: false is in request body', async () => {
const req = makeFakeRequest({ ...VALID_BASE_BODY, private: false }, { url: KAFKA_ENDPOINT_URL });
const { res, getResult } = makeFakeResponse();
const { ctx, publishCalls } = makeRequestContext(req, res);

await handleKafkaRoutes(ctx);

const { status, body } = getResult();
expect(status).toBe(200);

expect(publishCalls).toHaveLength(1);
const { envelope } = publishCalls[0]!;
expect(envelope).toHaveProperty('public');
expect(envelope).not.toHaveProperty('private');

// Response must echo the resolved private flag
expect((body as Record<string, unknown>).private).toBe(false);
});

it('defaults to { private: KA } when private field is omitted from request body', async () => {
// No `private` field — route defaults to private: true
const req = makeFakeRequest(VALID_BASE_BODY, { url: KAFKA_ENDPOINT_URL });
const { res, getResult } = makeFakeResponse();
const { ctx, publishCalls } = makeRequestContext(req, res);

await handleKafkaRoutes(ctx);

const { status, body } = getResult();
expect(status).toBe(200);

expect(publishCalls).toHaveLength(1);
const { envelope } = publishCalls[0]!;
expect(envelope).toHaveProperty('private');
expect(envelope).not.toHaveProperty('public');

// Response echoes resolved private flag = true (default)
expect((body as Record<string, unknown>).private).toBe(true);
});

it('returns 400 when contextGraphId is missing', async () => {
const req = makeFakeRequest(
{ broker: 'x:9092', topic: 't', messageFormat: 'application/json' },
{ url: KAFKA_ENDPOINT_URL },
);
const { res, getResult } = makeFakeResponse();
const { ctx } = makeRequestContext(req, res);

await handleKafkaRoutes(ctx);

expect(getResult().status).toBe(400);
});

it('returns 400 when broker is missing', async () => {
const req = makeFakeRequest(
{ contextGraphId: 'devnet-test', topic: 't', messageFormat: 'application/json' },
{ url: KAFKA_ENDPOINT_URL },
);
const { res, getResult } = makeFakeResponse();
const { ctx } = makeRequestContext(req, res);

await handleKafkaRoutes(ctx);

expect(getResult().status).toBe(400);
});

it('returns 400 when "private" is a non-boolean value (e.g. string "false")', async () => {
// The route enforces a strict boolean for `private` to keep the privacy
// contract unambiguous. Truthy/falsy coercion would create an unsafe
// ambiguity at a privacy boundary.
const req = makeFakeRequest({ ...VALID_BASE_BODY, private: 'false' }, { url: KAFKA_ENDPOINT_URL });
const { res, getResult } = makeFakeResponse();
const { ctx, publishCalls } = makeRequestContext(req, res);

await handleKafkaRoutes(ctx);

const { status, body } = getResult();
expect(status).toBe(400);
expect((body as Record<string, unknown>).error).toMatch(/"private" must be a boolean/);
// No publish should have happened
expect(publishCalls).toHaveLength(0);
});
});
Loading
Loading