Skip to content
8 changes: 7 additions & 1 deletion packages/agent/src/dkg-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6251,7 +6251,13 @@ export class DKGAgent {
});
}

private async isPrivateContextGraph(contextGraphId: string): Promise<boolean> {
/**
* Returns true when the CG carries a `"private"` access-policy triple in
* the ontology or `_meta` graph, OR when its `_meta` graph contains any
* allowlist predicate (`DKG_ALLOWED_PEER`, `DKG_ALLOWED_AGENT`, or
* `DKG_PARTICIPANT_AGENT`). System paranets always return false.
*/
async isPrivateContextGraph(contextGraphId: string): Promise<boolean> {
if ((Object.values(SYSTEM_PARANETS) as string[]).includes(contextGraphId)) {
return false;
}
Expand Down
22 changes: 16 additions & 6 deletions packages/cli/src/api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,14 +551,24 @@ export class ApiClient {
return this.get(`/api/context-graph/${encodeURIComponent(contextGraphId)}/participants`);
}

async registerKafkaEndpoint(request: {
contextGraphId: string;
broker: string;
topic: string;
messageFormat: string;
}): Promise<{
async registerKafkaEndpoint(
request:
| {
contextGraphId: string;
broker: string;
topic: string;
messageFormat: string;
}
| {
useLocalCg: true;
broker: string;
topic: string;
messageFormat: string;
},
): Promise<{
uri: string;
contextGraphId: string;
cgScope: 'local' | 'shared';
}> {
return this.post('/api/kafka/endpoint', request);
}
Expand Down
62 changes: 53 additions & 9 deletions packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env node

import { Command } from 'commander';
import { Command, Option } from 'commander';
import { readFileSync, existsSync } from 'node:fs';
import { createInterface } from 'node:readline';
import { spawn, execSync } from 'node:child_process';
Expand Down Expand Up @@ -1725,25 +1725,69 @@ const kafkaEndpointCmd = kafkaCmd
.command('endpoint')
.description('Kafka topic endpoint operations');

// ADR-0004: explicit local-vs-shared CG choice. The caller must pick
// `--cg <id>` (publish into a named shared CG) or `--local` (publish into the
// node-local "kafka-local" free CG, lazy-created on first use). Passing
// neither — or both — is rejected by commander before any network call so
// the user gets a clean error pre-network.
kafkaEndpointCmd
.command('register')
.description('Register a Kafka topic endpoint as a knowledge asset in a named context graph')
.requiredOption('--cg <id>', 'Target context graph')
.description('Register a Kafka topic endpoint as a knowledge asset in a context graph (named or kafka-local)')
// Mutual exclusion is wired declaratively via `Option#conflicts` so the
// pairing lives next to the option definition. commander throws before
// the action runs if both --cg and --local are passed.
.addOption(
new Option('--cg <id>', 'Target named context graph (mutually exclusive with --local)')
.conflicts('local'),
)
.addOption(
new Option('--local', 'Publish into the node-local "kafka-local" free CG (mutually exclusive with --cg)')
.conflicts('cg'),
)
.requiredOption('--broker <host:port>', 'Kafka broker host:port')
.requiredOption('--topic <name>', 'Kafka topic name')
.option('--format <mime>', 'Kafka message format MIME type', 'application/json')
.addHelpText(
'after',
'\nExactly one of --cg or --local must be passed. There is no implicit default.',
)
.action(async (opts: ActionOpts) => {
const cgId = typeof opts.cg === 'string' ? opts.cg : undefined;
const useLocal = opts.local === true;

// Mutual-exclusion is enforced at the parser level via Option#conflicts
// on the addOption() declarations above (commander throws before this
// action runs). The "neither" case is the only thing left to guard here:
// commander treats both options as optional, so we check that exactly
// one is set.
if (!cgId && !useLocal) {
console.error(
'Pass exactly one of "--cg <id>" (publish into a named shared CG) ' +
'or "--local" (publish into the local "kafka-local" free CG).',
);
process.exit(1);
}

try {
const client = await ApiClient.connect();
const result = await client.registerKafkaEndpoint({
contextGraphId: opts.cg,
broker: opts.broker,
topic: opts.topic,
messageFormat: opts.format,
});
const request = useLocal
? {
useLocalCg: true as const,
broker: opts.broker,
topic: opts.topic,
messageFormat: opts.format,
}
: {
contextGraphId: cgId!,
broker: opts.broker,
topic: opts.topic,
messageFormat: opts.format,
};
const result = await client.registerKafkaEndpoint(request);
console.log('Kafka endpoint registered:');
console.log(` URI: ${result.uri}`);
console.log(` Context graph: ${result.contextGraphId}`);
console.log(` CG scope: ${result.cgScope}`);
} catch (err) {
console.error(toErrorMessage(err));
process.exit(1);
Expand Down
54 changes: 54 additions & 0 deletions packages/cli/src/daemon/routes/kafka-adapters.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Adapters that bind the daemon's `DKGAgent` to the small interfaces the
// `@origintrail-official/dkg-kafka` package depends on. Kept on the CLI side
// because the agent type lives here; the kafka package stays agent-agnostic.
//
// Slice 02 only consumes these from `routes/kafka.ts`; later kafka routes
// (discover, revoke) will reuse them — one home, one responsibility each.

import type { DKGAgent } from '@origintrail-official/dkg-agent';
import type {
KafkaEndpointPublisher,
LocalCgPrimitive,
} from '@origintrail-official/dkg-kafka';

/**
* Wraps `agent.publish` with the daemon's `{ public: content }` envelope so
* the kafka package can stay envelope-agnostic.
*/
export function kafkaPublisherFromAgent(agent: DKGAgent): KafkaEndpointPublisher {
return {
async publish(contextGraphId, knowledgeAsset) {
await agent.publish(
Comment thread
zsculac marked this conversation as resolved.
contextGraphId,
{ public: knowledgeAsset } as Record<string, unknown>,
);
},
};
}

/**
* Adapts the agent's free-CG surface to the `LocalCgPrimitive` shape the
* kafka-local ensurer consumes. `callerAgentAddress` is threaded into
* `createContextGraph` so the create runs under the requesting agent.
*
* The `private: true` flag is hardcoded HERE — the kafka package never sees
* the boolean, so a future refactor cannot accidentally drop it. Without
* `private: true`, the agent auto-subscribes to the CG's gossip topic and
* broadcasts the CG definition (`dkg-agent.ts:3837`); the slice-02 spec
* requires kafka-local to be truly node-local, which means no gossip.
*/
export function kafkaLocalCgFromAgent(
agent: DKGAgent,
callerAgentAddress: string,
): LocalCgPrimitive {
return {
contextGraphExists: (id) => agent.contextGraphExists(id),
isPrivateContextGraph: (id) => agent.isPrivateContextGraph(id),
createPrivateContextGraph: (opts) =>
agent.createContextGraph({
...opts,
private: true,
callerAgentAddress,
}),
};
}
47 changes: 31 additions & 16 deletions packages/cli/src/daemon/routes/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { jsonResponse, readBody, validateRequiredContextGraphId } from '../http-utils.js';
import { jsonResponse, readBody, isValidContextGraphId } from '../http-utils.js';
import type { RequestContext } from './context.js';
import {
createKafkaLocalCgEnsurer,
registerKafkaEndpoint,
type KafkaEndpointPublisher,
validateContextGraphSelection,
} from '@origintrail-official/dkg-kafka';
import {
kafkaLocalCgFromAgent,
kafkaPublisherFromAgent,
} from './kafka-adapters.js';

function isNonEmptyString(value: unknown): value is string {
return typeof value === 'string' && value.trim().length > 0;
Expand All @@ -29,15 +34,26 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {

const {
contextGraphId,
useLocalCg,
broker,
topic,
messageFormat,
} = parsed as Record<string, unknown>;

if (!validateRequiredContextGraphId(contextGraphId, res)) {
return;
// ADR-0004: explicit local-vs-shared CG choice. We surface the pure
// validator's error message via the daemon's standard 400 envelope.
let selection;
try {
selection = validateContextGraphSelection({ contextGraphId, useLocalCg });
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
return jsonResponse(res, 400, { error: message });
}

if (selection.kind === 'shared' && !isValidContextGraphId(selection.contextGraphId)) {
return jsonResponse(res, 400, { error: 'Invalid "contextGraphId"' });
}
const targetContextGraphId = contextGraphId as string;

if (!isNonEmptyString(broker)) {
return jsonResponse(res, 400, { error: '"broker" must be a non-empty string' });
}
Expand All @@ -48,22 +64,21 @@ export async function handleKafkaRoutes(ctx: RequestContext): Promise<void> {
return jsonResponse(res, 400, { error: '"messageFormat" must be a non-empty string' });
}

const publisher: KafkaEndpointPublisher = {
async publish(cgId, content) {
await agent.publish(
cgId,
{ public: content } as Record<string, unknown>,
);
},
};

const result = await registerKafkaEndpoint({
contextGraphId: targetContextGraphId,
selection,
owner: requestAgentAddress.toLowerCase(),
broker,
topic,
messageFormat,
publisher,
publisher: kafkaPublisherFromAgent(agent),
// Fresh ensurer per request; hoisting per-agent is a deferred optimization
// (the agent's "already exists" guard plus the exists-check make repeats cheap).
// The peer-id scopes the kafka-local CG id per node — `kafka-local-{peerId}` —
// so two nodes cannot collide on the literal "kafka-local" id.
ensureLocalCg: createKafkaLocalCgEnsurer(
kafkaLocalCgFromAgent(agent, requestAgentAddress),
agent.peerId,
),
});

return jsonResponse(res, 200, result);
Expand Down
Loading
Loading