Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 86 additions & 1 deletion ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,29 @@ class DaemonHTTPAPI {
class DKGAgent {
+writeWorkingMemory()
+promoteSharedMemory()
+encodeWorkspaceGossipMessage()
+publishWorkspaceGossip()
}
class AgentKeyStore {
+selectDefaultOrFallbackSigner()
+selectAllowedSigner(contextGraphId)
}
class AgentGateMetadata {
+DKG_ALLOWED_AGENT
+DKG_PARTICIPANT_AGENT
}
class GossipEnvelope {
+version
+type
+contextGraphId
+agentAddress
+timestamp
+signature
+payload
}
class SharedMemoryHandler {
+handle(data, from)
+verifyAgentEnvelope()
}
class AsyncPublisher {
+enqueue()
Expand All @@ -75,10 +98,72 @@ DaemonHTTPAPI --> DKGAgent : delegates memory writes
DaemonHTTPAPI --> AsyncPublisher : delegates lift jobs
DKGAgent --> WorkingMemory : owns
DKGAgent --> SharedWorkingMemory : gossips
DKGAgent --> AgentKeyStore : selects local signing agent
DKGAgent --> AgentGateMetadata : reads agent gates
DKGAgent --> GossipEnvelope : wraps signed SWM gossip
GossipEnvelope --> SharedMemoryHandler : delivered on SWM topic
SharedMemoryHandler --> AgentGateMetadata : authorizes gated writers
SharedMemoryHandler --> SharedWorkingMemory : stores accepted writes
AsyncPublisher --> SharedWorkingMemory : reads source data
AsyncPublisher --> VerifiedMemory : publishes
```

## Shared Memory Gossip Authentication

Shared Working Memory gossip is authenticated at the agent layer when a local
agent private key is available. For non-agent-gated context graphs, the sender
prefers the configured default agent key and falls back to another local signing
agent; if no local signing key exists, the legacy raw SWM payload remains valid.

For agent-gated context graphs, `DKG_ALLOWED_AGENT` and
`DKG_PARTICIPANT_AGENT` metadata define the accepted writer set. Outgoing SWM
gossip must be signed by one of those local agents, otherwise the write is not
broadcast. Receivers accept legacy raw SWM only when the graph is not
agent-gated. For gated graphs, `SharedMemoryHandler` requires a current signed
`GossipEnvelope`, verifies the claimed agent address against the recovered
signature, checks that the envelope context graph matches the payload, and
rejects writers outside the allowed or participant agent set.

```mermaid
sequenceDiagram
actor Writer as Local agent process
participant Agent as DKGAgent
participant Keys as AgentKeyStore
participant Meta as ContextGraphMeta
participant Gossip as GossipSub
participant Handler as SharedMemoryHandler
participant SWM as SharedWorkingMemory

Writer->>Agent: share or promote SWM write
Agent->>Meta: read DKG_ALLOWED_AGENT and DKG_PARTICIPANT_AGENT
alt context graph is agent-gated
Agent->>Keys: select local allowed signing key
alt no allowed private key
Agent-->>Writer: abort SWM gossip
else allowed private key exists
Agent->>Agent: encode signed GossipEnvelope
Agent->>Gossip: publish signed envelope
end
else context graph is not agent-gated
Agent->>Keys: select default or fallback local signing key
alt signing key exists
Agent->>Agent: encode signed GossipEnvelope
Agent->>Gossip: publish signed envelope
else no signing key
Agent->>Gossip: publish legacy raw SWM payload
end
end
Gossip->>Handler: deliver SWM topic message
Handler->>Meta: read accepted agent writers
alt receiver graph is agent-gated
Handler->>Handler: require envelope and verify signature, timestamp, and writer
Handler->>SWM: store accepted write
else receiver graph is not agent-gated
Handler->>Handler: decode envelope or legacy raw payload
Handler->>SWM: store accepted write
end
```

## Source Worker Workflow

Source-worker configuration is sensitive operator material. It contains the
Expand Down Expand Up @@ -157,7 +242,7 @@ participant Architecture as ARCHITECTURE.md
participant Git as LocalGit

User->>Workflow: continue from failure checkpoint
Workflow->>Implementer: implement focused source-worker fix
Workflow->>Implementer: implement focused code change
Implementer-->>Workflow: code and tests changed
Workflow->>Validation: run focused validation and code review
Validation-->>Workflow: passed
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ create assertion ──► write triples ──► promote ──► publish ─

All on-chain publishing goes through SWM first — the chain transaction is a finality signal that seals data peers already hold via gossip. Assertions themselves carry a durable lifecycle record (`created → promoted → published → finalized`, or `discarded`) in the context graph's `_meta` graph, so their history is auditable independently of the data.

SWM gossip is signed when the node has a local agent private key. Context graphs
that declare `DKG_ALLOWED_AGENT` or `DKG_PARTICIPANT_AGENT` require a signed
`GossipEnvelope` from one of those agent addresses; unsigned legacy SWM payloads
are accepted only for context graphs without agent gates.

---

## Quick Start
Expand Down
3 changes: 2 additions & 1 deletion packages/agent/README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# @origintrail-official/dkg-agent

Agent runtime for DKG V9. Provides the `DKGAgent` class — the primary entry point for building agents that participate in the decentralized knowledge network.
Agent runtime for DKG V10. Provides the `DKGAgent` class — the primary entry point for building agents that participate in the decentralized knowledge network.

## Features

- **DKGAgent** — unified agent class that wires together a DKG node, storage, publishing, querying, and chain interaction
- **Wallet management** — `DKGAgentWallet` for Ed25519 (P2P identity) and ECDSA (on-chain signing) key pairs, with persistent key storage and operational wallet support
- **Agent profiles** — `ProfileManager` for publishing and updating agent skill profiles to the agent registry paranet
- **Discovery** — `DiscoveryClient` for finding other agents by name, skill keywords, or semantic search over published profiles
- **Signed shared memory gossip** — SWM writes are wrapped in a signed gossip envelope when a local agent key is available, and agent-gated context graphs require a local `DKG_ALLOWED_AGENT` or `DKG_PARTICIPANT_AGENT` signing key before broadcasting
- **Encrypted messaging** — Ed25519-to-X25519 key conversion, ECDH shared secrets, and encrypted P2P message channels
- **Skill invocation** — `MessageHandler` for receiving and responding to skill requests; `SkillHandler` and `ChatHandler` for registering custom capabilities

Expand Down
144 changes: 124 additions & 20 deletions packages/agent/src/dkg-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
computeACKDigest,
encodePublishRequest,
encodeKAUpdateRequest,
encodeGossipEnvelope,
computeGossipSigningPayload,
GOSSIP_ENVELOPE_VERSION,
GOSSIP_TYPE_WORKSPACE_PUBLISH,
encodeFinalizationMessage, type FinalizationMessageMsg,
getGenesisQuads, computeNetworkId, SYSTEM_PARANETS, DKG_ONTOLOGY,
Logger, createOperationContext, sparqlString, escapeSparqlLiteral,
Expand Down Expand Up @@ -2861,6 +2865,121 @@
return this._publish(contextGraphId, input as Quad[], undefined, thirdArg ?? fourthArg);
}

private getWorkspaceGossipSigningAgent(): (AgentKeyRecord & { privateKey: string }) | null {
const defaultAddress = this.defaultAgentAddress?.toLowerCase();
let fallback: (AgentKeyRecord & { privateKey: string }) | null = null;
for (const record of this.localAgents.values()) {
if (!record.privateKey) continue;
const signingRecord = { ...record, privateKey: record.privateKey };
if (defaultAddress && record.agentAddress.toLowerCase() === defaultAddress) {
return signingRecord;
}
fallback ??= signingRecord;
}
return fallback;
}

private async getContextGraphAgentGateAddresses(contextGraphId: string): Promise<string[] | null> {
const seen = new Set<string>();
const agents: string[] = [];
let sawAgentGate = false;
const add = (value: string | undefined) => {
if (!value || !ethers.isAddress(value)) return;
const checksum = ethers.getAddress(value);
const key = checksum.toLowerCase();
if (seen.has(key)) return;
seen.add(key);
agents.push(checksum);
};

const subscriptionAgents = this.subscribedContextGraphs.get(contextGraphId)?.participantAgents ?? [];
if (subscriptionAgents.length > 0) sawAgentGate = true;
for (const agentAddress of subscriptionAgents) {
add(agentAddress);
}

const contextGraphUri = paranetDataGraphUri(contextGraphId);
const cgMetaGraph = paranetMetaGraphUri(contextGraphId);
const result = await this.store.query(
`SELECT ?agent WHERE {
GRAPH <${cgMetaGraph}> {
{ <${contextGraphUri}> <${DKG_ONTOLOGY.DKG_ALLOWED_AGENT}> ?agent }
UNION
{ <${contextGraphUri}> <${DKG_ONTOLOGY.DKG_PARTICIPANT_AGENT}> ?agent }
}
}`,
);
if (result.type === 'bindings') {
if (result.bindings.length > 0) sawAgentGate = true;
for (const row of result.bindings) {
const raw = row['agent'];
if (typeof raw === 'string') {
add(raw.replace(/^"/, '').replace(/"(@[a-zA-Z-]+|\^\^<[^>]+>)?$/, ''));
}
}
}

return sawAgentGate ? agents : null;
}

private async resolveWorkspaceGossipSigningAgent(
contextGraphId: string,
): Promise<(AgentKeyRecord & { privateKey: string }) | null> {
const allowedAgents = await this.getContextGraphAgentGateAddresses(contextGraphId);
if (!allowedAgents) {
return this.getWorkspaceGossipSigningAgent();
}

const allowedSet = new Set(allowedAgents.map((agent) => agent.toLowerCase()));
for (const record of this.localAgents.values()) {
if (record.privateKey && allowedSet.has(record.agentAddress.toLowerCase())) {
return { ...record, privateKey: record.privateKey };
}
}

throw new Error(`Cannot gossip SWM write for agent-gated context graph "${contextGraphId}": no local allowed signing agent key`);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: throwing here turns a missing local signer into a hard failure after the local SWM write has already been committed. publishAsync(), share(), and conditionalShare() all call writeToWorkspace() first, so callers can get an error even though local state and async-lift jobs were already created. Either decide this before mutating local state, or downgrade it to “skip broadcast and keep the local write” so the API result matches what actually happened.

}

private async encodeWorkspaceGossipMessage(contextGraphId: string, message: Uint8Array): Promise<Uint8Array> {
const signer = await this.resolveWorkspaceGossipSigningAgent(contextGraphId);
if (!signer) {
return message;
}

const timestamp = new Date().toISOString();
const payload = new Uint8Array(message);
const signingPayload = computeGossipSigningPayload(
GOSSIP_TYPE_WORKSPACE_PUBLISH,
contextGraphId,
timestamp,
payload,
);
const signature = await new ethers.Wallet(signer.privateKey).signMessage(signingPayload);
return encodeGossipEnvelope({
version: GOSSIP_ENVELOPE_VERSION,
type: GOSSIP_TYPE_WORKSPACE_PUBLISH,
contextGraphId,
agentAddress: signer.agentAddress,
timestamp,
signature: ethers.getBytes(signature),
payload,
});
}

private async publishWorkspaceGossip(
contextGraphId: string,
message: Uint8Array,
ctx: OperationContext,
): Promise<void> {
const topic = paranetWorkspaceTopic(contextGraphId);
const wireMessage = await this.encodeWorkspaceGossipMessage(contextGraphId, message);
try {
await this.gossip.publish(topic, wireMessage);
} catch {
this.log.warn(ctx, `No peers subscribed to ${topic} yet`);
}
}

async publishAsync(
contextGraphIdOrUal: string,
content: PublishAsyncContent,
Expand Down Expand Up @@ -2938,12 +3057,7 @@
});

if (!opts?.localOnly) {
const topic = paranetWorkspaceTopic(contextGraphId);
try {
await this.gossip.publish(topic, message);
} catch {
this.log.warn(ctx, `No peers subscribed to ${topic} yet`);
}
await this.publishWorkspaceGossip(contextGraphId, message, ctx);
}

return { captureID };
Expand Down Expand Up @@ -3063,12 +3177,7 @@
subGraphName: opts?.subGraphName,
});
if (!opts?.localOnly) {
const topic = paranetWorkspaceTopic(contextGraphId);
try {
await this.gossip.publish(topic, message);
} catch {
this.log.warn(ctx, `No peers subscribed to ${topic} yet`);
}
await this.publishWorkspaceGossip(contextGraphId, message, ctx);
}
return { shareOperationId };
}
Expand All @@ -3094,12 +3203,7 @@
subGraphName: opts?.subGraphName,
});
if (!opts?.localOnly) {
const topic = paranetWorkspaceTopic(contextGraphId);
try {
await this.gossip.publish(topic, message);
} catch {
this.log.warn(ctx, `No peers subscribed to ${topic} yet`);
}
await this.publishWorkspaceGossip(contextGraphId, message, ctx);
}
return { shareOperationId };
}
Expand Down Expand Up @@ -3767,6 +3871,7 @@
this.sharedMemoryHandler = new SharedMemoryHandler(this.store, this.eventBus, {
sharedMemoryOwnedEntities: this.workspaceOwnedEntities,
writeLocks: this.writeLocks,
localAgentAddresses: () => [...this.localAgents.keys()],
});
}
return this.sharedMemoryHandler;
Expand Down Expand Up @@ -6893,7 +6998,7 @@
}

get peerId(): string {
return this.node.peerId;

Check failure on line 7001 in packages/agent/src/dkg-agent.ts

View workflow job for this annotation

GitHub Actions / Tornado: agent [3/10]

test/e2e-flows.test.ts > Query safety (SPARQL guard) > allows queries with PREFIX declarations

Error: DKGNode not started ❯ DKGNode.requireNode ../core/src/node.ts:501:27 ❯ DKGNode.get peerId [as peerId] ../core/src/node.ts:479:17 ❯ DKGAgent.get peerId [as peerId] src/dkg-agent.ts:7001:22 ❯ DKGAgent.query src/dkg-agent.ts:3525:27 ❯ test/e2e-flows.test.ts:459:28

Check failure on line 7001 in packages/agent/src/dkg-agent.ts

View workflow job for this annotation

GitHub Actions / Tornado: agent [3/10]

test/e2e-flows.test.ts > Query safety (SPARQL guard) > allows DESCRIBE queries

Error: DKGNode not started ❯ DKGNode.requireNode ../core/src/node.ts:501:27 ❯ DKGNode.get peerId [as peerId] ../core/src/node.ts:479:17 ❯ DKGAgent.get peerId [as peerId] src/dkg-agent.ts:7001:22 ❯ DKGAgent.query src/dkg-agent.ts:3525:27 ❯ test/e2e-flows.test.ts:446:28

Check failure on line 7001 in packages/agent/src/dkg-agent.ts

View workflow job for this annotation

GitHub Actions / Tornado: agent [3/10]

test/e2e-flows.test.ts > Query safety (SPARQL guard) > allows ASK queries

Error: DKGNode not started ❯ DKGNode.requireNode ../core/src/node.ts:501:27 ❯ DKGNode.get peerId [as peerId] ../core/src/node.ts:479:17 ❯ DKGAgent.get peerId [as peerId] src/dkg-agent.ts:7001:22 ❯ DKGAgent.query src/dkg-agent.ts:3525:27 ❯ test/e2e-flows.test.ts:433:28

Check failure on line 7001 in packages/agent/src/dkg-agent.ts

View workflow job for this annotation

GitHub Actions / Tornado: agent [3/10]

test/e2e-flows.test.ts > Query safety (SPARQL guard) > allows CONSTRUCT queries

Error: DKGNode not started ❯ DKGNode.requireNode ../core/src/node.ts:501:27 ❯ DKGNode.get peerId [as peerId] ../core/src/node.ts:479:17 ❯ DKGAgent.get peerId [as peerId] src/dkg-agent.ts:7001:22 ❯ DKGAgent.query src/dkg-agent.ts:3525:27 ❯ test/e2e-flows.test.ts:420:28

Check failure on line 7001 in packages/agent/src/dkg-agent.ts

View workflow job for this annotation

GitHub Actions / Tornado: agent [3/10]

test/e2e-flows.test.ts > Query safety (SPARQL guard) > allows SELECT queries

Error: DKGNode not started ❯ DKGNode.requireNode ../core/src/node.ts:501:27 ❯ DKGNode.get peerId [as peerId] ../core/src/node.ts:479:17 ❯ DKGAgent.get peerId [as peerId] src/dkg-agent.ts:7001:22 ❯ DKGAgent.query src/dkg-agent.ts:3525:27 ❯ test/e2e-flows.test.ts:407:28
}

get nodeName(): string {
Expand Down Expand Up @@ -7929,9 +8034,8 @@
{ ...opts, publisherPeerId: agent.node.peerId.toString() },
);
if (gossipMessage) {
const topic = paranetWorkspaceTopic(contextGraphId);
try {
await agent.gossip.publish(topic, gossipMessage);
await agent.publishWorkspaceGossip(contextGraphId, gossipMessage, createOperationContext('share'));
} catch (err: any) {
agent.log.warn(createOperationContext('share'), `Promote gossip failed (local SWM committed): ${err?.message ?? err}`);
}
Expand Down
44 changes: 20 additions & 24 deletions packages/agent/test/agent-audit-extra.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
decodeWorkspacePublishRequest,
encodeFinalizationMessage,
decodeGossipEnvelope,
computeGossipSigningPayload,
GOSSIP_ENVELOPE_VERSION,
GOSSIP_TYPE_WORKSPACE_PUBLISH,
} from '@origintrail-official/dkg-core';
import {
SharedMemoryHandler,
Expand Down Expand Up @@ -462,12 +465,12 @@
const m = body.match(/did:dkg:agent:(?!0x|\$\{)[A-Za-z0-9]+/g);
if (m) offenders.push(`${f}: ${m.slice(0, 3).join(', ')}`);
}
expect(offenders).toEqual([]);

Check failure on line 468 in packages/agent/test/agent-audit-extra.test.ts

View workflow job for this annotation

GitHub Actions / Tornado: agent [10/10]

test/agent-audit-extra.test.ts > [A-12] DID format drift in agent.endorse > PROD-BUG: agent test fixtures hard-code non-spec did:dkg:agent: URIs (drift scan)

AssertionError: expected [ Array(1) ] to deeply equal [] - Expected + Received - [] + [ + "agent.test.ts: did:dkg:agent:12D3KooWForeignCreatorPeer111111111111111111111111", + ] ❯ test/agent-audit-extra.test.ts:468:23
});
});

describe('[A-15] Publisher signs every gossip message (SWM share)', () => {
it('PROD-BUG: DKGAgent.share emits raw WorkspacePublishRequest bytes — NOT wrapped in a signed GossipEnvelope', async () => {
it('DKGAgent.share emits a signed GossipEnvelope that carries the WorkspacePublishRequest', async () => {
const agent = await makeAgent('A15-Share');

// Intercept libp2p pubsub publish to capture the raw wire bytes without
Expand All @@ -490,30 +493,23 @@
const shareMsg = captured.find(c => c.topic.includes('shared-memory'));
expect(shareMsg, `expected a shared-memory gossip publish; saw: ${captured.map(c => c.topic).join(', ')}`).toBeTruthy();

// ① The bytes successfully decode as WorkspacePublishRequest (raw payload).
const decoded = decodeWorkspacePublishRequest(shareMsg!.data);
const envelope = decodeGossipEnvelope(shareMsg!.data);
expect(envelope.version).toBe(GOSSIP_ENVELOPE_VERSION);
expect(envelope.type).toBe(GOSSIP_TYPE_WORKSPACE_PUBLISH);
expect(envelope.contextGraphId).toBe(CG);
expect(envelope.signature.length).toBeGreaterThan(0);

const signingPayload = computeGossipSigningPayload(
envelope.type,
envelope.contextGraphId,
envelope.timestamp,
envelope.payload,
);
const recovered = ethers.verifyMessage(signingPayload, ethers.hexlify(envelope.signature));
expect(recovered.toLowerCase()).toBe(envelope.agentAddress.toLowerCase());

const decoded = decodeWorkspacePublishRequest(envelope.payload);
expect(decoded.paranetId).toBe(CG);
expect(decoded.publisherPeerId).toBe(agent.peerId);

// ② When decoded as a GossipEnvelope (spec — §GossipEnvelopeSchema),
// the signature field is EMPTY. Protobuf decode will not throw
// because the wire types happen to align, but `signature.length`
// is zero, proving nothing was signed.
let envelopeView: any = undefined;
try {
envelopeView = decodeGossipEnvelope(shareMsg!.data);
} catch {
// Some permutations of wire layout will throw — that is ALSO a pass
// for this assertion: if it doesn't even parse as a GossipEnvelope,
// then it certainly isn't a signed GossipEnvelope.
}
if (envelopeView) {
const sig: Uint8Array | undefined = envelopeView.signature;
const sigLen = sig ? sig.length : 0;
// PROD-BUG (audit A-15): V10 requires every gossip message to ride
// inside a signed envelope. The WM share path bypasses the envelope
// entirely, so there is no signature to verify.
expect(sigLen).toBe(0);
}
}, 20_000);
});
Loading
Loading