From 95377185d8b5e052087f359e3e8fa66d6c260c44 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 15:16:09 +0200 Subject: [PATCH 01/13] Add kafka package scaffold and URI builder --- packages/kafka/package.json | 34 +++++++++++++++++++++++ packages/kafka/src/index.ts | 3 ++ packages/kafka/src/uri.ts | 19 +++++++++++++ packages/kafka/test/uri.test.ts | 49 +++++++++++++++++++++++++++++++++ packages/kafka/tsconfig.json | 9 ++++++ packages/kafka/vitest.config.ts | 14 ++++++++++ pnpm-lock.yaml | 13 +++++++++ 7 files changed, 141 insertions(+) create mode 100644 packages/kafka/package.json create mode 100644 packages/kafka/src/index.ts create mode 100644 packages/kafka/src/uri.ts create mode 100644 packages/kafka/test/uri.test.ts create mode 100644 packages/kafka/tsconfig.json create mode 100644 packages/kafka/vitest.config.ts diff --git a/packages/kafka/package.json b/packages/kafka/package.json new file mode 100644 index 000000000..6cadc5b9b --- /dev/null +++ b/packages/kafka/package.json @@ -0,0 +1,34 @@ +{ + "name": "@origintrail-official/dkg-kafka", + "version": "10.0.0-rc.4", + "type": "module", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "scripts": { + "build": "tsc", + "test": "vitest run", + "test:coverage": "vitest run --coverage", + "clean": "rm -rf dist tsconfig.tsbuildinfo" + }, + "dependencies": { + "@origintrail-official/dkg-core": "workspace:*" + }, + "devDependencies": { + "@vitest/coverage-v8": "^4.0.18", + "vitest": "^4.0.18" + }, + "publishConfig": { + "access": "public" + }, + "files": [ + "dist", + "README.md", + "LICENSE" + ], + "license": "Apache-2.0", + "repository": { + "type": "git", + "url": "https://github.com/OriginTrail/dkg-v9.git", + "directory": "packages/kafka" + } +} diff --git a/packages/kafka/src/index.ts b/packages/kafka/src/index.ts new file mode 100644 index 000000000..9e74b9f44 --- /dev/null +++ b/packages/kafka/src/index.ts @@ -0,0 +1,3 @@ +export * from './uri.js'; +export * from './ka-builder.js'; +export * from './endpoint.js'; diff --git a/packages/kafka/src/uri.ts b/packages/kafka/src/uri.ts new file mode 100644 index 000000000..86ad2df53 --- /dev/null +++ b/packages/kafka/src/uri.ts @@ -0,0 +1,19 @@ +import { createHash } from 'node:crypto'; + +export interface KafkaEndpointIdentity { + owner: string; + broker: string; + topic: string; +} + +function hashBrokerAndTopic(broker: string, topic: string): string { + return createHash('sha256') + .update(`${broker.toLowerCase()}|${topic}`) + .digest('hex'); +} + +export function buildKafkaEndpointUri(identity: KafkaEndpointIdentity): string { + const owner = identity.owner.toLowerCase(); + const hash = hashBrokerAndTopic(identity.broker, identity.topic); + return `urn:dkg:kafka-endpoint:${owner}:${hash}`; +} diff --git a/packages/kafka/test/uri.test.ts b/packages/kafka/test/uri.test.ts new file mode 100644 index 000000000..c318df5e5 --- /dev/null +++ b/packages/kafka/test/uri.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it } from 'vitest'; +import { buildKafkaEndpointUri } from '../src/uri.js'; + +describe('buildKafkaEndpointUri', () => { + it('builds a deterministic URN from owner, broker, and topic', () => { + expect( + buildKafkaEndpointUri({ + owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD', + broker: 'Kafka.EXAMPLE.com:9092', + topic: 'orders.created', + }), + ).toBe( + 'urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:' + + '33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652', + ); + }); + + it('changes the URN when the topic 
changes', () => { + const left = buildKafkaEndpointUri({ + owner: '0xabcdefabcdefabcdefabcdefabcdefabcdefabcd', + broker: 'kafka.example.com:9092', + topic: 'orders.created', + }); + + const right = buildKafkaEndpointUri({ + owner: '0xabcdefabcdefabcdefabcdefabcdefabcdefabcd', + broker: 'kafka.example.com:9092', + topic: 'orders.updated', + }); + + expect(left).not.toBe(right); + }); + + it('normalizes broker and owner casing before hashing', () => { + const left = buildKafkaEndpointUri({ + owner: '0xabcdefabcdefabcdefabcdefabcdefabcdefabcd', + broker: 'kafka.example.com:9092', + topic: 'orders.created', + }); + + const right = buildKafkaEndpointUri({ + owner: '0xABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD', + broker: 'KAFKA.EXAMPLE.COM:9092', + topic: 'orders.created', + }); + + expect(left).toBe(right); + }); +}); diff --git a/packages/kafka/tsconfig.json b/packages/kafka/tsconfig.json new file mode 100644 index 000000000..d231bbc57 --- /dev/null +++ b/packages/kafka/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src", + "composite": true + }, + "include": ["src"] +} diff --git a/packages/kafka/vitest.config.ts b/packages/kafka/vitest.config.ts new file mode 100644 index 000000000..4ec85529c --- /dev/null +++ b/packages/kafka/vitest.config.ts @@ -0,0 +1,14 @@ +import { defineConfig } from 'vitest/config'; +import { criticalityTargets } from '../../vitest.coverage.js'; + +export default defineConfig({ + test: { + include: ['test/**/*.test.ts'], + coverage: { + provider: 'v8', + reporter: ['text', 'html', 'lcov', 'json-summary'], + reportsDirectory: './coverage', + thresholds: criticalityTargets.kosava, + }, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cacc8324a..fcd92e2fa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -484,6 +484,19 @@ importers: specifier: ^8.3.3 version: 8.3.3(web-streams-polyfill@3.3.3) + packages/kafka: + dependencies: + '@origintrail-official/dkg-core': + specifier: workspace:* + version: link:../core + devDependencies: + '@vitest/coverage-v8': + specifier: ^4.0.18 + version: 4.0.18(vitest@4.0.18(@types/node@22.19.11)(happy-dom@20.8.3(bufferutil@4.1.0)(utf-8-validate@5.0.10))(jiti@2.6.1)(tsx@4.21.0)(yaml@2.8.3)) + vitest: + specifier: ^4.0.18 + version: 4.0.18(@types/node@22.19.11)(happy-dom@20.8.3(bufferutil@4.1.0)(utf-8-validate@5.0.10))(jiti@2.6.1)(tsx@4.21.0)(yaml@2.8.3) + packages/mcp-dkg: dependencies: '@modelcontextprotocol/sdk': From c6df0323a5d724cf0a313713d43502add20f6e73 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 15:18:57 +0200 Subject: [PATCH 02/13] Add kafka endpoint KA builder --- packages/kafka/src/ka-builder.ts | 39 +++++++++++++++++++ packages/kafka/test/fixtures/endpoint-ka.json | 26 +++++++++++++ packages/kafka/test/ka-builder.test.ts | 20 ++++++++++ 3 files changed, 85 insertions(+) create mode 100644 packages/kafka/src/ka-builder.ts create mode 100644 packages/kafka/test/fixtures/endpoint-ka.json create mode 100644 packages/kafka/test/ka-builder.test.ts diff --git a/packages/kafka/src/ka-builder.ts b/packages/kafka/src/ka-builder.ts new file mode 100644 index 000000000..42fe40c30 --- /dev/null +++ b/packages/kafka/src/ka-builder.ts @@ -0,0 +1,39 @@ +import { buildKafkaEndpointUri } from './uri.js'; + +const KAFKA_ENDPOINT_CONTEXT = { + dcat: 'http://www.w3.org/ns/dcat#', + dct: 'http://purl.org/dc/terms/', + dkg: 'https://ontology.dkg.io/dkg#', + xsd: 'http://www.w3.org/2001/XMLSchema#', +} as const; + +export interface 
BuildKafkaEndpointKnowledgeAssetInput { + owner: string; + broker: string; + topic: string; + messageFormat: string; + issuedAt: string; +} + +export function buildKafkaEndpointKnowledgeAsset(input: BuildKafkaEndpointKnowledgeAssetInput) { + const owner = input.owner.toLowerCase(); + + return { + '@context': KAFKA_ENDPOINT_CONTEXT, + '@id': buildKafkaEndpointUri(input), + '@type': ['dkg:KafkaTopicEndpoint', 'dcat:DataService'], + 'dcat:endpointURL': { + '@id': `kafka://${input.broker}/${input.topic}`, + }, + 'dkg:broker': input.broker, + 'dkg:topic': input.topic, + 'dkg:messageFormat': input.messageFormat, + 'dct:publisher': { + '@id': `urn:dkg:agent:${owner}`, + }, + 'dct:issued': { + '@value': input.issuedAt, + '@type': 'xsd:dateTime', + }, + }; +} diff --git a/packages/kafka/test/fixtures/endpoint-ka.json b/packages/kafka/test/fixtures/endpoint-ka.json new file mode 100644 index 000000000..71da18435 --- /dev/null +++ b/packages/kafka/test/fixtures/endpoint-ka.json @@ -0,0 +1,26 @@ +{ + "@context": { + "dcat": "http://www.w3.org/ns/dcat#", + "dct": "http://purl.org/dc/terms/", + "dkg": "https://ontology.dkg.io/dkg#", + "xsd": "http://www.w3.org/2001/XMLSchema#" + }, + "@id": "urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652", + "@type": [ + "dkg:KafkaTopicEndpoint", + "dcat:DataService" + ], + "dcat:endpointURL": { + "@id": "kafka://kafka.example.com:9092/orders.created" + }, + "dkg:broker": "kafka.example.com:9092", + "dkg:topic": "orders.created", + "dkg:messageFormat": "application/json", + "dct:publisher": { + "@id": "urn:dkg:agent:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd" + }, + "dct:issued": { + "@value": "2026-05-04T12:34:56.000Z", + "@type": "xsd:dateTime" + } +} diff --git a/packages/kafka/test/ka-builder.test.ts b/packages/kafka/test/ka-builder.test.ts new file mode 100644 index 000000000..cb39ba5e8 --- /dev/null +++ b/packages/kafka/test/ka-builder.test.ts @@ -0,0 +1,20 @@ +import { readFile } from 'node:fs/promises'; +import { describe, expect, it } from 'vitest'; +import { buildKafkaEndpointKnowledgeAsset } from '../src/ka-builder.js'; + +describe('buildKafkaEndpointKnowledgeAsset', () => { + it('builds the minimum Kafka endpoint KA shape', async () => { + const actual = buildKafkaEndpointKnowledgeAsset({ + owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD', + broker: 'kafka.example.com:9092', + topic: 'orders.created', + messageFormat: 'application/json', + issuedAt: '2026-05-04T12:34:56.000Z', + }); + + const fixtureUrl = new URL('./fixtures/endpoint-ka.json', import.meta.url); + const expected = JSON.parse(await readFile(fixtureUrl, 'utf8')); + + expect(actual).toEqual(expected); + }); +}); From 7c08568aa90c455e921b24ba78278b5bd6a9c767 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 15:21:28 +0200 Subject: [PATCH 03/13] Add kafka endpoint registration orchestration --- packages/kafka/src/endpoint.ts | 42 +++++++++++++ packages/kafka/test/endpoint.register.test.ts | 60 +++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 packages/kafka/src/endpoint.ts create mode 100644 packages/kafka/test/endpoint.register.test.ts diff --git a/packages/kafka/src/endpoint.ts b/packages/kafka/src/endpoint.ts new file mode 100644 index 000000000..e9618df41 --- /dev/null +++ b/packages/kafka/src/endpoint.ts @@ -0,0 +1,42 @@ +import { buildKafkaEndpointKnowledgeAsset } from './ka-builder.js'; +import { buildKafkaEndpointUri } from './uri.js'; + +export interface 
KafkaEndpointPublisher {
+  publish(contextGraphId: string, content: unknown): Promise<unknown>;
+}
+
+export interface RegisterKafkaEndpointInput {
+  contextGraphId: string;
+  owner: string;
+  broker: string;
+  topic: string;
+  messageFormat: string;
+  issuedAt?: string;
+  publisher: KafkaEndpointPublisher;
+}
+
+export interface RegisterKafkaEndpointResult {
+  uri: string;
+  contextGraphId: string;
+}
+
+export async function registerKafkaEndpoint(
+  input: RegisterKafkaEndpointInput,
+): Promise<RegisterKafkaEndpointResult> {
+  const issuedAt = input.issuedAt ?? new Date().toISOString();
+  const uri = buildKafkaEndpointUri(input);
+  const knowledgeAsset = buildKafkaEndpointKnowledgeAsset({
+    owner: input.owner,
+    broker: input.broker,
+    topic: input.topic,
+    messageFormat: input.messageFormat,
+    issuedAt,
+  });
+
+  await input.publisher.publish(input.contextGraphId, { public: knowledgeAsset });
+
+  return {
+    uri,
+    contextGraphId: input.contextGraphId,
+  };
+}
diff --git a/packages/kafka/test/endpoint.register.test.ts b/packages/kafka/test/endpoint.register.test.ts
new file mode 100644
index 000000000..871d770c3
--- /dev/null
+++ b/packages/kafka/test/endpoint.register.test.ts
@@ -0,0 +1,60 @@
+import { describe, expect, it } from 'vitest';
+import { registerKafkaEndpoint } from '../src/endpoint.js';
+
+describe('registerKafkaEndpoint', () => {
+  it('publishes the Kafka endpoint KA into the named context graph', async () => {
+    const calls: Array<{ contextGraphId: string; content: unknown }> = [];
+    const publisher = {
+      async publish(contextGraphId: string, content: unknown) {
+        calls.push({ contextGraphId, content });
+        return { ual: 'did:dkg:test/1', kcId: '1', status: 'confirmed' as const };
+      },
+    };
+
+    const result = await registerKafkaEndpoint({
+      contextGraphId: 'devnet-test',
+      owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+      issuedAt: '2026-05-04T12:34:56.000Z',
+      publisher,
+    });
+
+    expect(result).toEqual({
+      uri:
+        'urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:' +
+        '33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652',
+      contextGraphId: 'devnet-test',
+    });
+
+    expect(calls).toHaveLength(1);
+    expect(calls[0]).toEqual({
+      contextGraphId: 'devnet-test',
+      content: {
+        public: {
+          '@context': {
+            dcat: 'http://www.w3.org/ns/dcat#',
+            dct: 'http://purl.org/dc/terms/',
+            dkg: 'https://ontology.dkg.io/dkg#',
+            xsd: 'http://www.w3.org/2001/XMLSchema#',
+          },
+          '@id': result.uri,
+          '@type': ['dkg:KafkaTopicEndpoint', 'dcat:DataService'],
+          'dcat:endpointURL': {
+            '@id': 'kafka://kafka.example.com:9092/orders.created',
+          },
+          'dkg:broker': 'kafka.example.com:9092',
+          'dkg:topic': 'orders.created',
+          'dkg:messageFormat': 'application/json',
+          'dct:publisher': {
+            '@id': 'urn:dkg:agent:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd',
+          },
+          'dct:issued': {
+            '@value': '2026-05-04T12:34:56.000Z',
+            '@type': 'xsd:dateTime',
+          },
+        },
+      },
+    });
+  });
+});

From 5b9194a25c78fa8e85a3602edcdf2e3981f4bb44 Mon Sep 17 00:00:00 2001
From: Zvonimir
Date: Mon, 4 May 2026 15:36:07 +0200
Subject: [PATCH 04/13] Wire kafka endpoint route and CLI

---
 package.json                              |  2 +-
 packages/cli/package.json                 |  1 +
 packages/cli/src/api-client.ts            | 12 +++
 packages/cli/src/cli.ts                   | 35 +++++++++
 packages/cli/src/daemon/handle-request.ts |  4 +
 packages/cli/src/daemon/routes/kafka.ts   | 68 ++++++++++++++++
 packages/cli/test/api-client.test.ts      | 28 +++++++
 packages/cli/test/kafka-cli-smoke.test.ts | 95 +++++++++++++++++++++++
packages/cli/tsconfig.json | 1 + pnpm-lock.yaml | 3 + 10 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 packages/cli/src/daemon/routes/kafka.ts create mode 100644 packages/cli/test/kafka-cli-smoke.test.ts diff --git a/package.json b/package.json index 37a97da22..33bf97724 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ }, "scripts": { "build": "turbo build", - "build:runtime:packages": "pnpm -r --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-storage --filter @origintrail-official/dkg-query --filter @origintrail-official/dkg-publisher --filter @origintrail-official/dkg-chain --filter @origintrail-official/dkg-epcis --filter @origintrail-official/dkg-random-sampling --filter @origintrail-official/dkg-agent --filter @origintrail-official/dkg-graph-viz --filter @origintrail-official/dkg-node-ui --filter @origintrail-official/dkg-adapter-openclaw --filter @origintrail-official/dkg run build", + "build:runtime:packages": "pnpm -r --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-storage --filter @origintrail-official/dkg-query --filter @origintrail-official/dkg-publisher --filter @origintrail-official/dkg-chain --filter @origintrail-official/dkg-epcis --filter @origintrail-official/dkg-kafka --filter @origintrail-official/dkg-random-sampling --filter @origintrail-official/dkg-agent --filter @origintrail-official/dkg-graph-viz --filter @origintrail-official/dkg-node-ui --filter @origintrail-official/dkg-adapter-openclaw --filter @origintrail-official/dkg run build", "build:runtime": "pnpm run build:runtime:packages && pnpm --filter @origintrail-official/dkg-node-ui run build:ui", "test": "turbo test", "test:coverage": "turbo test:coverage", diff --git a/packages/cli/package.json b/packages/cli/package.json index 310ce28c1..91bb3b02e 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -24,6 +24,7 @@ "@origintrail-official/dkg-core": "workspace:*", "@origintrail-official/dkg-mcp": "workspace:*", "@origintrail-official/dkg-epcis": "workspace:*", + "@origintrail-official/dkg-kafka": "workspace:*", "@origintrail-official/dkg-node-ui": "workspace:*", "@origintrail-official/dkg-publisher": "workspace:*", "@origintrail-official/dkg-storage": "workspace:*", diff --git a/packages/cli/src/api-client.ts b/packages/cli/src/api-client.ts index 4354f3ac2..d51715b38 100644 --- a/packages/cli/src/api-client.ts +++ b/packages/cli/src/api-client.ts @@ -551,6 +551,18 @@ export class ApiClient { return this.get(`/api/context-graph/${encodeURIComponent(contextGraphId)}/participants`); } + async registerKafkaEndpoint(request: { + contextGraphId: string; + broker: string; + topic: string; + messageFormat: string; + }): Promise<{ + uri: string; + contextGraphId: string; + }> { + return this.post('/api/kafka/endpoint', request); + } + async signJoinRequest(contextGraphId: string): Promise<{ ok: boolean; status?: string; diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index f8b340c2a..0e55ea5de 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -1715,6 +1715,41 @@ assertionCmd } }); +// ─── dkg kafka ────────────────────────────────────────────────────── + +const kafkaCmd = program + .command('kafka') + .description('Kafka metadata registration commands'); + +const kafkaEndpointCmd = kafkaCmd + .command('endpoint') + .description('Kafka topic endpoint operations'); + +kafkaEndpointCmd + .command('register') + .description('Register a Kafka topic endpoint as a knowledge asset in a 
named context graph')
+  .requiredOption('--cg <contextGraphId>', 'Target context graph')
+  .requiredOption('--broker <broker>', 'Kafka broker host:port')
+  .requiredOption('--topic <topic>', 'Kafka topic name')
+  .option('--format <format>', 'Kafka message format MIME type', 'application/json')
+  .action(async (opts: ActionOpts) => {
+    try {
+      const client = await ApiClient.connect();
+      const result = await client.registerKafkaEndpoint({
+        contextGraphId: opts.cg,
+        broker: opts.broker,
+        topic: opts.topic,
+        messageFormat: opts.format,
+      });
+      console.log('Kafka endpoint registered:');
+      console.log(`  URI: ${result.uri}`);
+      console.log(`  Context graph: ${result.contextGraphId}`);
+    } catch (err) {
+      console.error(toErrorMessage(err));
+      process.exit(1);
+    }
+  });
+
 // ─── dkg openclaw ───────────────────────────────────────────────────
 
 const openclawCmd = program
diff --git a/packages/cli/src/daemon/handle-request.ts b/packages/cli/src/daemon/handle-request.ts
index 34ce76ffc..07f07dd3a 100644
--- a/packages/cli/src/daemon/handle-request.ts
+++ b/packages/cli/src/daemon/handle-request.ts
@@ -330,6 +330,7 @@ import { handleAssertionRoutes } from './routes/assertion.js';
 import { handleQueryRoutes } from './routes/query.js';
 import { handleLocalAgentsRoutes } from './routes/local-agents.js';
 import { handleEpcisRoutes } from './routes/epcis.js';
+import { handleKafkaRoutes } from './routes/kafka.js';
 
 
 export async function handleRequest(
@@ -431,5 +432,8 @@ export async function handleRequest(
   await handleEpcisRoutes(ctx);
   if (res.writableEnded) return;
 
+  await handleKafkaRoutes(ctx);
+  if (res.writableEnded) return;
+
   jsonResponse(res, 404, { error: 'Not found' });
 }
diff --git a/packages/cli/src/daemon/routes/kafka.ts b/packages/cli/src/daemon/routes/kafka.ts
new file mode 100644
index 000000000..e1bea21fe
--- /dev/null
+++ b/packages/cli/src/daemon/routes/kafka.ts
@@ -0,0 +1,68 @@
+import { jsonResponse, readBody, validateRequiredContextGraphId } from '../http-utils.js';
+import type { RequestContext } from './context.js';
+import {
+  registerKafkaEndpoint,
+  type KafkaEndpointPublisher,
+} from '@origintrail-official/dkg-kafka';
+
+function isNonEmptyString(value: unknown): value is string {
+  return typeof value === 'string' && value.trim().length > 0;
+}
+
+export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
+  const {
+    req,
+    res,
+    agent,
+    path,
+    requestAgentAddress,
+  } = ctx;
+
+  if (req.method === 'POST' && path === '/api/kafka/endpoint') {
+    const body = await readBody(req);
+    let parsed: unknown;
+    try {
+      parsed = JSON.parse(body);
+    } catch {
+      return jsonResponse(res, 400, { error: 'Invalid JSON in request body' });
+    }
+
+    const {
+      contextGraphId,
+      broker,
+      topic,
+      messageFormat,
+    } = parsed as Record<string, unknown>;
+
+    if (!validateRequiredContextGraphId(contextGraphId, res)) {
+      return;
+    }
+    const targetContextGraphId = contextGraphId as string;
+    if (!isNonEmptyString(broker)) {
+      return jsonResponse(res, 400, { error: '"broker" must be a non-empty string' });
+    }
+    if (!isNonEmptyString(topic)) {
+      return jsonResponse(res, 400, { error: '"topic" must be a non-empty string' });
+    }
+    if (!isNonEmptyString(messageFormat)) {
+      return jsonResponse(res, 400, { error: '"messageFormat" must be a non-empty string' });
+    }
+
+    const publisher: KafkaEndpointPublisher = {
+      async publish(targetContextGraphId, content) {
+        await agent.publish(targetContextGraphId, content as Record<string, unknown>);
+      },
+    };
+
+    const result = await registerKafkaEndpoint({
+      contextGraphId: targetContextGraphId,
+      owner:
requestAgentAddress.toLowerCase(),
+      broker,
+      topic,
+      messageFormat,
+      publisher,
+    });
+
+    return jsonResponse(res, 200, result);
+  }
+}
diff --git a/packages/cli/test/api-client.test.ts b/packages/cli/test/api-client.test.ts
index 040164a74..7a6cfff20 100644
--- a/packages/cli/test/api-client.test.ts
+++ b/packages/cli/test/api-client.test.ts
@@ -194,6 +194,34 @@ describe('ApiClient', () => {
     expect(body.name).toBe('incident');
   });
 
+  it('registerKafkaEndpoint() posts the endpoint payload', async () => {
+    const { fetch, calls } = createTrackingFetch({
+      ok: true,
+      status: 200,
+      body: {
+        uri: 'urn:dkg:kafka-endpoint:0xabc:hash',
+        contextGraphId: 'devnet-test',
+      },
+    });
+    globalThis.fetch = fetch;
+
+    await client.registerKafkaEndpoint({
+      contextGraphId: 'devnet-test',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+    });
+
+    expect(calls[0].url).toBe(`http://127.0.0.1:${PORT}/api/kafka/endpoint`);
+    const body = JSON.parse(calls[0].opts.body as string);
+    expect(body).toEqual({
+      contextGraphId: 'devnet-test',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+    });
+  });
+
   it('approveCclPolicy() posts approval payload', async () => {
     const { fetch, calls } = createTrackingFetch({ ok: true, status: 200, body: { policyUri: 'urn:policy', bindingUri: 'urn:binding', approvedAt: 'now' } });
     globalThis.fetch = fetch;
diff --git a/packages/cli/test/kafka-cli-smoke.test.ts b/packages/cli/test/kafka-cli-smoke.test.ts
new file mode 100644
index 000000000..f80ec60b2
--- /dev/null
+++ b/packages/cli/test/kafka-cli-smoke.test.ts
@@ -0,0 +1,95 @@
+import { beforeAll, afterAll, describe, expect, it } from 'vitest';
+import { createServer } from 'node:http';
+import { execFile } from 'node:child_process';
+import { promisify } from 'node:util';
+import { mkdtemp, writeFile, rm } from 'node:fs/promises';
+import { existsSync } from 'node:fs';
+import { join, dirname } from 'node:path';
+import { fileURLToPath } from 'node:url';
+import { tmpdir } from 'node:os';
+
+const execFileAsync = promisify(execFile);
+const __dirname = dirname(fileURLToPath(import.meta.url));
+const CLI_ENTRY = join(__dirname, '..', 'dist', 'cli.js');
+
+describe.sequential('kafka CLI smoke', () => {
+  let dkgHome: string;
+  let server: ReturnType<typeof createServer>;
+  let smokeApiPort: string;
+  let lastBody = '';
+  let lastAuthHeader = '';
+
+  beforeAll(async () => {
+    dkgHome = await mkdtemp(join(tmpdir(), 'dkg-kafka-cli-'));
+    if (!existsSync(CLI_ENTRY)) {
+      await execFileAsync('pnpm', ['build'], { cwd: join(__dirname, '..') });
+    }
+    if (!existsSync(CLI_ENTRY)) {
+      throw new Error(`CLI entry not found after build: ${CLI_ENTRY}`);
+    }
+
+    await writeFile(join(dkgHome, 'auth.token'), 'smoke-token\n');
+
+    server = createServer(async (req, res) => {
+      if (req.method === 'POST' && req.url === '/api/kafka/endpoint') {
+        lastAuthHeader = String(req.headers.authorization ?? '');
+        const chunks: Buffer[] = [];
+        for await (const chunk of req) {
+          chunks.push(Buffer.isBuffer(chunk) ?
chunk : Buffer.from(chunk));
+        }
+        lastBody = Buffer.concat(chunks).toString('utf8');
+        res.writeHead(200, { 'Content-Type': 'application/json' });
+        res.end(JSON.stringify({
+          uri: 'urn:dkg:kafka-endpoint:0xabc:hash',
+          contextGraphId: 'devnet-test',
+        }));
+        return;
+      }
+
+      res.writeHead(404, { 'Content-Type': 'application/json' });
+      res.end(JSON.stringify({ error: 'Not found' }));
+    });
+
+    await new Promise<void>((resolve, reject) => {
+      server.once('error', reject);
+      server.listen(0, '127.0.0.1', () => {
+        const addr = server.address();
+        smokeApiPort = typeof addr === 'object' && addr ? String(addr.port) : '0';
+        resolve();
+      });
+    });
+  });
+
+  afterAll(async () => {
+    await new Promise<void>((resolve) => server.close(() => resolve()));
+    await rm(dkgHome, { recursive: true, force: true });
+  });
+
+  it('registers a Kafka endpoint through the CLI', async () => {
+    const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort };
+
+    const result = await execFileAsync('node', [
+      CLI_ENTRY,
+      'kafka',
+      'endpoint',
+      'register',
+      '--cg',
+      'devnet-test',
+      '--broker',
+      'kafka.example.com:9092',
+      '--topic',
+      'orders.created',
+    ], { env });
+
+    expect(result.stdout).toContain('Kafka endpoint registered:');
+    expect(result.stdout).toContain('urn:dkg:kafka-endpoint:0xabc:hash');
+    expect(result.stdout).toContain('devnet-test');
+    expect(lastAuthHeader).toBe('Bearer smoke-token');
+    expect(JSON.parse(lastBody)).toEqual({
+      contextGraphId: 'devnet-test',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+    });
+  }, 15000);
+});

From 3db0fb183a55e90510b9f535d04f0cbf4e6646d0 Mon Sep 17 00:00:00 2001
From: Zvonimir
Date: Mon, 4 May 2026 15:58:25 +0200
Subject: [PATCH 05/13] Add kafka walking skeleton e2e test

---
 .../kafka/test/e2e/walking-skeleton.test.ts   | 180 ++++++++++++++++++
 1 file changed, 180 insertions(+)
 create mode 100644 packages/kafka/test/e2e/walking-skeleton.test.ts

diff --git a/packages/kafka/test/e2e/walking-skeleton.test.ts b/packages/kafka/test/e2e/walking-skeleton.test.ts
new file mode 100644
index 000000000..4e63ad18e
--- /dev/null
+++ b/packages/kafka/test/e2e/walking-skeleton.test.ts
@@ -0,0 +1,180 @@
+import { access, readFile } from 'node:fs/promises';
+import { execFile } from 'node:child_process';
+import { constants } from 'node:fs';
+import { dirname, join } from 'node:path';
+import { fileURLToPath } from 'node:url';
+import { promisify } from 'node:util';
+import { beforeAll, beforeEach, describe, expect, it } from 'vitest';
+import { ApiClient } from '../../../cli/src/api-client.js';
+import { buildKafkaEndpointUri } from '../../src/uri.js';
+
+const execFileAsync = promisify(execFile);
+
+const __dirname = dirname(fileURLToPath(import.meta.url));
+const REPO_ROOT = join(__dirname, '..', '..', '..', '..');
+const CLI_ENTRY = join(REPO_ROOT, 'packages', 'cli', 'dist', 'cli.js');
+const DEVNET_NODE1_HOME =
+  process.env.DKG_KAFKA_DEVNET_HOME ?? '/Users/zvonimir/projects/dkg/.devnet/node1';
+const RUN_E2E =
+  process.env.DKG_KAFKA_E2E === '1' || process.env.DKG_KAFKA_E2E === 'true';
+const CONTEXT_GRAPH_ID = 'devnet-test';
+
+function parseTokenFile(raw: string): string {
+  return raw
+    .split('\n')
+    .map((line) => line.trim())
+    .find((line) => line.length > 0 && !line.startsWith('#')) ?? '';
+}
+
+function stripQuotedLiteral(value: string): string {
+  const typed = value.match(/^"(.*)"(?:\^\^<.*>)?$/s);
+  return typed ? typed[1] : value;
+}
+
+function stripIriDelimiters(value: string): string {
+  if (value.startsWith('<') && value.endsWith('>')) {
+    return value.slice(1, -1);
+  }
+  return value;
+}
+
+function agentAddressFromUri(agentUri: string): string {
+  if (agentUri.startsWith('urn:dkg:agent:')) {
+    return agentUri.slice('urn:dkg:agent:'.length);
+  }
+  if (agentUri.startsWith('did:dkg:agent:')) {
+    return agentUri.slice('did:dkg:agent:'.length);
+  }
+  throw new Error(`Unsupported agent URI: ${agentUri}`);
+}
+
+async function waitForEndpointRow(
+  client: ApiClient,
+  contextGraphId: string,
+  uri: string,
+): Promise<Record<string, string>> {
+  const sparql = `
+    PREFIX dcat: <http://www.w3.org/ns/dcat#>
+    PREFIX dct: <http://purl.org/dc/terms/>
+    PREFIX dkg: <https://ontology.dkg.io/dkg#>
+    SELECT ?broker ?topic ?messageFormat ?publisher ?endpointUrl ?issued
+    WHERE {
+      BIND(<${uri}> AS ?endpoint)
+      ?endpoint a dkg:KafkaTopicEndpoint, dcat:DataService ;
+        dkg:broker ?broker ;
+        dkg:topic ?topic ;
+        dkg:messageFormat ?messageFormat ;
+        dct:publisher ?publisher ;
+        dct:issued ?issued ;
+        dcat:endpointURL ?endpointUrl .
+    }
+  `;
+
+  const deadline = Date.now() + 20_000;
+  while (Date.now() < deadline) {
+    const response = await client.query(sparql, contextGraphId);
+    if (response.result.type === 'bindings' && response.result.bindings.length > 0) {
+      return response.result.bindings[0]!;
+    }
+    await new Promise((resolve) => setTimeout(resolve, 1_000));
+  }
+
+  throw new Error(`Kafka endpoint ${uri} was not queryable in ${contextGraphId} within 20s`);
+}
+
+describe('kafka walking skeleton e2e', () => {
+  let devnetReachable = false;
+  let port = 0;
+  let token = '';
+  let client: ApiClient;
+  let owner = '';
+
+  beforeAll(async () => {
+    if (!RUN_E2E) return;
+
+    try {
+      await access(CLI_ENTRY, constants.F_OK);
+    } catch {
+      await execFileAsync('pnpm', ['--dir', 'packages/cli', 'build'], {
+        cwd: REPO_ROOT,
+      });
+    }
+
+    const [tokenRaw, portRaw] = await Promise.all([
+      readFile(join(DEVNET_NODE1_HOME, 'auth.token'), 'utf8'),
+      readFile(join(DEVNET_NODE1_HOME, 'api.port'), 'utf8'),
+    ]);
+
+    token = parseTokenFile(tokenRaw);
+    port = parseInt(portRaw.trim(), 10);
+    client = new ApiClient(port, token);
+
+    try {
+      const status = await client.status();
+      devnetReachable = typeof status.peerId === 'string' && status.peerId.length > 0;
+    } catch {
+      devnetReachable = false;
+    }
+
+    if (!devnetReachable) return;
+
+    const agents = await client.agents();
+    const selfAgent = agents.agents.find((agent) => (agent as any).connectionStatus === 'self');
+    const agentUri = selfAgent?.agentUri;
+    const agentAddress = (selfAgent as any)?.agentAddress;
+    if (!agentUri || (!agentAddress && !agentUri.startsWith('urn:dkg:agent:') && !agentUri.startsWith('did:dkg:agent:'))) {
+      throw new Error(`Could not resolve publishing agent identity from /api/agents`);
+    }
+    owner = String(agentAddress ??
agentAddressFromUri(agentUri)).toLowerCase(); + }, 120_000); + + beforeEach(({ skip }) => { + if (!RUN_E2E || !devnetReachable) skip(); + }); + + it('registers a Kafka endpoint into the named context graph and discovers it via SPARQL', async () => { + const broker = 'kafka.e2e.local:9092'; + const topic = `walking-skeleton.${Date.now()}`; + const messageFormat = 'application/cloudevents+json'; + const expectedUri = buildKafkaEndpointUri({ owner, broker, topic }); + + const result = await execFileAsync( + 'node', + [ + CLI_ENTRY, + 'kafka', + 'endpoint', + 'register', + '--cg', + CONTEXT_GRAPH_ID, + '--broker', + broker, + '--topic', + topic, + '--format', + messageFormat, + ], + { + cwd: REPO_ROOT, + env: { + ...process.env, + DKG_HOME: DEVNET_NODE1_HOME, + DKG_API_PORT: String(port), + }, + }, + ); + + expect(result.stdout).toContain('Kafka endpoint registered:'); + expect(result.stdout).toContain(expectedUri); + expect(result.stdout).toContain(CONTEXT_GRAPH_ID); + + const row = await waitForEndpointRow(client, CONTEXT_GRAPH_ID, expectedUri); + + expect(stripQuotedLiteral(row.broker ?? '')).toBe(broker); + expect(stripQuotedLiteral(row.topic ?? '')).toBe(topic); + expect(stripQuotedLiteral(row.messageFormat ?? '')).toBe(messageFormat); + expect(stripIriDelimiters(row.publisher ?? '')).toBe(`urn:dkg:agent:${owner}`); + expect(stripIriDelimiters(row.endpointUrl ?? '')).toBe(`kafka://${broker}/${topic}`); + expect(Number.isNaN(Date.parse(stripQuotedLiteral(row.issued ?? '')))).toBe(false); + }, 90_000); +}); From 3c09ed460d6da6de21d5934a1f42255329b2bfd3 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 16:15:53 +0200 Subject: [PATCH 06/13] fix(kafka): default e2e devnet home to repo-root path Replace the developer-machine absolute path fallback with one derived from REPO_ROOT (already computed in the same file) so the e2e test is portable across machines and CI. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/kafka/test/e2e/walking-skeleton.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/test/e2e/walking-skeleton.test.ts b/packages/kafka/test/e2e/walking-skeleton.test.ts index 4e63ad18e..756b3eb10 100644 --- a/packages/kafka/test/e2e/walking-skeleton.test.ts +++ b/packages/kafka/test/e2e/walking-skeleton.test.ts @@ -14,7 +14,7 @@ const __dirname = dirname(fileURLToPath(import.meta.url)); const REPO_ROOT = join(__dirname, '..', '..', '..', '..'); const CLI_ENTRY = join(REPO_ROOT, 'packages', 'cli', 'dist', 'cli.js'); const DEVNET_NODE1_HOME = - process.env.DKG_KAFKA_DEVNET_HOME ?? '/Users/zvonimir/projects/dkg/.devnet/node1'; + process.env.DKG_KAFKA_DEVNET_HOME ?? join(REPO_ROOT, '.devnet', 'node1'); const RUN_E2E = process.env.DKG_KAFKA_E2E === '1' || process.env.DKG_KAFKA_E2E === 'true'; const CONTEXT_GRAPH_ID = 'devnet-test'; From 8203b8663ef5e7d1a5e255d65f80f5b26c57d3fc Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 16:16:31 +0200 Subject: [PATCH 07/13] fix(kafka): drop unused dkg-core dependency Nothing in packages/kafka/src or tests imports from @origintrail-official/dkg-core; the dependency was carried over but never used. Removing it shrinks the install graph and clarifies the package's actual coupling. 
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 packages/kafka/package.json | 3 ---
 pnpm-lock.yaml              | 4 ----
 2 files changed, 7 deletions(-)

diff --git a/packages/kafka/package.json b/packages/kafka/package.json
index 6cadc5b9b..abd8eca4e 100644
--- a/packages/kafka/package.json
+++ b/packages/kafka/package.json
@@ -10,9 +10,6 @@
     "test:coverage": "vitest run --coverage",
     "clean": "rm -rf dist tsconfig.tsbuildinfo"
   },
-  "dependencies": {
-    "@origintrail-official/dkg-core": "workspace:*"
-  },
   "devDependencies": {
     "@vitest/coverage-v8": "^4.0.18",
     "vitest": "^4.0.18"
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 1351b7a4e..1ec1298b4 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -488,10 +488,6 @@ importers:
       version: 8.3.3(web-streams-polyfill@3.3.3)
 
   packages/kafka:
-    dependencies:
-      '@origintrail-official/dkg-core':
-        specifier: workspace:*
-        version: link:../core
     devDependencies:
       '@vitest/coverage-v8':
         specifier: ^4.0.18

From fbd0ad1a12ceb40d0a09b3361406c98d1df37869 Mon Sep 17 00:00:00 2001
From: Zvonimir
Date: Mon, 4 May 2026 16:18:36 +0200
Subject: [PATCH 08/13] refactor(kafka): move publish envelope wrapping to route adapter

The kafka package now hands a bare KA across the KafkaEndpointPublisher
boundary; the `{ public: ... }` envelope expected by agent.publish is
applied by the route-handler adapter in packages/cli. This mirrors the
EpcisPublisher pattern and keeps the kafka package agnostic of the
agent.publish payload shape, which sets the contract for slices 02-07.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 packages/cli/src/daemon/routes/kafka.ts       |  5 ++-
 packages/kafka/src/endpoint.ts                | 14 ++++++-
 packages/kafka/test/endpoint.register.test.ts | 42 +++++++++----------
 3 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/packages/cli/src/daemon/routes/kafka.ts b/packages/cli/src/daemon/routes/kafka.ts
index e1bea21fe..c691ac664 100644
--- a/packages/cli/src/daemon/routes/kafka.ts
+++ b/packages/cli/src/daemon/routes/kafka.ts
@@ -50,7 +50,10 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
 
     const publisher: KafkaEndpointPublisher = {
       async publish(targetContextGraphId, content) {
-        await agent.publish(targetContextGraphId, content as Record<string, unknown>);
+        await agent.publish(
+          targetContextGraphId,
+          { public: content } as Record<string, unknown>,
+        );
       },
     };
 
diff --git a/packages/kafka/src/endpoint.ts b/packages/kafka/src/endpoint.ts
index e9618df41..22888c8d2 100644
--- a/packages/kafka/src/endpoint.ts
+++ b/packages/kafka/src/endpoint.ts
@@ -1,8 +1,18 @@
 import { buildKafkaEndpointKnowledgeAsset } from './ka-builder.js';
 import { buildKafkaEndpointUri } from './uri.js';
 
+/**
+ * Dependency-inversion boundary: the kafka package needs something that can
+ * publish a JSON-LD knowledge asset. The package hands the bare KA across this
+ * interface; envelope wrapping (e.g. `{ public: ... }`) belongs to the caller.
+ */ +export type KafkaEndpointKnowledgeAsset = ReturnType; + export interface KafkaEndpointPublisher { - publish(contextGraphId: string, content: unknown): Promise; + publish( + contextGraphId: string, + knowledgeAsset: KafkaEndpointKnowledgeAsset, + ): Promise; } export interface RegisterKafkaEndpointInput { @@ -33,7 +43,7 @@ export async function registerKafkaEndpoint( issuedAt, }); - await input.publisher.publish(input.contextGraphId, { public: knowledgeAsset }); + await input.publisher.publish(input.contextGraphId, knowledgeAsset); return { uri, diff --git a/packages/kafka/test/endpoint.register.test.ts b/packages/kafka/test/endpoint.register.test.ts index 871d770c3..e22f06fcb 100644 --- a/packages/kafka/test/endpoint.register.test.ts +++ b/packages/kafka/test/endpoint.register.test.ts @@ -31,28 +31,26 @@ describe('registerKafkaEndpoint', () => { expect(calls[0]).toEqual({ contextGraphId: 'devnet-test', content: { - public: { - '@context': { - dcat: 'http://www.w3.org/ns/dcat#', - dct: 'http://purl.org/dc/terms/', - dkg: 'https://ontology.dkg.io/dkg#', - xsd: 'http://www.w3.org/2001/XMLSchema#', - }, - '@id': result.uri, - '@type': ['dkg:KafkaTopicEndpoint', 'dcat:DataService'], - 'dcat:endpointURL': { - '@id': 'kafka://kafka.example.com:9092/orders.created', - }, - 'dkg:broker': 'kafka.example.com:9092', - 'dkg:topic': 'orders.created', - 'dkg:messageFormat': 'application/json', - 'dct:publisher': { - '@id': 'urn:dkg:agent:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd', - }, - 'dct:issued': { - '@value': '2026-05-04T12:34:56.000Z', - '@type': 'xsd:dateTime', - }, + '@context': { + dcat: 'http://www.w3.org/ns/dcat#', + dct: 'http://purl.org/dc/terms/', + dkg: 'https://ontology.dkg.io/dkg#', + xsd: 'http://www.w3.org/2001/XMLSchema#', + }, + '@id': result.uri, + '@type': ['dkg:KafkaTopicEndpoint', 'dcat:DataService'], + 'dcat:endpointURL': { + '@id': 'kafka://kafka.example.com:9092/orders.created', + }, + 'dkg:broker': 'kafka.example.com:9092', + 'dkg:topic': 'orders.created', + 'dkg:messageFormat': 'application/json', + 'dct:publisher': { + '@id': 'urn:dkg:agent:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd', + }, + 'dct:issued': { + '@value': '2026-05-04T12:34:56.000Z', + '@type': 'xsd:dateTime', }, }, }); From cef06678d302fa68399120f3ba42b5cd4f73d983 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 16:19:03 +0200 Subject: [PATCH 09/13] refactor(kafka): rename shadowed closure param in route handler The publisher closure's first parameter was previously named targetContextGraphId, shadowing the outer targetContextGraphId variable defined in the same handler. Renaming the closure parameter to cgId removes the shadow and makes the two scopes obviously distinct. 
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 packages/cli/src/daemon/routes/kafka.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/cli/src/daemon/routes/kafka.ts b/packages/cli/src/daemon/routes/kafka.ts
index c691ac664..550f6f05e 100644
--- a/packages/cli/src/daemon/routes/kafka.ts
+++ b/packages/cli/src/daemon/routes/kafka.ts
@@ -49,9 +49,9 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
     }
 
     const publisher: KafkaEndpointPublisher = {
-      async publish(targetContextGraphId, content) {
+      async publish(cgId, content) {
         await agent.publish(
-          targetContextGraphId,
+          cgId,
           { public: content } as Record<string, unknown>,
         );
       },

From 4ca8a07290705681c7beeac89e393eaba40b5fe0 Mon Sep 17 00:00:00 2001
From: Zvonimir
Date: Mon, 4 May 2026 16:19:54 +0200
Subject: [PATCH 10/13] test(kafka): add per-package coverage ratchet

Replace the generic criticalityTargets.kosava floor (60% lines/funcs/
statements, 50% branches) with a kafka-specific export pinned to the
package's measured coverage (100% lines/funcs/statements, 50% branches).
Without this, untested code can be added freely while CI still passes.

Mirror the kosavaEpcisCoverage pattern, including the bare
'../../vitest.coverage' import path used by EPCIS.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 packages/kafka/vitest.config.ts | 4 ++--
 vitest.coverage.ts              | 7 +++++++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/packages/kafka/vitest.config.ts b/packages/kafka/vitest.config.ts
index 4ec85529c..2b68f1e59 100644
--- a/packages/kafka/vitest.config.ts
+++ b/packages/kafka/vitest.config.ts
@@ -1,5 +1,5 @@
 import { defineConfig } from 'vitest/config';
-import { criticalityTargets } from '../../vitest.coverage.js';
+import { kosavaKafkaCoverage } from '../../vitest.coverage';
 
 export default defineConfig({
   test: {
@@ -8,7 +8,7 @@ export default defineConfig({
       provider: 'v8',
       reporter: ['text', 'html', 'lcov', 'json-summary'],
       reportsDirectory: './coverage',
-      thresholds: criticalityTargets.kosava,
+      thresholds: kosavaKafkaCoverage,
     },
   },
 });
diff --git a/vitest.coverage.ts b/vitest.coverage.ts
index 9dc0f59f0..7123b7e22 100644
--- a/vitest.coverage.ts
+++ b/vitest.coverage.ts
@@ -159,6 +159,13 @@ export const kosavaEpcisCoverage: CoverageThresholds = {
   statements: 97,
 };
 
+export const kosavaKafkaCoverage: CoverageThresholds = {
+  lines: 100,
+  functions: 100,
+  branches: 50,
+  statements: 100,
+};
+
 /**
  * @deprecated Import a tier-specific export (e.g. `kosavaNodeUiCoverage`).
  * Kept for any external tooling that still references the old name.

From a61fc138bea7c8a5bf18e38a55315ac7e9d7488f Mon Sep 17 00:00:00 2001
From: Zvonimir
Date: Mon, 4 May 2026 16:36:02 +0200
Subject: [PATCH 11/13] fix(devnet): escape backticks in node -e JS comments

Five backticks inside JS comments at lines 742, 868, 881, 898, 899 were
not escaped, so bash interpreted them as command substitution inside
the surrounding `node -e "..."` double-quoted string. On every `devnet
start` this produced noisy errors like:

  scripts/devnet.sh: line 694: continue: only meaningful in a `for'...
  scripts/devnet.sh: line 694: nonce++: command not found
  scripts/devnet.sh: line 694: syntax error near unexpected token `idId'

The text was inside JS `//` comments so the staking script still ran,
but the diagnostics drowned out real failures (e.g. the stale-bytecode
revert this commit's sibling fixes). Lines 730-733 and 768 already use
`\`` correctly; these five were oversights.
Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/devnet.sh | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/devnet.sh b/scripts/devnet.sh index c89acfed7..a1576c203 100755 --- a/scripts/devnet.sh +++ b/scripts/devnet.sh @@ -739,7 +739,7 @@ cmd_start() { const opSigners = new Array(n).fill(null); const nodeRoles = new Array(n).fill('edge'); // Codex round 4 on PR #368: read config.json FIRST and INDEPENDENTLY - // of wallets.json. The previous loop did wallets first and `continue`d + // of wallets.json. The previous loop did wallets first and \`continue\`d // on parse failure BEFORE reading config, so an intended core whose // wallets.json was malformed silently kept the 'edge' default, // dropped out of coreIdxs, and the lostCores guard never fired. @@ -865,7 +865,7 @@ cmd_start() { // this code path exists to prevent. // // Codex round 2 on PR #368: re-read 'pending' before EVERY tx - // (not once per node loop). The previous code did `nonce++` at + // (not once per node loop). The previous code did \`nonce++\` at // call site so a failed approve advanced the local counter even // though the chain nonce did not, leaving createConviction + // updateAsk to send with an inflated nonce that would either @@ -878,7 +878,7 @@ cmd_start() { return tx.wait(); }; // Codex round 4 on PR #368: skip createConviction if the daemon - // already opened a position. PR 366 wired `EVMChainAdapter.ensureProfile()` + // already opened a position. PR 366 wired \`EVMChainAdapter.ensureProfile()\` // to open a default 50k V10 conviction during agent startup, so by // the time this script reaches the staking loop the daemon has // (usually) already staked. Running createConviction again would @@ -895,8 +895,8 @@ cmd_start() { // updateAsk is INDEPENDENT of stake state, so we still attempt // it below regardless of the probe outcome. // - // Codex round 7 on PR #368: probe `getNodeStakeV10(idId)` instead - // of NFT `balanceOf(opSigner.address)`. The op wallet could hold + // Codex round 7 on PR #368: probe \`getNodeStakeV10(idId)\` instead + // of NFT \`balanceOf(opSigner.address)\`. The op wallet could hold // positions for OTHER identities (not in our devnet today, but // a wallet-scoped probe is the wrong semantic check); we want // to know whether THIS identity is staked. From d5e3e5a7f96aed201f2886f0afc7bda6bef15d17 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 16:43:53 +0200 Subject: [PATCH 12/13] fix(devnet): redeploy contracts when artifacts outpace running chain When hardhat is left running across edits to .sol sources, the on-chain bytecode lags the recompiled artifacts on disk. The next `devnet start` short-circuits in `start_hardhat` (alive PID), then `deploy_contracts` skips on the still-present `.devnet/hardhat/deployed` marker, so daemons connect to addresses whose code predates the latest source. View methods added since that deploy revert with "function selector not recognized"; ethers surfaces this as `require(false)` (no return data, no fallback). On a 6-node devnet this manifests as `0/4 core node(s) staked` because the staking loop's `CSS.getNodeStakeV10` probe fails for every node. Detect the mismatch by comparing artifact mtimes to the marker. If any contract artifact JSON is newer than the marker, kill the running hardhat so the existing fresh-start path (which clears the marker plus `localhost_contracts.json` + `hardhat_contracts.json`) runs and `deploy_contracts` re-deploys onto a fresh chain. No-op when artifacts are unchanged. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/devnet.sh | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/scripts/devnet.sh b/scripts/devnet.sh index a1576c203..2d1c24654 100755 --- a/scripts/devnet.sh +++ b/scripts/devnet.sh @@ -101,10 +101,36 @@ ensure_built() { start_hardhat() { local pidfile="$DEVNET_DIR/hardhat.pid" + local marker="$DEVNET_DIR/hardhat/deployed" + local artifacts_dir="$REPO_ROOT/packages/evm-module/artifacts/contracts" if [ -f "$pidfile" ] && kill -0 "$(cat "$pidfile")" 2>/dev/null; then - log "Hardhat node already running (PID $(cat "$pidfile"))" - return 0 + # Stale-bytecode guard. When contracts are recompiled while hardhat + # keeps running, the on-chain bytecode lags the artifacts on disk. + # The next `devnet start` short-circuits hardhat reuse here, then + # `deploy_contracts` skips on the still-present marker, so daemons + # connect to addresses whose code is older than the source. View + # methods added since the live deploy revert with "function selector + # not recognized" (e.g. CSS.getNodeStakeV10 in the staking loop). + # Detect the mismatch by comparing artifact mtimes to the marker; + # if any artifact is newer, kill hardhat so the fresh-start path + # below clears the marker + deployments and `deploy_contracts` + # re-deploys onto a fresh chain. + if [ -f "$marker" ] && [ -d "$artifacts_dir" ] \ + && [ -n "$(find "$artifacts_dir" -name '*.json' -newer "$marker" -print -quit 2>/dev/null)" ]; then + local stale_pid + stale_pid=$(cat "$pidfile") + log "Hardhat (PID $stale_pid) holds outdated contracts (artifacts newer than deploy marker) — restarting for fresh deploy" + kill "$stale_pid" 2>/dev/null || true + for _ in 1 2 3 4 5; do + kill -0 "$stale_pid" 2>/dev/null || break + sleep 1 + done + rm -f "$pidfile" + else + log "Hardhat node already running (PID $(cat "$pidfile"))" + return 0 + fi fi log "Starting Hardhat node on port $HARDHAT_PORT..." From 348ffd192a4c448b074f73093652cff351fbab1e Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Mon, 4 May 2026 16:50:09 +0200 Subject: [PATCH 13/13] fix(kafka): make e2e SPARQL match the daemon's named-graph store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two issues kept the live e2e from passing: 1. The endpoint-row SPARQL queried the default graph, but the daemon stores published triples in named per-CG graphs (see packages/cli/src/daemon/routes/query.ts — every cg-scoped read in the codebase wraps its WHERE in `GRAPH ?g { ... }`). Without the wrapper the BGP returned zero bindings even though the data was present, so the 20s wait always timed out. 2. The completion check gated on `response.result.type === 'bindings'`, but `/api/query` returns `{ result: { bindings: [...] } }` with no `type` discriminator on this path — `QueryResult`'s optional `type` is only set on a couple of legacy callers. The check was always false, so even when the SPARQL did match (after fix #1) the loop still spun out. Both surfaced together because the fresh devnet harness was finally healthy enough to run the slice end-to-end (PR #383 unblocked the chain side). Verified locally: `DKG_KAFKA_E2E=1 pnpm --filter @origintrail-official/dkg-kafka exec vitest run test/e2e/walking-skeleton.test.ts` now passes in <1s. 
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../kafka/test/e2e/walking-skeleton.test.ts   | 29 ++++++++++++------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/packages/kafka/test/e2e/walking-skeleton.test.ts b/packages/kafka/test/e2e/walking-skeleton.test.ts
index 756b3eb10..9cca516be 100644
--- a/packages/kafka/test/e2e/walking-skeleton.test.ts
+++ b/packages/kafka/test/e2e/walking-skeleton.test.ts
@@ -53,28 +53,37 @@ async function waitForEndpointRow(
   contextGraphId: string,
   uri: string,
 ): Promise<Record<string, string>> {
+  // Endpoint triples land in a named graph (one per CG), so the WHERE
+  // pattern must be wrapped in GRAPH ?g. Default-graph patterns return
+  // empty bindings against this store — see daemon/routes/query.ts.
   const sparql = `
     PREFIX dcat: <http://www.w3.org/ns/dcat#>
     PREFIX dct: <http://purl.org/dc/terms/>
     PREFIX dkg: <https://ontology.dkg.io/dkg#>
     SELECT ?broker ?topic ?messageFormat ?publisher ?endpointUrl ?issued
     WHERE {
-      BIND(<${uri}> AS ?endpoint)
-      ?endpoint a dkg:KafkaTopicEndpoint, dcat:DataService ;
-        dkg:broker ?broker ;
-        dkg:topic ?topic ;
-        dkg:messageFormat ?messageFormat ;
-        dct:publisher ?publisher ;
-        dct:issued ?issued ;
-        dcat:endpointURL ?endpointUrl .
+      GRAPH ?g {
+        BIND(<${uri}> AS ?endpoint)
+        ?endpoint a dkg:KafkaTopicEndpoint, dcat:DataService ;
+          dkg:broker ?broker ;
+          dkg:topic ?topic ;
+          dkg:messageFormat ?messageFormat ;
+          dct:publisher ?publisher ;
+          dct:issued ?issued ;
+          dcat:endpointURL ?endpointUrl .
+      }
     }
   `;
 
   const deadline = Date.now() + 20_000;
   while (Date.now() < deadline) {
     const response = await client.query(sparql, contextGraphId);
-    if (response.result.type === 'bindings' && response.result.bindings.length > 0) {
-      return response.result.bindings[0]!;
+    // Daemon's /api/query response is { result: { bindings: [...] } } — the
+    // optional `type` discriminator on QueryResult is only set on a couple
+    // of legacy paths and is absent here, so we key off `bindings` directly.
+    const bindings = (response.result as { bindings?: Array<Record<string, string>> }).bindings;
+    if (Array.isArray(bindings) && bindings.length > 0) {
+      return bindings[0]!;
     }
     await new Promise((resolve) => setTimeout(resolve, 1_000));
   }