diff --git a/package.json b/package.json
index 37a97da22..33bf97724 100644
--- a/package.json
+++ b/package.json
@@ -8,7 +8,7 @@
   },
   "scripts": {
     "build": "turbo build",
-    "build:runtime:packages": "pnpm -r --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-storage --filter @origintrail-official/dkg-query --filter @origintrail-official/dkg-publisher --filter @origintrail-official/dkg-chain --filter @origintrail-official/dkg-epcis --filter @origintrail-official/dkg-random-sampling --filter @origintrail-official/dkg-agent --filter @origintrail-official/dkg-graph-viz --filter @origintrail-official/dkg-node-ui --filter @origintrail-official/dkg-adapter-openclaw --filter @origintrail-official/dkg run build",
+    "build:runtime:packages": "pnpm -r --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-storage --filter @origintrail-official/dkg-query --filter @origintrail-official/dkg-publisher --filter @origintrail-official/dkg-chain --filter @origintrail-official/dkg-epcis --filter @origintrail-official/dkg-kafka --filter @origintrail-official/dkg-random-sampling --filter @origintrail-official/dkg-agent --filter @origintrail-official/dkg-graph-viz --filter @origintrail-official/dkg-node-ui --filter @origintrail-official/dkg-adapter-openclaw --filter @origintrail-official/dkg run build",
     "build:runtime": "pnpm run build:runtime:packages && pnpm --filter @origintrail-official/dkg-node-ui run build:ui",
     "test": "turbo test",
     "test:coverage": "turbo test:coverage",
diff --git a/packages/cli/package.json b/packages/cli/package.json
index 310ce28c1..91bb3b02e 100644
--- a/packages/cli/package.json
+++ b/packages/cli/package.json
@@ -24,6 +24,7 @@
     "@origintrail-official/dkg-core": "workspace:*",
     "@origintrail-official/dkg-mcp": "workspace:*",
     "@origintrail-official/dkg-epcis": "workspace:*",
+    "@origintrail-official/dkg-kafka": "workspace:*",
     "@origintrail-official/dkg-node-ui": "workspace:*",
     "@origintrail-official/dkg-publisher": "workspace:*",
     "@origintrail-official/dkg-storage": "workspace:*",
diff --git a/packages/cli/src/api-client.ts b/packages/cli/src/api-client.ts
index 4354f3ac2..d51715b38 100644
--- a/packages/cli/src/api-client.ts
+++ b/packages/cli/src/api-client.ts
@@ -551,6 +551,18 @@ export class ApiClient {
     return this.get(`/api/context-graph/${encodeURIComponent(contextGraphId)}/participants`);
   }
 
+  async registerKafkaEndpoint(request: {
+    contextGraphId: string;
+    broker: string;
+    topic: string;
+    messageFormat: string;
+  }): Promise<{
+    uri: string;
+    contextGraphId: string;
+  }> {
+    return this.post('/api/kafka/endpoint', request);
+  }
+
   async signJoinRequest(contextGraphId: string): Promise<{
     ok: boolean;
     status?: string;
diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts
index f8b340c2a..0e55ea5de 100644
--- a/packages/cli/src/cli.ts
+++ b/packages/cli/src/cli.ts
@@ -1715,6 +1715,41 @@ assertionCmd
   }
 });
 
+// ─── dkg kafka ──────────────────────────────────────────────────────
+
+const kafkaCmd = program
+  .command('kafka')
+  .description('Kafka metadata registration commands');
+
+const kafkaEndpointCmd = kafkaCmd
+  .command('endpoint')
+  .description('Kafka topic endpoint operations');
+
+kafkaEndpointCmd
+  .command('register')
+  .description('Register a Kafka topic endpoint as a knowledge asset in a named context graph')
+  .requiredOption('--cg <context-graph-id>', 'Target context graph')
+  .requiredOption('--broker <host:port>', 'Kafka broker host:port')
+  .requiredOption('--topic <name>', 'Kafka topic name')
+  .option('--format <mime-type>', 'Kafka message format MIME type', 'application/json')
+  .action(async (opts: ActionOpts) => {
+    try {
+      const client = await ApiClient.connect();
+      const result = await client.registerKafkaEndpoint({
+        contextGraphId: opts.cg,
+        broker: opts.broker,
+        topic: opts.topic,
+        messageFormat: opts.format,
+      });
+      console.log('Kafka endpoint registered:');
+      console.log(`  URI: ${result.uri}`);
+      console.log(`  Context graph: ${result.contextGraphId}`);
+    } catch (err) {
+      console.error(toErrorMessage(err));
+      process.exit(1);
+    }
+  });
+
 // ─── dkg openclaw ───────────────────────────────────────────────────
 
 const openclawCmd = program
diff --git a/packages/cli/src/daemon/handle-request.ts b/packages/cli/src/daemon/handle-request.ts
index 34ce76ffc..07f07dd3a 100644
--- a/packages/cli/src/daemon/handle-request.ts
+++ b/packages/cli/src/daemon/handle-request.ts
@@ -330,6 +330,7 @@
 import { handleAssertionRoutes } from './routes/assertion.js';
 import { handleQueryRoutes } from './routes/query.js';
 import { handleLocalAgentsRoutes } from './routes/local-agents.js';
 import { handleEpcisRoutes } from './routes/epcis.js';
+import { handleKafkaRoutes } from './routes/kafka.js';
 
 export async function handleRequest(
@@ -431,5 +432,8 @@
   await handleEpcisRoutes(ctx);
   if (res.writableEnded) return;
 
+  await handleKafkaRoutes(ctx);
+  if (res.writableEnded) return;
+
   jsonResponse(res, 404, { error: 'Not found' });
 }
diff --git a/packages/cli/src/daemon/routes/kafka.ts b/packages/cli/src/daemon/routes/kafka.ts
new file mode 100644
index 000000000..550f6f05e
--- /dev/null
+++ b/packages/cli/src/daemon/routes/kafka.ts
@@ -0,0 +1,71 @@
+import { jsonResponse, readBody, validateRequiredContextGraphId } from '../http-utils.js';
+import type { RequestContext } from './context.js';
+import {
+  registerKafkaEndpoint,
+  type KafkaEndpointPublisher,
+} from '@origintrail-official/dkg-kafka';
+
+function isNonEmptyString(value: unknown): value is string {
+  return typeof value === 'string' && value.trim().length > 0;
+}
+
+export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
+  const {
+    req,
+    res,
+    agent,
+    path,
+    requestAgentAddress,
+  } = ctx;
+
+  if (req.method === 'POST' && path === '/api/kafka/endpoint') {
+    const body = await readBody(req);
+    let parsed: unknown;
+    try {
+      parsed = JSON.parse(body);
+    } catch {
+      return jsonResponse(res, 400, { error: 'Invalid JSON in request body' });
+    }
+
+    const {
+      contextGraphId,
+      broker,
+      topic,
+      messageFormat,
+    } = parsed as Record<string, unknown>;
+
+    if (!validateRequiredContextGraphId(contextGraphId, res)) {
+      return;
+    }
+    const targetContextGraphId = contextGraphId as string;
+    if (!isNonEmptyString(broker)) {
+      return jsonResponse(res, 400, { error: '"broker" must be a non-empty string' });
+    }
+    if (!isNonEmptyString(topic)) {
+      return jsonResponse(res, 400, { error: '"topic" must be a non-empty string' });
+    }
+    if (!isNonEmptyString(messageFormat)) {
+      return jsonResponse(res, 400, { error: '"messageFormat" must be a non-empty string' });
+    }
+
+    const publisher: KafkaEndpointPublisher = {
+      async publish(cgId, content) {
+        await agent.publish(
+          cgId,
+          { public: content } as Record<string, unknown>,
+        );
+      },
+    };
+
+    const result = await registerKafkaEndpoint({
+      contextGraphId: targetContextGraphId,
+      owner: requestAgentAddress.toLowerCase(),
+      broker,
+      topic,
+      messageFormat,
+      publisher,
+    });
+
+    return jsonResponse(res, 200, result);
+  }
+}
diff --git a/packages/cli/test/api-client.test.ts b/packages/cli/test/api-client.test.ts
index 040164a74..7a6cfff20 100644
--- a/packages/cli/test/api-client.test.ts
+++ b/packages/cli/test/api-client.test.ts
@@ -194,6 +194,34 @@ describe('ApiClient', () => {
     expect(body.name).toBe('incident');
   });
 
+  it('registerKafkaEndpoint() posts the endpoint payload', async () => {
+    const { fetch, calls } = createTrackingFetch({
+      ok: true,
+      status: 200,
+      body: {
+        uri: 'urn:dkg:kafka-endpoint:0xabc:hash',
+        contextGraphId: 'devnet-test',
+      },
+    });
+    globalThis.fetch = fetch;
+
+    await client.registerKafkaEndpoint({
+      contextGraphId: 'devnet-test',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+    });
+
+    expect(calls[0].url).toBe(`http://127.0.0.1:${PORT}/api/kafka/endpoint`);
+    const body = JSON.parse(calls[0].opts.body as string);
+    expect(body).toEqual({
+      contextGraphId: 'devnet-test',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+    });
+  });
+
   it('approveCclPolicy() posts approval payload', async () => {
     const { fetch, calls } = createTrackingFetch({ ok: true, status: 200, body: { policyUri: 'urn:policy', bindingUri: 'urn:binding', approvedAt: 'now' } });
     globalThis.fetch = fetch;
diff --git a/packages/cli/test/kafka-cli-smoke.test.ts b/packages/cli/test/kafka-cli-smoke.test.ts
new file mode 100644
index 000000000..f80ec60b2
--- /dev/null
+++ b/packages/cli/test/kafka-cli-smoke.test.ts
@@ -0,0 +1,95 @@
+import { beforeAll, afterAll, describe, expect, it } from 'vitest';
+import { createServer } from 'node:http';
+import { execFile } from 'node:child_process';
+import { promisify } from 'node:util';
+import { mkdtemp, writeFile, rm } from 'node:fs/promises';
+import { existsSync } from 'node:fs';
+import { join, dirname } from 'node:path';
+import { fileURLToPath } from 'node:url';
+import { tmpdir } from 'node:os';
+
+const execFileAsync = promisify(execFile);
+const __dirname = dirname(fileURLToPath(import.meta.url));
+const CLI_ENTRY = join(__dirname, '..', 'dist', 'cli.js');
+
+describe.sequential('kafka CLI smoke', () => {
+  let dkgHome: string;
+  let server: ReturnType<typeof createServer>;
+  let smokeApiPort: string;
+  let lastBody = '';
+  let lastAuthHeader = '';
+
+  beforeAll(async () => {
+    dkgHome = await mkdtemp(join(tmpdir(), 'dkg-kafka-cli-'));
+    if (!existsSync(CLI_ENTRY)) {
+      await execFileAsync('pnpm', ['build'], { cwd: join(__dirname, '..') });
+    }
+    if (!existsSync(CLI_ENTRY)) {
+      throw new Error(`CLI entry not found after build: ${CLI_ENTRY}`);
+    }
+
+    await writeFile(join(dkgHome, 'auth.token'), 'smoke-token\n');
+
+    server = createServer(async (req, res) => {
+      if (req.method === 'POST' && req.url === '/api/kafka/endpoint') {
+        lastAuthHeader = String(req.headers.authorization ?? '');
+        const chunks: Buffer[] = [];
+        for await (const chunk of req) {
+          chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
+        }
+        lastBody = Buffer.concat(chunks).toString('utf8');
+        res.writeHead(200, { 'Content-Type': 'application/json' });
+        res.end(JSON.stringify({
+          uri: 'urn:dkg:kafka-endpoint:0xabc:hash',
+          contextGraphId: 'devnet-test',
+        }));
+        return;
+      }
+
+      res.writeHead(404, { 'Content-Type': 'application/json' });
+      res.end(JSON.stringify({ error: 'Not found' }));
+    });
+
+    await new Promise<void>((resolve, reject) => {
+      server.once('error', reject);
+      server.listen(0, '127.0.0.1', () => {
+        const addr = server.address();
+        smokeApiPort = typeof addr === 'object' && addr ? String(addr.port) : '0';
+        resolve();
+      });
+    });
+  });
+
+  afterAll(async () => {
+    await new Promise<void>((resolve) => server.close(() => resolve()));
+    await rm(dkgHome, { recursive: true, force: true });
+  });
+
+  it('registers a Kafka endpoint through the CLI', async () => {
+    const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort };
+
+    const result = await execFileAsync('node', [
+      CLI_ENTRY,
+      'kafka',
+      'endpoint',
+      'register',
+      '--cg',
+      'devnet-test',
+      '--broker',
+      'kafka.example.com:9092',
+      '--topic',
+      'orders.created',
+    ], { env });
+
+    expect(result.stdout).toContain('Kafka endpoint registered:');
+    expect(result.stdout).toContain('urn:dkg:kafka-endpoint:0xabc:hash');
+    expect(result.stdout).toContain('devnet-test');
+    expect(lastAuthHeader).toBe('Bearer smoke-token');
+    expect(JSON.parse(lastBody)).toEqual({
+      contextGraphId: 'devnet-test',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+    });
+  }, 15000);
+});
diff --git a/packages/cli/tsconfig.json b/packages/cli/tsconfig.json
index e1baaa902..a43344a6c 100644
--- a/packages/cli/tsconfig.json
+++ b/packages/cli/tsconfig.json
@@ -10,6 +10,7 @@
     { "path": "../core" },
     { "path": "../agent" },
     { "path": "../epcis" },
+    { "path": "../kafka" },
     { "path": "../node-ui" },
     { "path": "../adapter-openclaw" },
     { "path": "../mcp-dkg" }
diff --git a/packages/kafka/package.json b/packages/kafka/package.json
new file mode 100644
index 000000000..abd8eca4e
--- /dev/null
+++ b/packages/kafka/package.json
@@ -0,0 +1,31 @@
+{
+  "name": "@origintrail-official/dkg-kafka",
+  "version": "10.0.0-rc.4",
+  "type": "module",
+  "main": "dist/index.js",
+  "types": "dist/index.d.ts",
+  "scripts": {
+    "build": "tsc",
+    "test": "vitest run",
+    "test:coverage": "vitest run --coverage",
+    "clean": "rm -rf dist tsconfig.tsbuildinfo"
+  },
+  "devDependencies": {
+    "@vitest/coverage-v8": "^4.0.18",
+    "vitest": "^4.0.18"
+  },
+  "publishConfig": {
+    "access": "public"
+  },
+  "files": [
+    "dist",
+    "README.md",
+    "LICENSE"
+  ],
+  "license": "Apache-2.0",
+  "repository": {
+    "type": "git",
+    "url": "https://github.com/OriginTrail/dkg-v9.git",
+    "directory": "packages/kafka"
+  }
+}
diff --git a/packages/kafka/src/endpoint.ts b/packages/kafka/src/endpoint.ts
new file mode 100644
index 000000000..22888c8d2
--- /dev/null
+++ b/packages/kafka/src/endpoint.ts
@@ -0,0 +1,52 @@
+import { buildKafkaEndpointKnowledgeAsset } from './ka-builder.js';
+import { buildKafkaEndpointUri } from './uri.js';
+
+/**
+ * Dependency-inversion boundary: the kafka package needs something that can
+ * publish a JSON-LD knowledge asset. The package hands the bare KA across this
+ * interface; envelope wrapping (e.g. `{ public: ... }`) belongs to the caller.
+ */
+export type KafkaEndpointKnowledgeAsset = ReturnType<typeof buildKafkaEndpointKnowledgeAsset>;
+
+export interface KafkaEndpointPublisher {
+  publish(
+    contextGraphId: string,
+    knowledgeAsset: KafkaEndpointKnowledgeAsset,
+  ): Promise<unknown>;
+}
+
+export interface RegisterKafkaEndpointInput {
+  contextGraphId: string;
+  owner: string;
+  broker: string;
+  topic: string;
+  messageFormat: string;
+  issuedAt?: string;
+  publisher: KafkaEndpointPublisher;
+}
+
+export interface RegisterKafkaEndpointResult {
+  uri: string;
+  contextGraphId: string;
+}
+
+export async function registerKafkaEndpoint(
+  input: RegisterKafkaEndpointInput,
+): Promise<RegisterKafkaEndpointResult> {
+  const issuedAt = input.issuedAt ?? new Date().toISOString();
+  const uri = buildKafkaEndpointUri(input);
+  const knowledgeAsset = buildKafkaEndpointKnowledgeAsset({
+    owner: input.owner,
+    broker: input.broker,
+    topic: input.topic,
+    messageFormat: input.messageFormat,
+    issuedAt,
+  });
+
+  await input.publisher.publish(input.contextGraphId, knowledgeAsset);
+
+  return {
+    uri,
+    contextGraphId: input.contextGraphId,
+  };
+}
diff --git a/packages/kafka/src/index.ts b/packages/kafka/src/index.ts
new file mode 100644
index 000000000..9e74b9f44
--- /dev/null
+++ b/packages/kafka/src/index.ts
@@ -0,0 +1,3 @@
+export * from './uri.js';
+export * from './ka-builder.js';
+export * from './endpoint.js';
diff --git a/packages/kafka/src/ka-builder.ts b/packages/kafka/src/ka-builder.ts
new file mode 100644
index 000000000..42fe40c30
--- /dev/null
+++ b/packages/kafka/src/ka-builder.ts
@@ -0,0 +1,39 @@
+import { buildKafkaEndpointUri } from './uri.js';
+
+const KAFKA_ENDPOINT_CONTEXT = {
+  dcat: 'http://www.w3.org/ns/dcat#',
+  dct: 'http://purl.org/dc/terms/',
+  dkg: 'https://ontology.dkg.io/dkg#',
+  xsd: 'http://www.w3.org/2001/XMLSchema#',
+} as const;
+
+export interface BuildKafkaEndpointKnowledgeAssetInput {
+  owner: string;
+  broker: string;
+  topic: string;
+  messageFormat: string;
+  issuedAt: string;
+}
+
+export function buildKafkaEndpointKnowledgeAsset(input: BuildKafkaEndpointKnowledgeAssetInput) {
+  const owner = input.owner.toLowerCase();
+
+  return {
+    '@context': KAFKA_ENDPOINT_CONTEXT,
+    '@id': buildKafkaEndpointUri(input),
+    '@type': ['dkg:KafkaTopicEndpoint', 'dcat:DataService'],
+    'dcat:endpointURL': {
+      '@id': `kafka://${input.broker}/${input.topic}`,
+    },
+    'dkg:broker': input.broker,
+    'dkg:topic': input.topic,
+    'dkg:messageFormat': input.messageFormat,
+    'dct:publisher': {
+      '@id': `urn:dkg:agent:${owner}`,
+    },
+    'dct:issued': {
+      '@value': input.issuedAt,
+      '@type': 'xsd:dateTime',
+    },
+  };
+}
diff --git a/packages/kafka/src/uri.ts b/packages/kafka/src/uri.ts
new file mode 100644
index 000000000..86ad2df53
--- /dev/null
+++ b/packages/kafka/src/uri.ts
@@ -0,0 +1,19 @@
+import { createHash } from 'node:crypto';
+
+export interface KafkaEndpointIdentity {
+  owner: string;
+  broker: string;
+  topic: string;
+}
+
+function hashBrokerAndTopic(broker: string, topic: string): string {
+  return createHash('sha256')
+    .update(`${broker.toLowerCase()}|${topic}`)
+    .digest('hex');
+}
+
+export function buildKafkaEndpointUri(identity: KafkaEndpointIdentity): string {
+  const owner = identity.owner.toLowerCase();
+  const hash = hashBrokerAndTopic(identity.broker, identity.topic);
+  return `urn:dkg:kafka-endpoint:${owner}:${hash}`;
+}
diff --git a/packages/kafka/test/e2e/walking-skeleton.test.ts b/packages/kafka/test/e2e/walking-skeleton.test.ts
new file mode 100644
index 000000000..9cca516be
--- /dev/null
+++ b/packages/kafka/test/e2e/walking-skeleton.test.ts
@@ -0,0 +1,189 @@
+import { access, readFile } from 'node:fs/promises';
+import { execFile } from 'node:child_process';
+import { constants } from 'node:fs';
+import { dirname, join } from 'node:path';
+import { fileURLToPath } from 'node:url';
+import { promisify } from 'node:util';
+import { beforeAll, beforeEach, describe, expect, it } from 'vitest';
+import { ApiClient } from '../../../cli/src/api-client.js';
+import { buildKafkaEndpointUri } from '../../src/uri.js';
+
+const execFileAsync = promisify(execFile);
+
+const __dirname = dirname(fileURLToPath(import.meta.url));
+const REPO_ROOT = join(__dirname, '..', '..', '..', '..');
+const CLI_ENTRY = join(REPO_ROOT, 'packages', 'cli', 'dist', 'cli.js');
+const DEVNET_NODE1_HOME =
+  process.env.DKG_KAFKA_DEVNET_HOME ?? join(REPO_ROOT, '.devnet', 'node1');
+const RUN_E2E =
+  process.env.DKG_KAFKA_E2E === '1' || process.env.DKG_KAFKA_E2E === 'true';
+const CONTEXT_GRAPH_ID = 'devnet-test';
+
+function parseTokenFile(raw: string): string {
+  return raw
+    .split('\n')
+    .map((line) => line.trim())
+    .find((line) => line.length > 0 && !line.startsWith('#')) ?? '';
+}
+
+function stripQuotedLiteral(value: string): string {
+  const typed = value.match(/^"(.*)"(?:\^\^<.*>)?$/s);
+  return typed ? typed[1] : value;
+}
+
+function stripIriDelimiters(value: string): string {
+  if (value.startsWith('<') && value.endsWith('>')) {
+    return value.slice(1, -1);
+  }
+  return value;
+}
+
+function agentAddressFromUri(agentUri: string): string {
+  if (agentUri.startsWith('urn:dkg:agent:')) {
+    return agentUri.slice('urn:dkg:agent:'.length);
+  }
+  if (agentUri.startsWith('did:dkg:agent:')) {
+    return agentUri.slice('did:dkg:agent:'.length);
+  }
+  throw new Error(`Unsupported agent URI: ${agentUri}`);
+}
+
+async function waitForEndpointRow(
+  client: ApiClient,
+  contextGraphId: string,
+  uri: string,
+): Promise<Record<string, string>> {
+  // Endpoint triples land in a named graph (one per CG), so the WHERE
+  // pattern must be wrapped in GRAPH ?g. Default-graph patterns return
+  // empty bindings against this store — see daemon/routes/query.ts.
+  const sparql = `
+    PREFIX dcat: <http://www.w3.org/ns/dcat#>
+    PREFIX dct: <http://purl.org/dc/terms/>
+    PREFIX dkg: <https://ontology.dkg.io/dkg#>
+    SELECT ?broker ?topic ?messageFormat ?publisher ?endpointUrl ?issued
+    WHERE {
+      GRAPH ?g {
+        BIND(<${uri}> AS ?endpoint)
+        ?endpoint a dkg:KafkaTopicEndpoint, dcat:DataService ;
+          dkg:broker ?broker ;
+          dkg:topic ?topic ;
+          dkg:messageFormat ?messageFormat ;
+          dct:publisher ?publisher ;
+          dct:issued ?issued ;
+          dcat:endpointURL ?endpointUrl .
+      }
+    }
+  `;
+
+  const deadline = Date.now() + 20_000;
+  while (Date.now() < deadline) {
+    const response = await client.query(sparql, contextGraphId);
+    // Daemon's /api/query response is { result: { bindings: [...] } } — the
+    // optional `type` discriminator on QueryResult is only set on a couple
+    // of legacy paths and is absent here, so we key off `bindings` directly.
+    const bindings = (response.result as { bindings?: Array<Record<string, string>> }).bindings;
+    if (Array.isArray(bindings) && bindings.length > 0) {
+      return bindings[0]!;
+    }
+    await new Promise((resolve) => setTimeout(resolve, 1_000));
+  }
+
+  throw new Error(`Kafka endpoint ${uri} was not queryable in ${contextGraphId} within 20s`);
+}
+
+describe('kafka walking skeleton e2e', () => {
+  let devnetReachable = false;
+  let port = 0;
+  let token = '';
+  let client: ApiClient;
+  let owner = '';
+
+  beforeAll(async () => {
+    if (!RUN_E2E) return;
+
+    try {
+      await access(CLI_ENTRY, constants.F_OK);
+    } catch {
+      await execFileAsync('pnpm', ['--dir', 'packages/cli', 'build'], {
+        cwd: REPO_ROOT,
+      });
+    }
+
+    const [tokenRaw, portRaw] = await Promise.all([
+      readFile(join(DEVNET_NODE1_HOME, 'auth.token'), 'utf8'),
+      readFile(join(DEVNET_NODE1_HOME, 'api.port'), 'utf8'),
+    ]);
+
+    token = parseTokenFile(tokenRaw);
+    port = parseInt(portRaw.trim(), 10);
+    client = new ApiClient(port, token);
+
+    try {
+      const status = await client.status();
+      devnetReachable = typeof status.peerId === 'string' && status.peerId.length > 0;
+    } catch {
+      devnetReachable = false;
+    }
+
+    if (!devnetReachable) return;
+
+    const agents = await client.agents();
+    const selfAgent = agents.agents.find((agent) => (agent as any).connectionStatus === 'self');
+    const agentUri = selfAgent?.agentUri;
+    const agentAddress = (selfAgent as any)?.agentAddress;
+    if (!agentUri || (!agentAddress && !agentUri.startsWith('urn:dkg:agent:') && !agentUri.startsWith('did:dkg:agent:'))) {
+      throw new Error(`Could not resolve publishing agent identity from /api/agents`);
+    }
+    owner = String(agentAddress ?? agentAddressFromUri(agentUri)).toLowerCase();
+  }, 120_000);
+
+  beforeEach(({ skip }) => {
+    if (!RUN_E2E || !devnetReachable) skip();
+  });
+
+  it('registers a Kafka endpoint into the named context graph and discovers it via SPARQL', async () => {
+    const broker = 'kafka.e2e.local:9092';
+    const topic = `walking-skeleton.${Date.now()}`;
+    const messageFormat = 'application/cloudevents+json';
+    const expectedUri = buildKafkaEndpointUri({ owner, broker, topic });
+
+    const result = await execFileAsync(
+      'node',
+      [
+        CLI_ENTRY,
+        'kafka',
+        'endpoint',
+        'register',
+        '--cg',
+        CONTEXT_GRAPH_ID,
+        '--broker',
+        broker,
+        '--topic',
+        topic,
+        '--format',
+        messageFormat,
+      ],
+      {
+        cwd: REPO_ROOT,
+        env: {
+          ...process.env,
+          DKG_HOME: DEVNET_NODE1_HOME,
+          DKG_API_PORT: String(port),
+        },
+      },
+    );
+
+    expect(result.stdout).toContain('Kafka endpoint registered:');
+    expect(result.stdout).toContain(expectedUri);
+    expect(result.stdout).toContain(CONTEXT_GRAPH_ID);
+
+    const row = await waitForEndpointRow(client, CONTEXT_GRAPH_ID, expectedUri);
+
+    expect(stripQuotedLiteral(row.broker ?? '')).toBe(broker);
+    expect(stripQuotedLiteral(row.topic ?? '')).toBe(topic);
+    expect(stripQuotedLiteral(row.messageFormat ?? '')).toBe(messageFormat);
+    expect(stripIriDelimiters(row.publisher ?? '')).toBe(`urn:dkg:agent:${owner}`);
+    expect(stripIriDelimiters(row.endpointUrl ?? '')).toBe(`kafka://${broker}/${topic}`);
+    expect(Number.isNaN(Date.parse(stripQuotedLiteral(row.issued ?? '')))).toBe(false);
+  }, 90_000);
+});
diff --git a/packages/kafka/test/endpoint.register.test.ts b/packages/kafka/test/endpoint.register.test.ts
new file mode 100644
index 000000000..e22f06fcb
--- /dev/null
+++ b/packages/kafka/test/endpoint.register.test.ts
@@ -0,0 +1,58 @@
+import { describe, expect, it } from 'vitest';
+import { registerKafkaEndpoint } from '../src/endpoint.js';
+
+describe('registerKafkaEndpoint', () => {
+  it('publishes the Kafka endpoint KA into the named context graph', async () => {
+    const calls: Array<{ contextGraphId: string; content: unknown }> = [];
+    const publisher = {
+      async publish(contextGraphId: string, content: unknown) {
+        calls.push({ contextGraphId, content });
+        return { ual: 'did:dkg:test/1', kcId: '1', status: 'confirmed' as const };
+      },
+    };
+
+    const result = await registerKafkaEndpoint({
+      contextGraphId: 'devnet-test',
+      owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+      issuedAt: '2026-05-04T12:34:56.000Z',
+      publisher,
+    });
+
+    expect(result).toEqual({
+      uri: 'urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:' +
+        '33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652',
+      contextGraphId: 'devnet-test',
+    });
+
+    expect(calls).toHaveLength(1);
+    expect(calls[0]).toEqual({
+      contextGraphId: 'devnet-test',
+      content: {
+        '@context': {
+          dcat: 'http://www.w3.org/ns/dcat#',
+          dct: 'http://purl.org/dc/terms/',
+          dkg: 'https://ontology.dkg.io/dkg#',
+          xsd: 'http://www.w3.org/2001/XMLSchema#',
+        },
+        '@id': result.uri,
+        '@type': ['dkg:KafkaTopicEndpoint', 'dcat:DataService'],
+        'dcat:endpointURL': {
+          '@id': 'kafka://kafka.example.com:9092/orders.created',
+        },
+        'dkg:broker': 'kafka.example.com:9092',
+        'dkg:topic': 'orders.created',
+        'dkg:messageFormat': 'application/json',
+        'dct:publisher': {
+          '@id': 'urn:dkg:agent:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd',
+        },
+        'dct:issued': {
+          '@value': '2026-05-04T12:34:56.000Z',
+          '@type': 'xsd:dateTime',
+        },
+      },
+    });
+  });
+});
diff --git a/packages/kafka/test/fixtures/endpoint-ka.json b/packages/kafka/test/fixtures/endpoint-ka.json
new file mode 100644
index 000000000..71da18435
--- /dev/null
+++ b/packages/kafka/test/fixtures/endpoint-ka.json
@@ -0,0 +1,26 @@
+{
+  "@context": {
+    "dcat": "http://www.w3.org/ns/dcat#",
+    "dct": "http://purl.org/dc/terms/",
+    "dkg": "https://ontology.dkg.io/dkg#",
+    "xsd": "http://www.w3.org/2001/XMLSchema#"
+  },
+  "@id": "urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652",
+  "@type": [
+    "dkg:KafkaTopicEndpoint",
+    "dcat:DataService"
+  ],
+  "dcat:endpointURL": {
+    "@id": "kafka://kafka.example.com:9092/orders.created"
+  },
+  "dkg:broker": "kafka.example.com:9092",
+  "dkg:topic": "orders.created",
+  "dkg:messageFormat": "application/json",
+  "dct:publisher": {
+    "@id": "urn:dkg:agent:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"
+  },
+  "dct:issued": {
+    "@value": "2026-05-04T12:34:56.000Z",
+    "@type": "xsd:dateTime"
+  }
+}
diff --git a/packages/kafka/test/ka-builder.test.ts b/packages/kafka/test/ka-builder.test.ts
new file mode 100644
index 000000000..cb39ba5e8
--- /dev/null
+++ b/packages/kafka/test/ka-builder.test.ts
@@ -0,0 +1,20 @@
+import { readFile } from 'node:fs/promises';
+import { describe, expect, it } from 'vitest';
+import { buildKafkaEndpointKnowledgeAsset } from '../src/ka-builder.js';
+
+describe('buildKafkaEndpointKnowledgeAsset', () => {
+  it('builds the minimum Kafka endpoint KA shape', async () => {
+    const actual = buildKafkaEndpointKnowledgeAsset({
+      owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+      messageFormat: 'application/json',
+      issuedAt: '2026-05-04T12:34:56.000Z',
+    });
+
+    const fixtureUrl = new URL('./fixtures/endpoint-ka.json', import.meta.url);
+    const expected = JSON.parse(await readFile(fixtureUrl, 'utf8'));
+
+    expect(actual).toEqual(expected);
+  });
+});
diff --git a/packages/kafka/test/uri.test.ts b/packages/kafka/test/uri.test.ts
new file mode 100644
index 000000000..c318df5e5
--- /dev/null
+++ b/packages/kafka/test/uri.test.ts
@@ -0,0 +1,49 @@
+import { describe, expect, it } from 'vitest';
+import { buildKafkaEndpointUri } from '../src/uri.js';
+
+describe('buildKafkaEndpointUri', () => {
+  it('builds a deterministic URN from owner, broker, and topic', () => {
+    expect(
+      buildKafkaEndpointUri({
+        owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD',
+        broker: 'Kafka.EXAMPLE.com:9092',
+        topic: 'orders.created',
+      }),
+    ).toBe(
+      'urn:dkg:kafka-endpoint:0xabcdefabcdefabcdefabcdefabcdefabcdefabcd:' +
+        '33b58f60595c766739f72b29e4ee417888d1a46af8339a4b5bdb1c3a5692f652',
+    );
+  });
+
+  it('changes the URN when the topic changes', () => {
+    const left = buildKafkaEndpointUri({
+      owner: '0xabcdefabcdefabcdefabcdefabcdefabcdefabcd',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+    });
+
+    const right = buildKafkaEndpointUri({
+      owner: '0xabcdefabcdefabcdefabcdefabcdefabcdefabcd',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.updated',
+    });
+
+    expect(left).not.toBe(right);
+  });
+
+  it('normalizes broker and owner casing before hashing', () => {
+    const left = buildKafkaEndpointUri({
+      owner: '0xabcdefabcdefabcdefabcdefabcdefabcdefabcd',
+      broker: 'kafka.example.com:9092',
+      topic: 'orders.created',
+    });
+
+    const right = buildKafkaEndpointUri({
+      owner: '0xABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD',
+      broker: 'KAFKA.EXAMPLE.COM:9092',
+      topic: 'orders.created',
+    });
+
+    expect(left).toBe(right);
+  });
+});
diff --git a/packages/kafka/tsconfig.json b/packages/kafka/tsconfig.json
new file mode 100644
index 000000000..d231bbc57
--- /dev/null
+++ b/packages/kafka/tsconfig.json
@@ -0,0 +1,9 @@
+{
+  "extends": "../../tsconfig.base.json",
+  "compilerOptions": {
+    "outDir": "dist",
+    "rootDir": "src",
+    "composite": true
+  },
+  "include": ["src"]
+}
diff --git a/packages/kafka/vitest.config.ts b/packages/kafka/vitest.config.ts
new file mode 100644
index 000000000..2b68f1e59
--- /dev/null
+++ b/packages/kafka/vitest.config.ts
@@ -0,0 +1,14 @@
+import { defineConfig } from 'vitest/config';
+import { kosavaKafkaCoverage } from '../../vitest.coverage';
+
+export default defineConfig({
+  test: {
+    include: ['test/**/*.test.ts'],
+    coverage: {
+      provider: 'v8',
+      reporter: ['text', 'html', 'lcov', 'json-summary'],
+      reportsDirectory: './coverage',
+      thresholds: kosavaKafkaCoverage,
+    },
+  },
+});
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index cacc8324a..1ec1298b4 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -225,6 +225,9 @@ importers:
       '@origintrail-official/dkg-epcis':
         specifier: workspace:*
         version: link:../epcis
+      '@origintrail-official/dkg-kafka':
+        specifier: workspace:*
+        version: link:../kafka
       '@origintrail-official/dkg-mcp':
         specifier: workspace:*
         version: link:../mcp-dkg
@@ -484,6 +487,15 @@
       specifier: ^8.3.3
      version: 8.3.3(web-streams-polyfill@3.3.3)
 
+  packages/kafka:
+    devDependencies:
+      '@vitest/coverage-v8':
+        specifier: ^4.0.18
+        version: 4.0.18(vitest@4.0.18(@types/node@22.19.11)(happy-dom@20.8.3(bufferutil@4.1.0)(utf-8-validate@5.0.10))(jiti@2.6.1)(tsx@4.21.0)(yaml@2.8.3))
+      vitest:
+        specifier: ^4.0.18
+        version: 4.0.18(@types/node@22.19.11)(happy-dom@20.8.3(bufferutil@4.1.0)(utf-8-validate@5.0.10))(jiti@2.6.1)(tsx@4.21.0)(yaml@2.8.3)
+
   packages/mcp-dkg:
     dependencies:
       '@modelcontextprotocol/sdk':
diff --git a/scripts/devnet.sh b/scripts/devnet.sh
index c89acfed7..2d1c24654 100755
--- a/scripts/devnet.sh
+++ b/scripts/devnet.sh
@@ -101,10 +101,36 @@ ensure_built() {
 
 start_hardhat() {
   local pidfile="$DEVNET_DIR/hardhat.pid"
+  local marker="$DEVNET_DIR/hardhat/deployed"
+  local artifacts_dir="$REPO_ROOT/packages/evm-module/artifacts/contracts"
   if [ -f "$pidfile" ] && kill -0 "$(cat "$pidfile")" 2>/dev/null; then
-    log "Hardhat node already running (PID $(cat "$pidfile"))"
-    return 0
+    # Stale-bytecode guard. When contracts are recompiled while hardhat
+    # keeps running, the on-chain bytecode lags the artifacts on disk.
+    # The next `devnet start` short-circuits hardhat reuse here, then
+    # `deploy_contracts` skips on the still-present marker, so daemons
+    # connect to addresses whose code is older than the source. View
+    # methods added since the live deploy revert with "function selector
+    # not recognized" (e.g. CSS.getNodeStakeV10 in the staking loop).
+    # Detect the mismatch by comparing artifact mtimes to the marker;
+    # if any artifact is newer, kill hardhat so the fresh-start path
+    # below clears the marker + deployments and `deploy_contracts`
+    # re-deploys onto a fresh chain.
+    if [ -f "$marker" ] && [ -d "$artifacts_dir" ] \
+      && [ -n "$(find "$artifacts_dir" -name '*.json' -newer "$marker" -print -quit 2>/dev/null)" ]; then
+      local stale_pid
+      stale_pid=$(cat "$pidfile")
+      log "Hardhat (PID $stale_pid) holds outdated contracts (artifacts newer than deploy marker) — restarting for fresh deploy"
+      kill "$stale_pid" 2>/dev/null || true
+      for _ in 1 2 3 4 5; do
+        kill -0 "$stale_pid" 2>/dev/null || break
+        sleep 1
+      done
+      rm -f "$pidfile"
+    else
+      log "Hardhat node already running (PID $(cat "$pidfile"))"
+      return 0
+    fi
   fi
 
   log "Starting Hardhat node on port $HARDHAT_PORT..."
@@ -739,7 +765,7 @@ cmd_start() {
     const opSigners = new Array(n).fill(null);
     const nodeRoles = new Array(n).fill('edge');
     // Codex round 4 on PR #368: read config.json FIRST and INDEPENDENTLY
-    // of wallets.json. The previous loop did wallets first and `continue`d
+    // of wallets.json. The previous loop did wallets first and \`continue\`d
     // on parse failure BEFORE reading config, so an intended core whose
     // wallets.json was malformed silently kept the 'edge' default,
     // dropped out of coreIdxs, and the lostCores guard never fired.
@@ -865,7 +891,7 @@ cmd_start() {
       // this code path exists to prevent.
       //
       // Codex round 2 on PR #368: re-read 'pending' before EVERY tx
-      // (not once per node loop). The previous code did `nonce++` at
+      // (not once per node loop). The previous code did \`nonce++\` at
       // call site so a failed approve advanced the local counter even
      // though the chain nonce did not, leaving createConviction +
       // updateAsk to send with an inflated nonce that would either
@@ -878,7 +904,7 @@ cmd_start() {
        return tx.wait();
      };
      // Codex round 4 on PR #368: skip createConviction if the daemon
-      // already opened a position. PR 366 wired `EVMChainAdapter.ensureProfile()`
+      // already opened a position. PR 366 wired \`EVMChainAdapter.ensureProfile()\`
      // to open a default 50k V10 conviction during agent startup, so by
      // the time this script reaches the staking loop the daemon has
      // (usually) already staked. Running createConviction again would
@@ -895,8 +921,8 @@ cmd_start() {
      // updateAsk is INDEPENDENT of stake state, so we still attempt
      // it below regardless of the probe outcome.
      //
-      // Codex round 7 on PR #368: probe `getNodeStakeV10(idId)` instead
-      // of NFT `balanceOf(opSigner.address)`. The op wallet could hold
+      // Codex round 7 on PR #368: probe \`getNodeStakeV10(idId)\` instead
+      // of NFT \`balanceOf(opSigner.address)\`. The op wallet could hold
      // positions for OTHER identities (not in our devnet today, but
      // a wallet-scoped probe is the wrong semantic check); we want
      // to know whether THIS identity is staked.
diff --git a/vitest.coverage.ts b/vitest.coverage.ts
index 9dc0f59f0..7123b7e22 100644
--- a/vitest.coverage.ts
+++ b/vitest.coverage.ts
@@ -159,6 +159,13 @@ export const kosavaEpcisCoverage: CoverageThresholds = {
   statements: 97,
 };
 
+export const kosavaKafkaCoverage: CoverageThresholds = {
+  lines: 100,
+  functions: 100,
+  branches: 50,
+  statements: 100,
+};
+
 /**
  * @deprecated Import a tier-specific export (e.g. `kosavaNodeUiCoverage`).
  * Kept for any external tooling that still references the old name.
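---
Reviewer note (not part of the patch): a minimal consumption sketch of the new
@origintrail-official/dkg-kafka surface, illustrating the dependency-inversion
boundary documented in endpoint.ts. The console-backed publisher below is a
hypothetical stand-in for the daemon's agent-backed one; everything else uses
only the package's own exports.

import {
  registerKafkaEndpoint,
  buildKafkaEndpointUri,
  type KafkaEndpointPublisher,
} from '@origintrail-official/dkg-kafka';

// Hypothetical publisher: the real daemon route wraps agent.publish() and
// applies the `{ public: ... }` envelope on its side of the boundary.
const publisher: KafkaEndpointPublisher = {
  async publish(contextGraphId, knowledgeAsset) {
    console.log(`would publish into ${contextGraphId}:`, knowledgeAsset['@id']);
  },
};

const result = await registerKafkaEndpoint({
  contextGraphId: 'devnet-test',
  owner: '0xAbCDEFabcdefABCDEFabcdefABCDEFabcdefABCD',
  broker: 'Kafka.EXAMPLE.com:9092',
  topic: 'orders.created',
  messageFormat: 'application/json',
  publisher,
});

// The URN is deterministic: owner and broker are case-normalized before
// hashing, so re-registering the same endpoint yields the same identifier.
const rebuilt = buildKafkaEndpointUri({
  owner: '0xabcdefabcdefabcdefabcdefabcdefabcdefabcd',
  broker: 'kafka.example.com:9092',
  topic: 'orders.created',
});
console.log(result.uri === rebuilt); // true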