Skip to content

Commit 40f5bfb

Browse files
authored
Add operator dispatch CLI
Add agent-term-dispatch CLI for dispatching one event through the AgentTerm pipeline. This wires local/default Matrix posture, Agent Registry, Policy Fabric, registered participants, plane adapters, EventStore persistence, and optional snapshot rendering through a tested operator command.
1 parent a25d8b2 commit 40f5bfb

4 files changed

Lines changed: 384 additions & 4 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ matrix = [
5050

5151
[project.scripts]
5252
agent-term = "agent_term.cli:main"
53+
agent-term-dispatch = "agent_term.dispatch_cli:main"
5354
agent-term-snapshot = "agent_term.snapshot_cli:main"
5455

5556
[tool.setuptools.packages.find]

src/agent_term/dispatch_cli.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
"""CLI entry point for dispatching an event through the operator pipeline."""
2+
3+
from __future__ import annotations
4+
5+
import argparse
6+
import json
7+
import sys
8+
from pathlib import Path
9+
10+
from agent_term.agent_registry import AgentRegistration, AgentRegistryAdapter
11+
from agent_term.agent_registry import InMemoryAgentRegistryBackend, ToolGrant
12+
from agent_term.agentplane import AgentPlaneAdapter, InMemoryAgentPlaneBackend
13+
from agent_term.cloudshell_fog import CloudShellFogAdapter, InMemoryCloudShellFogBackend
14+
from agent_term.events import AgentTermEvent
15+
from agent_term.knowledge import (
16+
HolmesAdapter,
17+
InMemoryHolmesBackend,
18+
InMemoryMemoryMeshBackend,
19+
InMemoryMeshRushBackend,
20+
InMemoryNewHopeBackend,
21+
InMemorySherlockSearchBackend,
22+
InMemorySlashTopicsBackend,
23+
MemoryMeshAdapter,
24+
MeshRushAdapter,
25+
NewHopeAdapter,
26+
SherlockSearchAdapter,
27+
SlashTopicsAdapter,
28+
)
29+
from agent_term.matrix_adapter import MatrixAdapter
30+
from agent_term.participants import InMemoryParticipantBackend, RegisteredParticipantAdapter
31+
from agent_term.pipeline import OperatorDispatchPipeline
32+
from agent_term.policy_fabric import (
33+
ALLOW,
34+
DENY,
35+
PENDING,
36+
InMemoryPolicyFabricBackend,
37+
PolicyDecision,
38+
PolicyFabricAdapter,
39+
)
40+
from agent_term.policy_fabric import action_for_event
41+
from agent_term.store import DEFAULT_DB_PATH, EventStore
42+
from agent_term.workspace import (
43+
InMemoryProphetWorkspaceBackend,
44+
InMemorySociosphereBackend,
45+
ProphetWorkspaceAdapter,
46+
SociosphereAdapter,
47+
)
48+
49+
50+
def build_parser() -> argparse.ArgumentParser:
51+
parser = argparse.ArgumentParser(
52+
prog="agent-term-dispatch",
53+
description="Dispatch one AgentTerm event through Matrix, Agent Registry, Policy Fabric, adapters, EventStore, and snapshot generation.",
54+
)
55+
parser.add_argument("source", help="Event source/adapter key, e.g. memory-mesh, codex, matrix.")
56+
parser.add_argument("kind", help="Event kind, e.g. memory_recall, agent_message, context_pack.")
57+
parser.add_argument("channel", help="Logical channel or Matrix room alias/ID.")
58+
parser.add_argument("body", help="Event body.")
59+
parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Path to local AgentTerm SQLite event log.")
60+
parser.add_argument("--sender", default="@operator")
61+
parser.add_argument("--thread-id")
62+
parser.add_argument("--metadata-json", default="{}")
63+
parser.add_argument("--agent-id", help="Agent Registry ID to include on the event and pre-register locally.")
64+
parser.add_argument("--register-agent", action="append", default=[], help="Register an agent ID in the local in-memory registry. Repeatable.")
65+
parser.add_argument("--grant", action="append", default=[], help="Grant in form agent_id:tool[:grant_id]. Repeatable.")
66+
parser.add_argument("--tool", help="Tool name requested by this event.")
67+
parser.add_argument("--allow-policy", action="append", default=[], help="Allow policy action. Repeatable.")
68+
parser.add_argument("--deny-policy", action="append", default=[], help="Deny policy action. Repeatable.")
69+
parser.add_argument("--pending-policy", action="append", default=[], help="Pending policy action. Repeatable.")
70+
parser.add_argument("--policy-action", help="Explicit policy action for this event.")
71+
parser.add_argument("--policy-ref", default="local://policy-fabric/dispatch-cli")
72+
parser.add_argument("--sensitive-context", action="store_true")
73+
parser.add_argument("--matrix-encrypted", action="store_true")
74+
parser.add_argument("--matrix-verified", action="store_true")
75+
parser.add_argument("--show-snapshot", action="store_true", help="Print the generated operator snapshot after dispatch.")
76+
return parser
77+
78+
79+
def parse_metadata(metadata_json: str) -> dict[str, object]:
80+
try:
81+
value = json.loads(metadata_json)
82+
except json.JSONDecodeError as exc:
83+
raise SystemExit(f"metadata must be valid JSON: {exc}") from exc
84+
if not isinstance(value, dict):
85+
raise SystemExit("metadata must decode to a JSON object")
86+
return value
87+
88+
89+
def build_event(args: argparse.Namespace) -> AgentTermEvent:
90+
metadata = parse_metadata(args.metadata_json)
91+
if args.agent_id:
92+
metadata["agent_id"] = args.agent_id
93+
if args.tool:
94+
metadata["tool"] = args.tool
95+
if args.policy_action:
96+
metadata["policy_action"] = args.policy_action
97+
if args.sensitive_context:
98+
metadata["sensitive_context"] = True
99+
if args.matrix_encrypted:
100+
metadata["matrix_encrypted"] = True
101+
metadata["matrix_e2ee_verified"] = bool(args.matrix_verified)
102+
103+
return AgentTermEvent(
104+
channel=args.channel,
105+
sender=args.sender,
106+
kind=args.kind,
107+
source=args.source,
108+
body=args.body,
109+
thread_id=args.thread_id,
110+
metadata=metadata,
111+
)
112+
113+
114+
def build_registry_backend(args: argparse.Namespace) -> InMemoryAgentRegistryBackend:
115+
agent_ids = set(args.register_agent)
116+
if args.agent_id:
117+
agent_ids.add(args.agent_id)
118+
119+
agents = [
120+
AgentRegistration(
121+
agent_id=agent_id,
122+
registry_ref=f"local://agent-registry/{agent_id}",
123+
spec_version="local-dev",
124+
session_id=f"session-{agent_id.replace('.', '-')}",
125+
)
126+
for agent_id in sorted(agent_ids)
127+
]
128+
grants = [_parse_grant(raw) for raw in args.grant]
129+
return InMemoryAgentRegistryBackend(agents=agents, grants=grants)
130+
131+
132+
def _parse_grant(raw: str) -> ToolGrant:
133+
parts = raw.split(":")
134+
if len(parts) not in {2, 3}:
135+
raise SystemExit("--grant must use form agent_id:tool[:grant_id]")
136+
agent_id, tool = parts[0], parts[1]
137+
grant_id = parts[2] if len(parts) == 3 else f"grant.{agent_id}.{tool}"
138+
return ToolGrant(grant_id=grant_id, agent_id=agent_id, tool=tool)
139+
140+
141+
def build_policy_backend(args: argparse.Namespace, event: AgentTermEvent) -> InMemoryPolicyFabricBackend:
142+
decisions: list[PolicyDecision] = []
143+
for action in args.allow_policy:
144+
decisions.append(_decision(action, ALLOW, args.policy_ref))
145+
for action in args.deny_policy:
146+
decisions.append(_decision(action, DENY, args.policy_ref, reason="denied by dispatch CLI"))
147+
for action in args.pending_policy:
148+
decisions.append(_decision(action, PENDING, args.policy_ref))
149+
150+
if args.policy_action and args.policy_action not in {decision.action for decision in decisions}:
151+
decisions.append(_decision(args.policy_action, ALLOW, args.policy_ref))
152+
elif args.sensitive_context and not decisions:
153+
decisions.append(_decision(action_for_event(event), ALLOW, args.policy_ref))
154+
155+
return InMemoryPolicyFabricBackend(decisions)
156+
157+
158+
def _decision(action: str, status: str, policy_ref: str, reason: str | None = None) -> PolicyDecision:
159+
return PolicyDecision(
160+
decision_id=f"decision.{status}.{action}",
161+
action=action,
162+
status=status,
163+
policy_ref=policy_ref,
164+
reason=reason,
165+
)
166+
167+
168+
def build_pipeline(args: argparse.Namespace, event: AgentTermEvent, store: EventStore) -> OperatorDispatchPipeline:
169+
registry_backend = build_registry_backend(args)
170+
policy_backend = build_policy_backend(args, event)
171+
participant_backend = InMemoryParticipantBackend()
172+
173+
adapters = (
174+
MatrixAdapter(),
175+
CloudShellFogAdapter(InMemoryCloudShellFogBackend()),
176+
AgentPlaneAdapter(InMemoryAgentPlaneBackend()),
177+
SociosphereAdapter(InMemorySociosphereBackend()),
178+
ProphetWorkspaceAdapter(InMemoryProphetWorkspaceBackend()),
179+
SlashTopicsAdapter(InMemorySlashTopicsBackend()),
180+
MemoryMeshAdapter(InMemoryMemoryMeshBackend()),
181+
NewHopeAdapter(InMemoryNewHopeBackend()),
182+
SherlockSearchAdapter(InMemorySherlockSearchBackend()),
183+
HolmesAdapter(InMemoryHolmesBackend()),
184+
MeshRushAdapter(InMemoryMeshRushBackend()),
185+
RegisteredParticipantAdapter(registry_backend, policy_backend, participant_backend),
186+
)
187+
188+
return OperatorDispatchPipeline(
189+
store=store,
190+
matrix_adapter=MatrixAdapter(),
191+
agent_registry_adapter=AgentRegistryAdapter(registry_backend),
192+
policy_fabric_adapter=PolicyFabricAdapter(policy_backend),
193+
adapters=adapters,
194+
)
195+
196+
197+
def main(argv: list[str] | None = None) -> int:
198+
args = build_parser().parse_args(argv)
199+
event = build_event(args)
200+
store = EventStore(Path(args.db))
201+
try:
202+
outcome = build_pipeline(args, event, store).dispatch(event)
203+
status = "ok" if outcome.ok else "blocked"
204+
print(f"dispatch_status={status}")
205+
if outcome.adapter_key:
206+
print(f"adapter={outcome.adapter_key}")
207+
if outcome.blocked_reason:
208+
print(f"blocked_reason={outcome.blocked_reason}")
209+
print(f"persisted_events={len(outcome.persisted_events)}")
210+
print(f"input_event_id={outcome.input_event.event_id}")
211+
if args.show_snapshot:
212+
print(outcome.snapshot.render_text())
213+
return 0 if outcome.ok else 1
214+
finally:
215+
store.close()
216+
217+
218+
if __name__ == "__main__":
219+
raise SystemExit(main(sys.argv[1:]))

src/agent_term/pipeline.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def __init__(
6666

6767
def dispatch(self, event: AgentTermEvent) -> DispatchOutcome:
6868
persisted: list[AgentTermEvent] = [self.store.append(event)]
69+
dispatch_event = event
6970

7071
matrix_event = self._matrix_gate(event)
7172
if matrix_event is not None:
@@ -84,23 +85,24 @@ def dispatch(self, event: AgentTermEvent) -> DispatchOutcome:
8485
persisted.append(self.store.append(policy_event))
8586
if _is_blocked(policy_event):
8687
return self._outcome(False, event, persisted, _deny_reason(policy_event))
88+
dispatch_event = _event_with_policy_decision(event, policy_event)
8789

88-
adapter = self._select_adapter(event)
90+
adapter = self._select_adapter(dispatch_event)
8991
if adapter is None:
9092
no_adapter = _result_event(
91-
event,
93+
dispatch_event,
9294
AdapterResult(
9395
ok=False,
9496
source="pipeline",
9597
kind="adapter_result",
96-
body=f"No adapter found for {event.source}.{event.kind}",
98+
body=f"No adapter found for {dispatch_event.source}.{dispatch_event.kind}",
9799
metadata={"deny_reason": "no_adapter", "fail_closed": True},
98100
),
99101
)
100102
persisted.append(self.store.append(no_adapter))
101103
return self._outcome(False, event, persisted, "no_adapter")
102104

103-
result_event = _result_event(event, adapter.handle(event))
105+
result_event = _result_event(dispatch_event, adapter.handle(dispatch_event))
104106
persisted.append(self.store.append(result_event))
105107
return self._outcome(
106108
not _is_blocked(result_event),
@@ -226,6 +228,29 @@ def _result_event(request: AgentTermEvent, result: AdapterResult) -> AgentTermEv
226228
return result.to_event(request)
227229

228230

231+
def _event_with_policy_decision(event: AgentTermEvent, policy_event: AgentTermEvent) -> AgentTermEvent:
232+
policy_decision_id = policy_event.metadata.get("policy_decision_id")
233+
if not policy_decision_id:
234+
return event
235+
metadata = {
236+
**event.metadata,
237+
"policy_decision_ref": str(policy_decision_id),
238+
"policy_decision_id": str(policy_decision_id),
239+
"policy_ref": policy_event.metadata.get("policy_ref"),
240+
}
241+
return AgentTermEvent(
242+
channel=event.channel,
243+
sender=event.sender,
244+
kind=event.kind,
245+
source=event.source,
246+
body=event.body,
247+
thread_id=event.thread_id,
248+
metadata=metadata,
249+
event_id=event.event_id,
250+
created_at=event.created_at,
251+
)
252+
253+
229254
def _agent_id(event: AgentTermEvent) -> str | None:
230255
value = (
231256
event.metadata.get("agent_id")

0 commit comments

Comments
 (0)