Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
},
"scripts": {
"build": "turbo build",
"build:runtime:packages": "pnpm -r --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-storage --filter @origintrail-official/dkg-query --filter @origintrail-official/dkg-publisher --filter @origintrail-official/dkg-chain --filter @origintrail-official/dkg-epcis --filter @origintrail-official/dkg-random-sampling --filter @origintrail-official/dkg-agent --filter @origintrail-official/dkg-graph-viz --filter @origintrail-official/dkg-node-ui --filter @origintrail-official/dkg-adapter-openclaw --filter @origintrail-official/dkg run build",
"build:runtime:packages": "pnpm -r --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-storage --filter @origintrail-official/dkg-query --filter @origintrail-official/dkg-publisher --filter @origintrail-official/dkg-chain --filter @origintrail-official/dkg-epcis --filter @origintrail-official/dkg-kafka --filter @origintrail-official/dkg-random-sampling --filter @origintrail-official/dkg-agent --filter @origintrail-official/dkg-graph-viz --filter @origintrail-official/dkg-node-ui --filter @origintrail-official/dkg-adapter-openclaw --filter @origintrail-official/dkg run build",
"build:runtime": "pnpm run build:runtime:packages && pnpm --filter @origintrail-official/dkg-node-ui run build:ui",
"test": "turbo test",
"test:coverage": "turbo test:coverage",
Expand Down
1 change: 1 addition & 0 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"@origintrail-official/dkg-core": "workspace:*",
"@origintrail-official/dkg-mcp": "workspace:*",
"@origintrail-official/dkg-epcis": "workspace:*",
"@origintrail-official/dkg-kafka": "workspace:*",
"@origintrail-official/dkg-node-ui": "workspace:*",
"@origintrail-official/dkg-publisher": "workspace:*",
"@origintrail-official/dkg-storage": "workspace:*",
Expand Down
12 changes: 12 additions & 0 deletions packages/cli/src/api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,18 @@ export class ApiClient {
return this.get(`/api/context-graph/${encodeURIComponent(contextGraphId)}/participants`);
}

  /**
   * Register a Kafka topic endpoint as a knowledge asset via the daemon.
   *
   * Thin wrapper: POSTs the payload unchanged to `/api/kafka/endpoint`;
   * all validation happens server-side in the daemon route.
   *
   * @param request.contextGraphId target context graph for the registration
   * @param request.broker Kafka broker `host:port`
   * @param request.topic Kafka topic name
   * @param request.messageFormat MIME type of messages on the topic
   * @returns the canonical asset URI and the context graph it was registered in
   */
  async registerKafkaEndpoint(request: {
    contextGraphId: string;
    broker: string;
    topic: string;
    messageFormat: string;
  }): Promise<{
    uri: string;
    contextGraphId: string;
  }> {
    return this.post('/api/kafka/endpoint', request);
  }

async signJoinRequest(contextGraphId: string): Promise<{
ok: boolean;
status?: string;
Expand Down
35 changes: 35 additions & 0 deletions packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,41 @@ assertionCmd
}
});

// ─── dkg kafka ──────────────────────────────────────────────────────

// `dkg kafka …` — Kafka metadata registration command group.
const kafkaCmd = program
  .command('kafka')
  .description('Kafka metadata registration commands');

// `dkg kafka endpoint …` — operations on Kafka topic endpoints.
const kafkaEndpointCmd = kafkaCmd
  .command('endpoint')
  .description('Kafka topic endpoint operations');

// `dkg kafka endpoint register` — forwards the endpoint details to the
// daemon's /api/kafka/endpoint route and prints the resulting asset URI.
kafkaEndpointCmd
  .command('register')
  .description('Register a Kafka topic endpoint as a knowledge asset in a named context graph')
  .requiredOption('--cg <id>', 'Target context graph')
  .requiredOption('--broker <host:port>', 'Kafka broker host:port')
  .requiredOption('--topic <name>', 'Kafka topic name')
  // Optional: defaults to JSON when --format is not supplied.
  .option('--format <mime>', 'Kafka message format MIME type', 'application/json')
  .action(async (opts: ActionOpts) => {
    try {
      const client = await ApiClient.connect();
      const result = await client.registerKafkaEndpoint({
        contextGraphId: opts.cg,
        broker: opts.broker,
        topic: opts.topic,
        messageFormat: opts.format,
      });
      console.log('Kafka endpoint registered:');
      console.log(` URI: ${result.uri}`);
      console.log(` Context graph: ${result.contextGraphId}`);
    } catch (err) {
      // Print a human-readable message and signal failure to the shell.
      console.error(toErrorMessage(err));
      process.exit(1);
    }
  });

// ─── dkg openclaw ───────────────────────────────────────────────────

const openclawCmd = program
Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/daemon/handle-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ import { handleAssertionRoutes } from './routes/assertion.js';
import { handleQueryRoutes } from './routes/query.js';
import { handleLocalAgentsRoutes } from './routes/local-agents.js';
import { handleEpcisRoutes } from './routes/epcis.js';
import { handleKafkaRoutes } from './routes/kafka.js';


export async function handleRequest(
Expand Down Expand Up @@ -431,5 +432,8 @@ export async function handleRequest(
await handleEpcisRoutes(ctx);
if (res.writableEnded) return;

await handleKafkaRoutes(ctx);
if (res.writableEnded) return;

jsonResponse(res, 404, { error: 'Not found' });
}
71 changes: 71 additions & 0 deletions packages/cli/src/daemon/routes/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { jsonResponse, readBody, validateRequiredContextGraphId } from '../http-utils.js';
import type { RequestContext } from './context.js';
import {
registerKafkaEndpoint,
type KafkaEndpointPublisher,
} from '@origintrail-official/dkg-kafka';

/**
 * Type guard: narrows `value` to `string` only when it contains at least
 * one non-whitespace character.
 */
function isNonEmptyString(value: unknown): value is string {
  if (typeof value !== 'string') {
    return false;
  }
  return value.trim() !== '';
}

/**
 * Daemon route handler for Kafka metadata endpoints.
 *
 * Handles `POST /api/kafka/endpoint`: validates the request body, then
 * registers the Kafka topic endpoint as a knowledge asset in the target
 * context graph via the agent's publish capability. Leaves the response
 * untouched for any other method/path so the dispatcher can continue.
 */
export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
  const {
    req,
    res,
    agent,
    path,
    requestAgentAddress,
  } = ctx;

  if (req.method === 'POST' && path === '/api/kafka/endpoint') {
    const body = await readBody(req);
    let parsed: unknown;
    try {
      parsed = JSON.parse(body);
    } catch {
      return jsonResponse(res, 400, { error: 'Invalid JSON in request body' });
    }

    // Guard before destructuring: valid JSON such as "null" or "5" would
    // otherwise throw a TypeError and crash the handler.
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      return jsonResponse(res, 400, { error: 'Request body must be a JSON object' });
    }

    const {
      contextGraphId,
      broker,
      topic,
      messageFormat,
    } = parsed as Record<string, unknown>;

    // Shared validator writes its own 4xx response on failure.
    if (!validateRequiredContextGraphId(contextGraphId, res)) {
      return;
    }
    const targetContextGraphId = contextGraphId as string;
    if (!isNonEmptyString(broker)) {
      return jsonResponse(res, 400, { error: '"broker" must be a non-empty string' });
    }
    if (!isNonEmptyString(topic)) {
      return jsonResponse(res, 400, { error: '"topic" must be a non-empty string' });
    }
    if (!isNonEmptyString(messageFormat)) {
      return jsonResponse(res, 400, { error: '"messageFormat" must be a non-empty string' });
    }

    // Adapter over the agent: the kafka package hands us the bare KA and we
    // apply the daemon's `{ public: ... }` publish envelope here.
    const publisher: KafkaEndpointPublisher = {
      async publish(cgId, content) {
        await agent.publish(
          cgId,
          { public: content } as Record<string, unknown>,
        );
      },
    };

    // A publish failure previously escaped as an unhandled rejection and the
    // client never got a response; surface it as a 500 instead.
    try {
      const result = await registerKafkaEndpoint({
        contextGraphId: targetContextGraphId,
        owner: requestAgentAddress.toLowerCase(),
        broker,
        topic,
        messageFormat,
        publisher,
      });
      return jsonResponse(res, 200, result);
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      return jsonResponse(res, 500, { error: `Failed to register Kafka endpoint: ${message}` });
    }
  }
}
28 changes: 28 additions & 0 deletions packages/cli/test/api-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,34 @@ describe('ApiClient', () => {
expect(body.name).toBe('incident');
});

it('registerKafkaEndpoint() posts the endpoint payload', async () => {
  // Stub fetch so no request leaves the process; every call is recorded.
  const { fetch, calls } = createTrackingFetch({
    ok: true,
    status: 200,
    body: {
      uri: 'urn:dkg:kafka-endpoint:0xabc:hash',
      contextGraphId: 'devnet-test',
    },
  });
  globalThis.fetch = fetch;

  const request = {
    contextGraphId: 'devnet-test',
    broker: 'kafka.example.com:9092',
    topic: 'orders.created',
    messageFormat: 'application/json',
  };
  await client.registerKafkaEndpoint(request);

  // The client must POST the payload verbatim to the kafka endpoint route.
  expect(calls[0].url).toBe(`http://127.0.0.1:${PORT}/api/kafka/endpoint`);
  const sentBody = JSON.parse(calls[0].opts.body as string);
  expect(sentBody).toEqual(request);
});

it('approveCclPolicy() posts approval payload', async () => {
const { fetch, calls } = createTrackingFetch({ ok: true, status: 200, body: { policyUri: 'urn:policy', bindingUri: 'urn:binding', approvedAt: 'now' } });
globalThis.fetch = fetch;
Expand Down
95 changes: 95 additions & 0 deletions packages/cli/test/kafka-cli-smoke.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { beforeAll, afterAll, describe, expect, it } from 'vitest';
import { createServer } from 'node:http';
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';
import { mkdtemp, writeFile, rm } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import { tmpdir } from 'node:os';

const execFileAsync = promisify(execFile);
// Resolve the built CLI entry point relative to this test file's location.
const __dirname = dirname(fileURLToPath(import.meta.url));
const CLI_ENTRY = join(__dirname, '..', 'dist', 'cli.js');

// End-to-end smoke test: runs the built CLI binary as a child process against
// a stub HTTP server standing in for the daemon, and checks the full
// `dkg kafka endpoint register` round trip (auth header, payload, output).
describe.sequential('kafka CLI smoke', () => {
  let dkgHome: string; // temp DKG_HOME holding the auth token
  let server: ReturnType<typeof createServer>; // stub daemon
  let smokeApiPort: string; // ephemeral port the stub listens on
  let lastBody = ''; // raw body of the last /api/kafka/endpoint POST
  let lastAuthHeader = ''; // Authorization header of that POST

  beforeAll(async () => {
    dkgHome = await mkdtemp(join(tmpdir(), 'dkg-kafka-cli-'));
    // Build the CLI on demand so the test also passes from a clean checkout.
    if (!existsSync(CLI_ENTRY)) {
      await execFileAsync('pnpm', ['build'], { cwd: join(__dirname, '..') });
    }
    if (!existsSync(CLI_ENTRY)) {
      throw new Error(`CLI entry not found after build: ${CLI_ENTRY}`);
    }

    // Token the CLI is expected to pick up from DKG_HOME and send as Bearer.
    await writeFile(join(dkgHome, 'auth.token'), 'smoke-token\n');

    server = createServer(async (req, res) => {
      if (req.method === 'POST' && req.url === '/api/kafka/endpoint') {
        // Capture auth header and raw body for assertions in the test below.
        lastAuthHeader = String(req.headers.authorization ?? '');
        const chunks: Buffer[] = [];
        for await (const chunk of req) {
          chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
        }
        lastBody = Buffer.concat(chunks).toString('utf8');
        res.writeHead(200, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({
          uri: 'urn:dkg:kafka-endpoint:0xabc:hash',
          contextGraphId: 'devnet-test',
        }));
        return;
      }

      // Anything else is unexpected traffic from the CLI.
      res.writeHead(404, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ error: 'Not found' }));
    });

    // Listen on port 0 so the OS assigns a free port; pass it to the CLI env.
    await new Promise<void>((resolve, reject) => {
      server.once('error', reject);
      server.listen(0, '127.0.0.1', () => {
        const addr = server.address();
        smokeApiPort = typeof addr === 'object' && addr ? String(addr.port) : '0';
        resolve();
      });
    });
  });

  afterAll(async () => {
    await new Promise<void>((resolve) => server.close(() => resolve()));
    await rm(dkgHome, { recursive: true, force: true });
  });

  it('registers a Kafka endpoint through the CLI', async () => {
    // Point the CLI at the stub daemon via environment overrides.
    const env = { ...process.env, DKG_HOME: dkgHome, DKG_API_PORT: smokeApiPort };

    // --format is deliberately omitted to exercise its default value.
    const result = await execFileAsync('node', [
      CLI_ENTRY,
      'kafka',
      'endpoint',
      'register',
      '--cg',
      'devnet-test',
      '--broker',
      'kafka.example.com:9092',
      '--topic',
      'orders.created',
    ], { env });

    expect(result.stdout).toContain('Kafka endpoint registered:');
    expect(result.stdout).toContain('urn:dkg:kafka-endpoint:0xabc:hash');
    expect(result.stdout).toContain('devnet-test');
    expect(lastAuthHeader).toBe('Bearer smoke-token');
    // The default format must be applied by the CLI, not the server.
    expect(JSON.parse(lastBody)).toEqual({
      contextGraphId: 'devnet-test',
      broker: 'kafka.example.com:9092',
      topic: 'orders.created',
      messageFormat: 'application/json',
    });
  }, 15000); // generous timeout: may include a pnpm build on first run
});
1 change: 1 addition & 0 deletions packages/cli/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
{ "path": "../core" },
{ "path": "../agent" },
{ "path": "../epcis" },
{ "path": "../kafka" },
{ "path": "../node-ui" },
{ "path": "../adapter-openclaw" },
{ "path": "../mcp-dkg" }
Expand Down
31 changes: 31 additions & 0 deletions packages/kafka/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"name": "@origintrail-official/dkg-kafka",
"version": "10.0.0-rc.4",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc",
"test": "vitest run",
"test:coverage": "vitest run --coverage",
"clean": "rm -rf dist tsconfig.tsbuildinfo"
},
"devDependencies": {
"@vitest/coverage-v8": "^4.0.18",
"vitest": "^4.0.18"
},
"publishConfig": {
"access": "public"
},
"files": [
"dist",
"README.md",
"LICENSE"
],
"license": "Apache-2.0",
"repository": {
"type": "git",
"url": "https://github.com/OriginTrail/dkg-v9.git",
"directory": "packages/kafka"
}
}
52 changes: 52 additions & 0 deletions packages/kafka/src/endpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { buildKafkaEndpointKnowledgeAsset } from './ka-builder.js';
import { buildKafkaEndpointUri } from './uri.js';

/**
 * Dependency-inversion boundary: the kafka package needs something that can
 * publish a JSON-LD knowledge asset. The package hands the bare KA across this
 * interface; envelope wrapping (e.g. `{ public: ... }`) belongs to the caller.
 */
export type KafkaEndpointKnowledgeAsset = ReturnType<typeof buildKafkaEndpointKnowledgeAsset>;

/** Minimal publishing capability a caller must supply (see note above). */
export interface KafkaEndpointPublisher {
  publish(
    contextGraphId: string,
    knowledgeAsset: KafkaEndpointKnowledgeAsset,
  ): Promise<unknown>;
}

/** Input for {@link registerKafkaEndpoint}. */
export interface RegisterKafkaEndpointInput {
  contextGraphId: string; // target context graph for the registration
  owner: string; // owner identifier recorded on the knowledge asset
  broker: string; // Kafka broker host:port
  topic: string; // Kafka topic name
  messageFormat: string; // MIME type of messages on the topic
  issuedAt?: string; // ISO-8601 timestamp; defaults to "now" when omitted
  publisher: KafkaEndpointPublisher;
}

/** Result of {@link registerKafkaEndpoint}. */
export interface RegisterKafkaEndpointResult {
  uri: string; // canonical URI of the registered endpoint asset
  contextGraphId: string; // echoes the context graph it was registered in
}

/**
 * Register a Kafka topic endpoint: derive its canonical URI, build the
 * JSON-LD knowledge asset, hand the asset to the caller-supplied publisher,
 * and report the URI plus the context graph it was published into.
 */
export async function registerKafkaEndpoint(
  input: RegisterKafkaEndpointInput,
): Promise<RegisterKafkaEndpointResult> {
  const { contextGraphId, owner, broker, topic, messageFormat, publisher } = input;

  // Default the issuance timestamp to "now" unless the caller pinned one.
  const issuedAt = input.issuedAt ?? new Date().toISOString();

  const uri = buildKafkaEndpointUri(input);
  const knowledgeAsset = buildKafkaEndpointKnowledgeAsset({
    owner,
    broker,
    topic,
    messageFormat,
    issuedAt,
  });

  await publisher.publish(contextGraphId, knowledgeAsset);

  return { uri, contextGraphId };
}
3 changes: 3 additions & 0 deletions packages/kafka/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Public surface of @origintrail-official/dkg-kafka: URI construction,
// JSON-LD knowledge-asset building, and endpoint registration.
export * from './uri.js';
export * from './ka-builder.js';
export * from './endpoint.js';
Loading
Loading