
[kafka pr-1 · 05/6] Slice 05 — Endpoint list, get, soft-revoke, re-verify #395

Open
zsculac wants to merge 17 commits into feat/kafka-probe from
feat/kafka-list-revoke-verify

Conversation


@zsculac zsculac commented May 5, 2026

Stacked on #393 (slice 04 — opportunistic Kafka probe). Merge order: 393 → this. When 393 merges into feat/kafka-walking-skeleton, retarget this PR there.

Summary

Rounds out the endpoint side of the kafka registry with full lifecycle support:

  • GET /api/kafka/endpoint?contextGraphId=X[&status=active|revoked|all] — list endpoints in a CG (default status=active excludes revoked).
  • GET /api/kafka/endpoint/<urlencoded-uri>?contextGraphId=X — single fetch, returns revoked KAs unconditionally.
  • DELETE /api/kafka/endpoint/<urlencoded-uri>?contextGraphId=X — soft-revoke via the V10 publisher update flow. Mutate-by-add: existing KA properties survive; dkg:status "revoked" + dkg:revokedAt added. NOT a delete-and-recreate — KA history preserved per ADR-0004.
  • POST /api/kafka/endpoint/verify — re-verify with fresh credentials. Reuses slice 04's kafka-probe. Probe failure recorded on the KA (verificationStatus = "failed"), not 4xx.
  • CLI: dkg kafka endpoint list/show/revoke/verify with the same security/PEM flags as register.
  • api-client: listKafkaEndpoints, getKafkaEndpoint, revokeKafkaEndpoint, verifyKafkaEndpoint.

After this slice the endpoint side is feature-complete except for scope enforcement (slice 06).

Notable design moves

  • agent.update gains a URI-keyed JSON-LD overload (agent.update(rootEntityUri, contextGraphId, { public: ka })). The route adapter is now a one-liner. URI→kcId resolution lives in packages/agent/src/dkg-agent-utils.ts (resolveKcIdByRootEntity) — pure, mock-store testable, reusable by slice 07's subscription revoke.
  • Strict URI validator (isValidKafkaEndpointUri / assertValidKafkaEndpointUri in packages/kafka/src/uri.ts) gates every SPARQL interpolation site. Regex /^urn:dkg:kafka-endpoint:[a-z0-9._-]+:[0-9a-f]{64}$/ derived from the canonical URI builder shape — rejects every char that would break out of <...> IRI position. Defence-in-depth: validators at both route and package layers.
  • Mutate-by-compose strategy. composeKafkaEndpointKnowledgeAsset reads the existing KA, applies an overlay (revocation status / new verifiedAt), publishes the full new KA via publisher.update. Predictable shape; tests pin idempotency on double-revoke.
  • Per-verb route dispatch. packages/cli/src/daemon/routes/kafka.ts split into focused per-verb handlers + a single extractEndpointUri helper for path parsing (sub-path rejection centralised).

Spec deviation (accepted)

Single-URI GET and DELETE /api/kafka/endpoint/<uri> require ?contextGraphId=X. The original ticket spec didn't mention it, but the per-CG SPARQL named-graph store needs the CG scope to locate the URI. The CLI and api-client are coherent with this requirement. Slice 07 subscriptions will hit the same constraint.

ADR alignment

  • ADR-0001 (kafka package writes metadata only): re-verify uses the same one-shot describeTopics probe as slice 04. No consumer, no offsets.
  • ADR-0002 (verification is opportunistic): re-verify is the on-demand verb. Creds present → probe and update; creds absent → 400 (verify with no creds is meaningless).
  • ADR-0004 (soft-revoke preserves KA history): KA stays in its CG; dkg:status "revoked" + dkg:revokedAt added.

Test plan

  • pnpm --filter @origintrail-official/dkg-kafka test — 90 unit/integration pass, 4 e2e gated on DKG_KAFKA_E2E=1
  • pnpm --filter @origintrail-official/dkg-agent exec vitest run test/resolve-kcid.test.ts — 9/9
  • pnpm --filter @origintrail-official/dkg-cli test (kafka-cli-smoke + kafka-route-parsers) — 35/35
  • DKG_KAFKA_E2E=1 pnpm --filter @origintrail-official/dkg-kafka exec vitest run test/e2e/ against fresh devnet — 4/4 (slice-04 + slice-05 lifecycle scenarios)
  • Workspace build — 21/21
  • Coverage packages/kafka/src/**: 100/98.54/100/100 (lines/branches/funcs/stmts) — ratchet floor 98 held
  • Type-check clean across kafka/agent/cli
  • SPARQL-injection tests pin that the canonical payload urn:dkg:kafka-endpoint:foo:bar> } UNION ... is rejected at every entry point

Out of scope (deliberately not in this PR)

Follow-ups (deferred from review)

Quality items intentionally deferred to slice 07 review or follow-up cleanup, where the pattern's reuse will force the question:

  • Drop redundant URI param from KafkaEndpointPublisher.update signature.
  • composeKafkaEndpointKnowledgeAsset silently drops unknown KA properties — pin with a test or document.
  • Replace regex "not found" → 404 with a typed KafkaEndpointNotFoundError.
  • handleVerify double-reads the same KA.
  • KafkaEndpointSummary duplicated kafka↔api-client.
  • 'unattempted' value in api-client verify response type (impossible).
  • Misc nits (centralise stripTypedLiteral, validation order, defensive owner-uri fallback).

Caveats for slice 07

  • Lifecycle verbs require a confirmed KC; during the V10 "tentative" window dkg:batchId is absent in _meta and agent.update(uri, ...) correctly errors with `Publish the KA before calling update()`. Subscription revoke/verify will inherit this — document in operator-facing CLI help.
  • Pre-existing flake: kafka unit + integration suites race for testcontainers ports under file-parallelism. Workaround --no-file-parallelism. Worth a follow-up.

🤖 Generated with Claude Code

Zvonimir and others added 8 commits May 5, 2026 10:38
Adds the pure-package surface for slice 05's lifecycle verbs. Every new verb
takes the same dependency-inversion shape as slice 01/04's
`registerKafkaEndpoint`: a `KafkaEndpointPublisher` for mutations and a new
`KafkaEndpointQueryEngine` for SPARQL reads. The package never opens a
SPARQL or chain connection of its own — the route adapter implements both
interfaces.

- `listKafkaEndpoints({ contextGraphId, status?, queryEngine })` with
  `status='active'` (default — excludes revoked KAs via `FILTER NOT EXISTS`),
  `'revoked'`, or `'all'`.
- `getKafkaEndpoint({ contextGraphId, uri, queryEngine })` returns a single
  endpoint regardless of revocation state, including `dkg:status` and
  `dkg:revokedAt` when revoked.
- `revokeKafkaEndpoint(...)` reads the existing properties, composes the
  full new KA with `dkg:status "revoked"` + `dkg:revokedAt` added, and hands
  it to `publisher.update(cgId, uri, ka)` (the V10 update flow). NOT
  delete-and-recreate — KA history is preserved per ADR-0004.
- `verifyKafkaEndpoint(...)` mirrors revoke but updates `verifiedAt` and
  `verificationStatus` from a fresh probe outcome. Failure is recorded on
  the KA, never thrown — the verb's contract is "tell me what the broker
  says, write it down" (ADR-0002, on-demand re-verification).
- `buildKafkaEndpointRevocationMutation(uri, ts)` is the partial-builder
  shape the PRD calls for; the orchestrator composes it with existing
  properties before calling the publisher.
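The dependency-inversion shape above can be sketched as follows. The `publisher.update(cgId, uri, ka)` member shape comes from this message; everything else (the query-engine signature, the mock) is illustrative, not the package's real declarations:

```typescript
// Dependency-inversion sketch: the kafka package depends only on these two
// interfaces; the route adapter implements both. The package never opens a
// SPARQL or chain connection of its own.
interface KafkaEndpointPublisher {
  update(contextGraphId: string, uri: string, ka: object): Promise<void>;
}

interface KafkaEndpointQueryEngine {
  query(sparql: string, contextGraphId: string): Promise<unknown>;
}

// A mock publisher that records calls is all the unit tests need — no real
// infrastructure behind it.
const calls: Array<{ cgId: string; uri: string }> = [];
const mockPublisher: KafkaEndpointPublisher = {
  async update(cgId, uri, _ka) {
    calls.push({ cgId, uri });
  },
};

mockPublisher
  .update("cg-1", "urn:dkg:kafka-endpoint:owner:" + "a".repeat(64), {})
  .then(() => console.assert(calls[0].cgId === "cg-1"));
```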

Coverage ratchet bumped from 96 → 98 branches; new code is at 100% on every
metric. The two surviving uncovered branches live in slice-04 `kafka-probe.ts`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…client

HTTP surface:
- GET /api/kafka/endpoint?contextGraphId=X[&status=...] — list, default
  status=active (excludes revoked KAs).
- GET /api/kafka/endpoint/<urlencoded-uri>?contextGraphId=X — single fetch,
  returns revoked endpoints regardless of any status param.
- DELETE /api/kafka/endpoint/<urlencoded-uri>?contextGraphId=X — soft-revoke
  via the V10 publisher update flow (mutate-by-add-only). The route adapter
  resolves the URI to its kcId via a `_meta` SPARQL query
  (`?ka dkg:rootEntity <uri> ; dkg:partOf ?kc . ?kc dkg:batchId ?bid`),
  converts the new full KA's JSON-LD to quads via the freshly re-exported
  `jsonLdToQuads` helper, and calls `agent.update(kcId, cgId, quads)`.
- POST /api/kafka/endpoint/verify — re-verify with fresh creds. Body
  `{ contextGraphId, uri, securityProtocol?, sasl?, ssl?, broker?, topic? }`.
  Broker / topic / securityProtocol default to the recorded KA values when
  omitted. Verify with no creds at all is rejected as 400 (ADR 0002:
  unattempted is OK on register but meaningless on re-verify).

CLI:
- `dkg kafka endpoint list --cg X [--status active|revoked|all]`
- `dkg kafka endpoint show <uri> --cg X`
- `dkg kafka endpoint revoke <uri> --cg X`
- `dkg kafka endpoint verify <uri> --cg X --username … --password …`
  (plus the same --security-protocol / --ca-pem-path / --cert-pem-path /
  --key-pem-path flags as register).

api-client:
- `listKafkaEndpoints`, `getKafkaEndpoint`, `revokeKafkaEndpoint`,
  `verifyKafkaEndpoint` mirror the existing `registerKafkaEndpoint` style.
- Adds a `delete<T>` HTTP helper alongside `get`/`post`/`postForm`.

Other:
- `parsers/kafka-request.ts` gains a `KafkaEndpointVerifyRequestBody` type
  and a `hasAnyKafkaCredentials` precondition gate.
- The dispatcher in `routes/kafka.ts` is split into per-verb handlers; URI
  parsing is centralised in `extractEndpointUri` so every verb that takes a
  `/<urlencoded-uri>` path goes through the same `safeDecodeURIComponent`
  guard.

Tests:
- 8 new `hasAnyKafkaCredentials` parser tests (new branch coverage).
- 6 new `kafka-cli-smoke` tests (list / show / revoke / verify, including
  URL-encode round-trip and PEM-file shipping).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends `walking-skeleton.test.ts` with the slice 05 lifecycle paths:

- `register → revoke → list-active excludes → list-all includes → single-fetch
  returns revoked KA`. Pins the load-bearing default-active filter behaviour
  against a real CG that contains both an active and a freshly-revoked KA.

- `register with creds → re-verify with same creds → SPARQL confirms verifiedAt
  was updated`. Sleeps 1.1s between register and verify so the new
  `dkg:verifiedAt` is observably later (the probe stamps to ms precision).

Both scenarios share `waitForRevokedRow` / `waitForVerifiedAtAdvance` polling
helpers — the V10 update flow lands in the data graph asynchronously after
the chain ack, same as the slice 04 register path.

The file isn't renamed to `endpoint-lifecycle.test.ts` (which would arguably
fit the broader scope better) — leaving that for a separate PR so the slice
05 diff stays self-contained.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…outes

Move the "no nested path beyond /<uri>" guard into `extractEndpointUri` so
GET (single fetch), DELETE (revoke), and any future verb that takes a
path-encoded URI all reject sub-paths consistently. Encodes a small fact
about URN syntax: a `urn:dkg:kafka-endpoint:…` URI never contains an
unencoded `/`, so an extra path segment means the route is undefined.

Behaviour: `GET /api/kafka/endpoint/<uri>/foo` → 404, was 200/500 depending
on which sub-handler grabbed the request.
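A hedged sketch of the centralised parser (the function name is from this message; the body is an illustration of the behaviour described, not the real code):

```typescript
// extractEndpointUri sketch: one decode guard plus the "no nested path"
// rule. Returns null (→ 404) on sub-paths, malformed %-escapes, or a
// non-endpoint prefix.
function extractEndpointUri(pathAfterBase: string): string | null {
  const segments = pathAfterBase.split("/").filter((s) => s.length > 0);
  if (segments.length !== 1) return null; // /<uri>/foo → undefined route
  let decoded: string;
  try {
    decoded = decodeURIComponent(segments[0]); // the safeDecode guard
  } catch {
    return null; // malformed percent-encoding
  }
  // A urn:dkg:kafka-endpoint:… URI never contains an unencoded '/', so a
  // single path segment is enough to carry the whole URI.
  return decoded.startsWith("urn:dkg:kafka-endpoint:") ? decoded : null;
}
```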

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The URI was interpolated raw into SPARQL IRI positions at three sites:
- `getKafkaEndpoint`: `BIND(<${uri}> AS ?endpoint)` (kafka package)
- `resolveKcIdForUri`: `?ka <…rootEntity> <${uri}>` (route adapter)
- the route's `extractEndpointUri` only checked `startsWith` for the prefix.
- `handleVerify` had no URI shape check at all — only `isNonEmptyString`.

A payload like `urn:dkg:kafka-endpoint:foo:bar> } UNION { ?ka <p> ?o BIND(<x`
satisfied both `startsWith` and `isNonEmptyString`, then closed the IRI early
and spliced an arbitrary graph pattern into the executed SPARQL.

Fix: a strict regex validator co-located with the URI builder in
`packages/kafka/src/uri.ts`. The shape `urn:dkg:kafka-endpoint:<owner>:<sha256-hex-64>`
is fully constrained by `buildKafkaEndpointUri`'s output, so a tight regex is
both correct and minimal:

  /^urn:dkg:kafka-endpoint:[a-z0-9._-]+:[0-9a-f]{64}$/

Wired in:
- `assertValidKafkaEndpointUri(uri)` at the entry of `getKafkaEndpoint` —
  defence-in-depth so any future caller of the package gets the same
  protection. `revokeKafkaEndpoint` and `verifyKafkaEndpoint` inherit it
  transitively (both call `getKafkaEndpoint` first).
- `isValidKafkaEndpointUri(decoded)` replaces the loose `startsWith` check
  in `extractEndpointUri` (route, path-based verbs).
- `isValidKafkaEndpointUri(uri)` added to `handleVerify` (route, body-based).

The `contextGraphId` was already gated by `validateRequiredContextGraphId`
(`packages/core/src/constants.ts`), which enforces `/^[\w:/.@\-]+$/` — `<`,
`>`, `}`, whitespace are already rejected. The new `GRAPH <${metaGraph}>`
interpolation site in `resolveKcIdForUri` therefore inherits a safe shape;
no additional change needed.
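The validator pair can be sketched as below. The regex is exactly the shape quoted above; the function bodies and error message are assumptions:

```typescript
// Validator pair co-located with the URI builder (sketch).
const KAFKA_ENDPOINT_URI_RE =
  /^urn:dkg:kafka-endpoint:[a-z0-9._-]+:[0-9a-f]{64}$/;

function isValidKafkaEndpointUri(value: unknown): value is string {
  return typeof value === "string" && KAFKA_ENDPOINT_URI_RE.test(value);
}

function assertValidKafkaEndpointUri(uri: unknown): asserts uri is string {
  if (!isValidKafkaEndpointUri(uri)) {
    throw new Error(`Invalid kafka endpoint URI: ${String(uri)}`);
  }
}

// The worked injection payload closes the IRI early with '>' and splices a
// graph pattern — every such character falls outside [a-z0-9._-]/[0-9a-f].
const canonical = "urn:dkg:kafka-endpoint:0xowner:" + "a".repeat(64);
const payload =
  "urn:dkg:kafka-endpoint:foo:bar> } UNION { ?ka <p> ?o BIND(<x";
console.assert(isValidKafkaEndpointUri(canonical));
console.assert(!isValidKafkaEndpointUri(payload));
```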

Tests:
- 11 new `isValidKafkaEndpointUri` / `assertValidKafkaEndpointUri` tests in
  `packages/kafka/test/uri.test.ts` — accepts canonical shape, rejects
  non-strings, missing prefix, empty owner, wrong hash length / casing /
  hex-set, every SPARQL-IRI-breaking char in the owner, and the worked
  injection payload from the review brief.
- `getKafkaEndpoint`, `revokeKafkaEndpoint`, `verifyKafkaEndpoint`: one new
  rejection test each, asserting (a) the validator throws and (b) no SPARQL
  query / publisher.update was issued.
- Existing test fixtures updated from informal URIs (`…:0xowner:abc123`) to
  canonical-shape URIs (`…:0xowner:<64 'a's>`) so they pass validation
  alongside the new rejection assertions.

Coverage: 100/98.54/100/100 lines/branches/funcs/stmts — slightly above the
98 floor, no ratchet change needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The V10 update flow previously took a `bigint` `kcId` (the on-chain
batchId) plus raw `Quad[]`. Callers that knew a KA's URI but not its
internal kcId — every route adapter that hands the user an opaque URN —
had to reach into agent internals: a SPARQL query against the CG's `_meta`
graph for the KA's kcId, plus `jsonLdToQuads` to convert the document.

This commit lifts both into the agent so route handlers stay one line:

  // Before (slice 05 baseline):
  const kcId = await resolveKcIdForUri(ctx, cgId, uri);   // route helper
  const { publicQuads } = await jsonLdToQuads({ public: ka });  // re-export
  await agent.update(kcId, cgId, publicQuads);

  // After:
  await agent.update(uri, cgId, { public: ka });

The new overload mirrors the existing JSON-LD overload on `agent.publish`.
Discrimination is by first-arg type: `bigint` → original kcId-keyed path,
`string` → resolve-and-update.

The resolution itself is extracted as a pure helper:

  resolveKcIdByRootEntity(store, contextGraphId, rootEntityUri)
    → Promise<bigint | null>

Co-located with the existing JSON-LD / N-Quads helpers in
`dkg-agent-utils.ts` (already imports from `dkg-publisher` /
`dkg-storage` / `dkg-core`). The helper is exported so slice 07
subscription routes — which face the same URI-vs-kcId asymmetry — can
copy-paste a one-liner. Inputs are validated before SPARQL interpolation
(`validateContextGraphId` for the CG id, `assertSafeIri` for the URI).
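A minimal sketch of the pure resolver, assuming the `_meta` triple shape quoted earlier (`?ka dkg:rootEntity <uri> ; dkg:partOf ?kc . ?kc dkg:batchId ?bid`); the predicate IRIs, graph IRI layout, and the `TripleStore` interface here are illustrative assumptions:

```typescript
interface TripleStore {
  query(sparql: string): Promise<{ bindings: Array<{ bid?: string }> }>;
}

async function resolveKcIdByRootEntity(
  store: TripleStore,
  contextGraphId: string,
  rootEntityUri: string
): Promise<bigint | null> {
  // The real helper validates both inputs before SPARQL interpolation
  // (validateContextGraphId / assertSafeIri); elided in this sketch.
  const sparql = `SELECT ?bid WHERE {
    GRAPH <did:dkg:context-graph:${contextGraphId}> {
      ?ka <dkg:rootEntity> <${rootEntityUri}> ;
          <dkg:partOf> ?kc .
      ?kc <dkg:batchId> ?bid .
    }
  } LIMIT 1`;
  const { bindings } = await store.query(sparql);
  const bid = bindings[0]?.bid;
  if (bid === undefined) return null;
  try {
    // Accept both '42' and '"42"^^xsd:integer' binding shapes.
    return BigInt(bid.replace(/"|\^\^.*$/g, ""));
  } catch {
    return null; // unparseable bigint → null, per the contract above
  }
}
```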

Tests:
- 9 new unit tests in `packages/agent/test/resolve-kcid.test.ts` exercising
  the helper with a tiny mock `TripleStore`: bigint result on match, both
  binding shapes (`"42"^^xsd:integer` and SPARQL-JSON object), null on
  miss / missing batchId / unparseable bigint / non-bindings result, and
  rejection of unsafe contextGraphId / SPARQL-IRI-breaking URI inputs
  before any query is issued.

The existing `agent.update(kcId, cgId, quads)` callers — `agent-chat.ts`
route, plus the two e2e tests in `e2e-flows.test.ts` — keep working
unchanged: the original signature is preserved as the first overload.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three things go away in the route adapter:

1. The 38-line `resolveKcIdForUri(ctx, cgId, uri)` SPARQL helper —
   replaced by the new `agent.update(uri, cgId, content)` URI-keyed
   overload, which calls the same `resolveKcIdByRootEntity` from
   `dkg-agent-utils` internally. The helper depends on `RequestContext`
   and was untestable in unit form; the agent-side equivalent is unit-
   tested via a tiny mock `TripleStore`.

2. The `jsonLdToQuads` re-export from `packages/agent/src/index.ts` —
   nobody outside the agent package imports it now (`git grep` confirms
   the kafka route was the only consumer). The agent's URI-keyed update
   overload converts JSON-LD internally, mirroring the existing
   `agent.publish(cgId, content)` JSON-LD path.

3. The local `DKG_ONTOLOGY` constant + `stripTypedLiteral` helper that
   only existed to support `resolveKcIdForUri`.

Net change in `packages/cli/src/daemon/routes/kafka.ts`: -73 lines. The
update implementation in `buildKafkaEndpointPublisher` is now a one-liner:

  async update(cgId, uri, content) {
    await agent.update(uri, cgId, { public: content as object });
  }

Slice 07 (subscriptions) will reuse the same agent overload — same kcId
lookup against the same `_meta` graph for `urn:dkg:kafka-subscription:`
URIs — without copy-pasting any of the route adapter's plumbing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The slice-05 review (I4) flagged `buildKafkaEndpointRevocationMutation`,
the `KafkaEndpointRevocationMutation` type alias, and the
`KAFKA_ENDPOINT_REVOCATION_CONTEXT` constant as dead code: the production
revoke path uses `composeKafkaEndpointKnowledgeAsset` (in `endpoint.ts`),
which inlines the same overlay onto the existing KA's properties. The
mutation builder was only ever exercised by its own tests.

Drops:
- `buildKafkaEndpointRevocationMutation` + type alias + context constant
  in `packages/kafka/src/ka-builder.ts`
- Four test cases in `packages/kafka/test/ka-builder.test.ts`
- The golden fixture `packages/kafka/test/fixtures/endpoint-ka-revocation.json`

`git grep` confirms zero remaining references in the repo.

Coverage stays at 100/98.54/100/100 — `composeKafkaEndpointKnowledgeAsset`
already had full coverage from `endpoint.revoke.test.ts` and
`endpoint.verify.test.ts`, so no new tests were needed to compensate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@zsculac zsculac force-pushed the feat/kafka-list-revoke-verify branch from f3cf099 to 7cf5fb2 Compare May 5, 2026 08:52
Zvonimir and others added 3 commits May 5, 2026 11:14
The rebase from slice 04 introduced a regression in `handleVerify`: the
shape-consistency gate (`validateKafkaAuthConsistency`) and the creds-
required gate (`hasAnyKafkaCredentials`) ran on the BARE request body, BEFORE
`getKafkaEndpoint` loaded the existing KA. The verify route is documented to
default broker / topic / securityProtocol from the recorded values when the
caller omits them, so legitimate inputs were 400'd by the gates because the
defaulting hadn't happened yet:

- `POST /api/kafka/endpoint/verify { uri, sasl: {...} }` against a stored
  SASL_SSL endpoint → 400 "sasl without securityProtocol" (was supposed to
  succeed by inheriting SASL_SSL from the KA).
- URI-only re-verify of a stored PLAINTEXT endpoint → 400 "Re-verify
  requires credentials" (was supposed to succeed because PLAINTEXT-as-
  reachability-probe satisfies the creds-required gate).

Fix: move both gates to AFTER the KA load and run them against the
EFFECTIVE values (`body.X ?? existing.X`). Genuine misconfig still 400s
(sasl block with neither body nor stored protocol; no creds with a stored
KA that records nothing).

Side effect: `verifyKafkaEndpoint` now accepts an optional pre-fetched
`existing` snapshot and skips its own SPARQL read when the caller supplies
one — the route does, so this saves a round-trip on every verify call.
This is the I6 deferred item; it's a consequential change of Bug 1's fix
(the route now needs `existing` early to compute the effective values, so
not threading it through would mean two reads per verify).
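The gate-ordering fix can be sketched as: compute effective values first, run the gates second. Type and function names here are illustrative, not the route's real identifiers:

```typescript
type VerifyBody = { securityProtocol?: string; sasl?: object };
type StoredEndpoint = { securityProtocol?: string };

// Default broker-independent fields from the recorded KA when omitted.
function effectiveValues(body: VerifyBody, existing: StoredEndpoint) {
  return {
    securityProtocol: body.securityProtocol ?? existing.securityProtocol,
    sasl: body.sasl,
  };
}

// Returns a 400 reason, or null when the verify may proceed.
function credsGate(eff: {
  securityProtocol?: string;
  sasl?: object;
}): string | null {
  if (eff.sasl && eff.securityProtocol === undefined) {
    return "sasl without securityProtocol";
  }
  if (!eff.sasl && eff.securityProtocol === undefined) {
    return "Re-verify requires credentials";
  }
  return null; // PLAINTEXT-as-reachability-probe satisfies the gate
}
```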

Tests:
- New `packages/cli/test/kafka-route-verify.test.ts` — 5 route-level tests
  using a mocked agent and constructed `RequestContext`. Pins the load-
  bearing case (URI-only re-verify of stored PLAINTEXT → not 400) and the
  regression-bound cases (no protocol AND no creds AND no stored protocol
  → still 400; sasl block without effective protocol → still 400; missing
  KA → 404).
- Extended `packages/kafka/test/endpoint.verify.test.ts` with one new test
  for the `existing`-passthrough path: when the caller supplies `existing`,
  the function makes ZERO `queryEngine.query` calls and uses the supplied
  snapshot as the basis for the composed update.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ntity

The resolver's SPARQL ended with `LIMIT 1`, which made multi-match results
nondeterministic at the SEMANTIC layer (even though deterministic from the
SPARQL execution perspective). V10's `publish()` can produce multiple KCs
sharing the same `dkg:rootEntity` URI in the same context graph (republish-
after-prune races, manual data import, etc.). With `LIMIT 1` and no `ORDER
BY`, the helper returned whichever the store happened to enumerate first,
so `agent.update(uri, ...)` could land its mutation on the original instead
of the latest collection (or vice versa) without ever flagging the
ambiguity. `revoke` and `verify` would silently mutate the wrong KC.

Fix:
- Drop `LIMIT 1`. Fetch all matches (cost: at most one extra binding row in
  the common 1-match case).
- 0 matches → return `null` (existing contract).
- 1 match → return that kcId (existing contract).
- 2+ matches → throw a new typed `AmbiguousRootEntityError` carrying the
  URI, the CG, and the match count. Route adapters can map this to HTTP
  409 Conflict.

The new error type is exported from the agent package alongside
`resolveKcIdByRootEntity`, so route handlers can `instanceof`-discriminate
without parsing the message string. The agent's `update(uri, ...)`
overload already lets `null` propagate as a "publish first" error; the
typed ambiguity error follows the same pattern.
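The 0 / 1 / 2+ contract can be sketched as below. The error's field names follow this message; the class body and the `pickKcId` helper are illustrative:

```typescript
class AmbiguousRootEntityError extends Error {
  constructor(
    readonly uri: string,
    readonly contextGraphId: string,
    readonly matchCount: number
  ) {
    super(
      `Ambiguous rootEntity ${uri} in CG ${contextGraphId}: ` +
        `${matchCount} matching knowledge collections`
    );
    this.name = "AmbiguousRootEntityError";
  }
}

function pickKcId(
  matches: bigint[],
  uri: string,
  contextGraphId: string
): bigint | null {
  if (matches.length === 0) return null; // "publish first" contract
  if (matches.length === 1) return matches[0]; // common case
  // 2+ → fail loudly; route adapters can map this to HTTP 409 Conflict.
  throw new AmbiguousRootEntityError(uri, contextGraphId, matches.length);
}
```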

Tests:
- 3 new tests in `packages/agent/test/resolve-kcid.test.ts`:
  - 2+ matches → throws `AmbiguousRootEntityError` (the load-bearing case)
  - error message + typed fields name URI / CG / count (operator
    debugging contract)
  - 1-match path still resolves cleanly (regression guard for the SPARQL
    shape change)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`ownerFromPublisherUri`'s "defensive fallback" handed unknown publisher
URIs through verbatim to `buildKafkaEndpointKnowledgeAsset`, which
lower-cased and baked them into `urn:dkg:kafka-endpoint:<owner>:<hash>`
as the rebuilt KA's `@id`. When the publisher URN didn't match the two
canonical shapes (`urn:dkg:agent:` / `did:dkg:agent:`), the rebuilt `@id`
DIFFERED from `existing.uri`. `agent.update(existing.uri, ...)` still
resolved to the original kcId and wrote the new triples there — but the
triples carried a DIFFERENT subject URI. Original subject's triples got
orphaned; new subject's triples coexisted confusingly inside the original
kcId. Silent mutation, hard to detect, hard to recover from.

Codex re-raised this as 🔴 Bug despite low probability in normal V10
deployments — the silent-mutation pathway is real and easy to eliminate by
failing closed.

Fix:
- New typed error `KafkaEndpointPublisherFormatError` exported from the
  kafka package, carrying the offending publisher URN as `e.publisher`
  for programmatic introspection.
- `ownerFromPublisherUri` throws on unrecognised shapes instead of
  returning the raw string. The publisher URN is non-secret operational
  metadata (it's a public agent address), so the throw message can echo
  it for operator debugging.
- The two existing canonical shapes (`urn:dkg:agent:` and
  `did:dkg:agent:`) keep working unchanged — both `revoke` and `verify`
  paths exercised by the tests above pass through cleanly.
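The fail-closed shape can be sketched as below. The two canonical prefixes and the `e.publisher` field are from this message; the owner-extraction details (slice after the prefix, lower-case) are assumptions:

```typescript
class KafkaEndpointPublisherFormatError extends Error {
  constructor(readonly publisher: string) {
    // The publisher URN is non-secret operational metadata, so the
    // message can echo it for operator debugging.
    super(`Unrecognised publisher URI shape: ${publisher}`);
    this.name = "KafkaEndpointPublisherFormatError";
  }
}

function ownerFromPublisherUri(publisherUri: string): string {
  for (const prefix of ["urn:dkg:agent:", "did:dkg:agent:"]) {
    if (publisherUri.startsWith(prefix)) {
      return publisherUri.slice(prefix.length).toLowerCase();
    }
  }
  // Previously: `return publisherUri;` — the silent-mutation pathway.
  // Fail closed instead of baking an unknown shape into a rebuilt @id.
  throw new KafkaEndpointPublisherFormatError(publisherUri);
}
```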

Tests:
- Replaced the existing "defensive fallback" test in
  `packages/kafka/test/endpoint.revoke.test.ts` with two new tests:
  - an unrecognised publisher URI shape throws
    `KafkaEndpointPublisherFormatError`, no publisher.update call (the
    silent-mutation pathway is shut)
  - error message + typed `publisher` field name the offending URN
- The earlier test name and assertion ("hands an unrecognised publisher
  URI shape through verbatim (defensive fallback)") explicitly described
  the broken behaviour. Replaced with a name + comment that explains why
  failing closed is the right call.
- `git grep` confirms zero remaining references to the old soft behaviour.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Zvonimir and others added 3 commits May 5, 2026 12:10
…erload

The URI-keyed overload `agent.update(rootEntityUri, contextGraphId, content)`
resolved `rootEntityUri → kcId` and then wrote whatever quads the JSON-LD
content carried — without verifying the quads actually described
`rootEntityUri`. So a caller that supplied JSON-LD with:
- A different `@id` (typo, stale builder), or
- No `@id` (jsonLdToQuads synthesises a `urn:dkg:private:<uuid>` anchor on
  private-only payloads), or
- Mixed subjects under one update call,

would have its triples written into the kcId resolved from `rootEntityUri`,
but carrying a DIFFERENT subject. The original subject's triples become
stale; the new subject's triples coexist confusingly inside the same kcId.

Slice 05's compose path doesn't trip this today (Bug 3's fix made
`ownerFromPublisherUri` fail closed, so `composeKafkaEndpointKnowledgeAsset`
always produces `@id === existing.uri`). But the agent overload is the
reusable primitive that slice 07 (subscriptions) and other future callers
will inherit — defence belongs at the agent boundary, not solely on slice
05's caller.

Fix:
- New pure helper `assertJsonLdRootMatches(expectedUri, publicQuads,
  privateQuads)` in `packages/agent/src/dkg-agent-utils.ts`. Throws a new
  typed `RootEntityMismatchError` (also exported) when the (publicQuads,
  privateQuads) don't all share a single subject IRI equal to `expectedUri`.
  Carries `expected: string` and `actual: string[]` (sorted, deduped) for
  programmatic introspection.
- Wired into the URI-keyed branch of `agent.update` BEFORE `_update` is
  invoked: after `jsonLdToQuads(...)` resolves, before the kcId-keyed
  dispatch.
- The kcId-keyed overload (`update(kcId: bigint, ...)`) does NOT get this
  validation — kcId callers manage their own subject discipline and we
  don't second-guess them.

Decision (deliberate): the helper is strict. Mixed-subject content under a
single rootEntity update is suspicious even when `expectedUri` IS one of
the subjects. If a real use case for "child triples under different
subjects but anchored to the same rootEntity" emerges, the rule can be
widened — but the conservative posture today eliminates the silent-mutation
hole entirely.

Tests:
- New `packages/agent/test/agent-update-overload.test.ts` — 10 unit tests
  for the helper covering: happy path (single subject equals expected,
  public-only / private-only / both); throws on mismatched single subject,
  zero subjects, mixed subjects (even when expected is among them),
  synthetic anchors (`urn:dkg:private:`), and blank nodes (`_:b1`); error
  message + typed fields name expected and actual; actual list is sorted +
  deduplicated.
- Slice 05's compose-path tests (`endpoint.revoke.test.ts`,
  `endpoint.verify.test.ts`) continue to pass: the canonical builder
  always produces `@id === existing.uri`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`getKafkaEndpoint` and `listKafkaEndpoints` wrapped every BGP in
`GRAPH ?g { ... }`, on the (then-believed) basis that the daemon engine
needed an explicit named-graph pattern to find published triples. Codex
correctly noticed that's backwards: `DKGQueryEngine.wrapWithGraph` (in
`packages/query/src/dkg-query-engine.ts:431`) bails out as soon as the
SPARQL contains the literal "graph " (case-insensitive) — its whole job
is to AUTO-wrap the WHERE in `GRAPH <did:dkg:context-graph:${cgId}> { ... }`
when no explicit GRAPH is present.
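The auto-wrap contract can be sketched as below (simplified from the behaviour described above; the graph IRI shape follows `GRAPH <did:dkg:context-graph:${cgId}>` from this message, and the internals are illustrative):

```typescript
function wrapWithGraph(sparql: string, contextGraphId: string): string {
  // Bail out as soon as the query already says "graph " — exactly the
  // condition the old `GRAPH ?g { ... }` wrapper tripped, which is why the
  // wrapper suppressed per-CG scoping instead of adding it.
  if (/graph /i.test(sparql)) return sparql;
  const match = sparql.match(/WHERE\s*\{([\s\S]*)\}\s*$/i);
  if (!match) return sparql;
  return sparql.replace(
    match[1],
    ` GRAPH <did:dkg:context-graph:${contextGraphId}> {${match[1]}} `
  );
}
```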

Net effect of the previous wrapper: the auto-wrap was suppressed, the
query became a wildcard scan across every named graph in the store, and
because endpoint URIs are content-addressed (`urn:dkg:kafka-endpoint:
<owner>:<sha256(broker|topic)>`) the same broker/topic registered in two
CGs collapsed to ONE URI. `getKafkaEndpoint(uri, contextGraphId: A)`
then returned whichever row Oxigraph happened to enumerate first —
silently mutating the wrong CG's KA on `revokeKafkaEndpoint` /
`verifyKafkaEndpoint`.

Slice 04's commit `348ffd19` introduced the wrapper based on the
mis-diagnosis that unwrapped queries returned zero bindings; the actual
cause of that test's silence was the second issue the same commit fixed
(the `result.type === 'bindings'` check). The wrapper has been a latent
bug since.

Fix (Option B1): drop `GRAPH ?g` from both `listKafkaEndpoints` and
`getKafkaEndpoint`. Let `agent.query(sparql, contextGraphId)` auto-wrap.
The query is then scoped to the per-CG data graph by the engine; no
kafka-package knowledge of the named-graph URI layout required.

Tests:
- `endpoint.list.test.ts` and `endpoint.get.test.ts`: the two tests that
  asserted the OLD (broken) `/GRAPH\s+\?g\s*\{/` pattern are flipped to
  assert the SPARQL does NOT contain "graph " (case-insensitive — same
  check the engine uses). Comments updated to explain why the previous
  pin was wrong.
- The unit-test mock (`KafkaEndpointQueryEngine`) does NOT model the
  auto-wrap; the unit tests can't catch CG-scoping bugs by themselves.
  Real-store coverage lives in the new
  `test/integration/cg-isolation.test.ts` (separate commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Real-store integration coverage for Codex Bug B's fix. The unit-test mock
(`KafkaEndpointQueryEngine`) doesn't model `DKGQueryEngine`'s `wrapWithGraph`
auto-scope, so it can't catch a regression that lets `getKafkaEndpoint` /
`listKafkaEndpoints` scan every named graph in the store. This test stands
up an in-memory `OxigraphStore` + a real `DKGQueryEngine`, inserts two
endpoint KAs that share the same content-addressed URI in different per-CG
data graphs, and asserts the package returns ONLY the requested CG's row.

Mirrors the route adapter's wiring shape (`buildKafkaEndpointQueryEngine`
in `packages/cli/src/daemon/routes/kafka.ts`) so we exercise the same code
path the production daemon uses, not a hand-rolled mock.

Three scenarios:
- `getKafkaEndpoint`: same URI in CG-A and CG-B → query for CG-A returns
  A's broker/topic, never B's.
- `listKafkaEndpoints`: distinct endpoints in CG-A and CG-B → list for CG-A
  returns exactly one endpoint (A's), not two.
- `getKafkaEndpoint` against an empty CG: URI exists in CG-B → query for
  CG-A returns null, never B's row pretending to belong to CG-A.

Pre-fix all three either failed deterministically or were
order-dependent (single-row case). Post-fix all three pass.

Adds `@origintrail-official/dkg-query` and `@origintrail-official/dkg-storage`
as kafka-package dev deps so the test can construct a real engine + store
without pulling them into the runtime surface.

No daemon, no chain, no kafkajs — just the SPARQL plumbing the bug lives in.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Zvonimir and others added 2 commits May 5, 2026 12:57
…nested + synthetic anchors

Codex Round 3 noticed Round 2's `assertJsonLdRootMatches` was too strict.
The original helper inspected POST-conversion quads and required EVERY
subject to equal `keyOrUri`, which over-rejected two legitimate JSON-LD
patterns that `publish()` already accepts:

1. Private-only payloads (`{ private: { "x:secret": "S" } }`) —
   `jsonLdToQuads` mints a `urn:dkg:private:<uuid>` synthetic anchor on
   the public side. Anchor URN definitionally can't equal the supplied
   rootEntity URI → old rule rejected.

2. Nested entities (`{ "@id": uri, "dkg:hasMeta": { "@id": "urn:meta:1",
   "x:label": "L" } }`) — emits two distinct subjects after conversion,
   both correct. Old "all subjects must match" rule rejected even though
   the rebuilt KA was correctly rooted.
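The two legitimate shapes can be illustrated like this; the root URI and property IRIs are examples, not the real vocabulary:

```typescript
const rootUri = "urn:example:endpoint:1"; // hypothetical root entity URI

// Pattern 1: private-only envelope. No top-level @id exists on the public
// side; the conversion layer mints a urn:dkg:private:<uuid> synthetic
// anchor, which by definition can't equal rootUri.
const privateOnly = { private: { "x:secret": "S" } };

// Pattern 2: nested entity. Conversion emits two subjects (rootUri and
// urn:meta:1), both correct, yet the old all-subjects rule rejected it
// because urn:meta:1 !== rootUri.
const nested = {
  public: {
    "@id": rootUri,
    "dkg:hasMeta": { "@id": "urn:meta:1", "x:label": "L" },
  },
};

console.log(nested.public["@id"] === rootUri); // true
```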

The actual hazard Round 2 flagged (a typo'd `@id` writing to the wrong
subject) is narrower: the user supplies a top-level `@id` that doesn't
match the URI they passed to `update()`. Inspecting only that, rather
than every emitted quad, catches the hazard without over-rejecting.

Fix:
- New `assertJsonLdContentRootMatches(expectedUri, content)` operates on
  the JsonLdContent shape BEFORE `jsonLdToQuads` runs (so a malformed
  `@id` 422s without paying the conversion cost).
- Validation rule: collect every TOP-LEVEL `@id` on either envelope side
  (or on the bare doc), skip those without `@id` (synthetic-anchor
  case), skip those with `^urn:dkg:private:` shape (synthetic-anchor
  convention is opaque, not forbidden as user input), and throw
  `RootEntityMismatchError` if any remaining `@id` ≠ `expectedUri`.
- Nested entities under a top-level resource are intentionally NOT
  checked — they're legitimate JSON-LD child resources.
- Array forms are walked element-by-element; any element with a wrong
  top-level `@id` triggers the throw (per the brief: "the wrong sibling
  is the issue").
- The kcId-keyed overload (`update(kcId: bigint, ...)`) does NOT use
  this gate — kcId callers manage their own subject discipline.
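The validation rule above can be sketched as follows; the `JsonLdContent` shape, helper names, and error message are simplified assumptions, not the real agent code:

```typescript
// Typed error keeps the expected/actual field shape described above.
class RootEntityMismatchError extends Error {
  constructor(public expected: string, public actual: string[]) {
    super(`top-level @id [${actual.join(", ")}] does not match ${expected}`);
  }
}

type Doc = Record<string, unknown>;

// Collect top-level @id values from one envelope side (object or array),
// skipping missing @ids (synthetic-anchor case) and explicit
// urn:dkg:private:* anchors (opaque convention, not forbidden input).
function topLevelIds(side: Doc | Doc[] | undefined): string[] {
  if (side === undefined) return [];
  const docs = Array.isArray(side) ? side : [side];
  const ids: string[] = [];
  for (const doc of docs) {
    const id = doc["@id"];
    if (id === undefined) continue;
    if (typeof id !== "string") throw new TypeError("@id must be a string");
    if (id.startsWith("urn:dkg:private:")) continue;
    ids.push(id);
  }
  return ids;
}

function assertJsonLdContentRootMatches(expectedUri: string, content: Doc): void {
  const hasEnvelope = "public" in content || "private" in content;
  const ids = hasEnvelope
    ? [
        ...topLevelIds(content["public"] as Doc | Doc[] | undefined),
        ...topLevelIds(content["private"] as Doc | Doc[] | undefined),
      ]
    : topLevelIds(content as Doc);
  const mismatched = [...new Set(ids)].filter((id) => id !== expectedUri);
  if (mismatched.length > 0) {
    throw new RootEntityMismatchError(expectedUri, mismatched);
  }
}
```

Note the sketch never descends into nested values, so child resources with their own `@id` pass untouched, matching the "intentionally NOT checked" rule.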

`RootEntityMismatchError` keeps the same `expected: string` /
`actual: string[]` typed-field shape; only the message wording was
narrowed from "subject" to "top-level @id" to match the new semantics.

The old `assertJsonLdRootMatches` helper (quad-based) was deleted —
`git grep` confirmed the only consumer was the agent.update call site.

Tests:
- Rewrote `packages/agent/test/agent-update-overload.test.ts` (20 tests)
  to cover the new semantics:
  - Happy paths: matching top-level @id (public, private, both); nested
    entity with own @id passes; missing @id passes (synthetic anchor);
    private-only with synthetic anchor passes; bare doc with matching
    @id; bare doc with no @id; explicit `urn:dkg:private:*` @id passes;
    empty envelope passes.
  - Array forms: all-matching passes; mixed @id-and-no-@id passes;
    one mismatched sibling throws; bare-doc array passes.
  - Mismatch paths: mismatched public @id throws; mismatched private @id
    throws; bare-doc mismatched @id throws; typed-error fields name
    expected and actual; actual list deduplicates; non-string @id
    throws cleanly.
- Slice 05 compose path keeps passing — the canonical builder always
  produces `@id === existing.uri` (verified via the existing
  endpoint.revoke.test.ts and endpoint.verify.test.ts).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…test

Codex Issue B2: `expect(updateCalls.length).toBeGreaterThanOrEqual(0)` is
always true (`.length` is never negative). The flagged test passed even
if the route 400'd early before reaching the agent.update path —
defeating the entire intent of the Bug 1 regression guard.

Also caught in a sweep for sibling checks with the same weak shape (Codex
flagged only the first): the second test (URI-only re-verify of stored
PLAINTEXT) only asserted `not.toBe(400)`, which would let a 500 through
silently.

Fix:
- Test 1 (no body protocol, stored PLAINTEXT defaulting): replace the
  tautological length-check with concrete `expect(updateCalls).toHaveLength(1)`
  + `expect(updateCalls[0].uri).toBe(VALID_URI)` + `expect(updateCalls[0].cgId).toBe('devnet-test')`.
  Also tighten the status from `not.toBe(400)` to `toBe(200)`, and pin
  the response body shape (`verificationStatus === 'failed'`,
  `probe.status` matches `failed|unreachable`). A regression that
  re-introduced the validation-order bug now fails on at least three
  separate assertions.
- Test 2 (minimum-input URI-only re-verify): same status tightening
  (`toBe(200)`) + `expect(updateCalls).toHaveLength(1)` + uri/cgId pin.
- Tests 3-5 (the 400/404 negative paths): added
  `expect(updateCalls).toHaveLength(0)` so a regression that 400s/404s
  but ALSO mutates can't slip through. Cheap defence in depth.
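Why the original guard was a tautology, in miniature (plain booleans in place of vitest matchers; the call-record values are illustrative):

```typescript
// An array's length is never negative, so the old assertion holds for ANY
// call count, including zero calls when the route 400's before reaching
// agent.update.
const updateCalls: Array<{ uri: string; cgId: string }> = [];
console.log(updateCalls.length >= 0); // true even though update never ran

// The tightened form only holds when update ran exactly once with the
// expected arguments.
updateCalls.push({ uri: "urn:example:endpoint:1", cgId: "devnet-test" });
console.log(
  updateCalls.length === 1 && updateCalls[0].cgId === "devnet-test",
); // true
```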

Verified empirically (`node` invocation against the built dist) that the
no-protocol/stored-PLAINTEXT scenario does reach the URI-keyed
`agent.update` call after Bug 1's fix; status 200 with
`verificationStatus: "failed"` is the exact shape the route emits when
the probe is unreachable. The tightened assertions are not aspirational.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ials (verify regression)

Codex Bug B3: an operator who registered an SSL endpoint via the system
trust store (no inline `ca`/`cert`/`key` PEM material — relies on the
platform's default CA bundle) could register fine — `shouldProbe` accepts
bare SSL — but re-verify of the same endpoint with no body creds 400'd
with "Re-verify requires credentials...". Inconsistent with register's
acceptance, and a UX regression: bare SSL is meaningful to verify (TLS
connect + server-cert validation against the system CA, same "broker
reachable + serving valid cert" semantics PLAINTEXT has for plain
reachability).

Fix: extend `hasAnyKafkaCredentials`'s explicit-protocol branch to accept
SSL the same way it accepts PLAINTEXT. SASL_PLAINTEXT/SASL_SSL stay on
the negative side — those require an explicit sasl block.

After this change, `shouldProbe` and `hasAnyKafkaCredentials` agree on
the bare-protocol case for both PLAINTEXT and SSL. SASL protocols
require credentials in both gates (shouldProbe checks `body.sasl?.username
&& body.sasl?.password`; hasAnyKafkaCredentials checks the same).
`validateKafkaAuthConsistency` was already permissive on SSL-without-ssl-block
(only `requiresSasl` is checked), so no consistency-validator change
needed.
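The widened predicate can be sketched like this; the body and the fallback branch are assumptions reconstructed from this description, not the actual parser source:

```typescript
type KafkaVerifyBody = {
  securityProtocol?: "PLAINTEXT" | "SSL" | "SASL_PLAINTEXT" | "SASL_SSL";
  sasl?: { username?: string; password?: string };
  ssl?: { ca?: string; cert?: string; key?: string };
};

function hasAnyKafkaCredentials(body: KafkaVerifyBody): boolean {
  switch (body.securityProtocol) {
    case "PLAINTEXT":
    case "SSL":
      // Bug B3 fix: bare SSL is verifiable against the system trust
      // store, same as bare PLAINTEXT is for plain reachability.
      return true;
    case "SASL_PLAINTEXT":
    case "SASL_SSL":
      // SASL protocols stay on the negative side without an explicit
      // sasl block, matching shouldProbe's gate.
      return Boolean(body.sasl?.username && body.sasl?.password);
    default:
      // No explicit protocol: fall back to any inline material (assumed).
      return Boolean(body.sasl || body.ssl);
  }
}

console.log(hasAnyKafkaCredentials({ securityProtocol: "SSL" })); // true
console.log(hasAnyKafkaCredentials({ securityProtocol: "SASL_SSL" })); // false
```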

Tests:
- Parser unit (`packages/cli/test/kafka-route-parsers.test.ts`): the
  existing test that pinned the broken behaviour ("false when only an
  empty securityProtocol of SSL is set") is flipped — now expects `true`,
  with comment explaining Bug B3. Two negative-side tests added pinning
  that SASL_PLAINTEXT and SASL_SSL still return `false` on bare protocol
  (so the fix can't accidentally widen).
- Route-level (`packages/cli/test/kafka-route-verify.test.ts`): new
  scenario — stored SSL endpoint + URI-only body → reaches probe →
  reaches `agent.update` → 200. Same shape as the existing PLAINTEXT
  defaulting test (B2's tightening: `expect(updateCalls).toHaveLength(1)`,
  uri/cgId pin, response shape pin including `verificationStatus: 'failed'`
  and `probe.status` matches `failed|unreachable`).

Sibling weak-assertion sweep on `kafka-route-verify.test.ts`: only the
two B2 comments referencing the prior tautologies remain — no remaining
`not.toBe(...)` or `toBeGreaterThanOrEqual(0)` assertions to tighten.

Coverage holds at 100/98.61/100/100 (above 98 ratchet).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>