From 448bb241a1e8115c605e0ce99b09d0ef1753a5fd Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 10:15:12 -0400 Subject: [PATCH 1/2] Add operator dispatch pipeline --- src/agent_term/pipeline.py | 256 +++++++++++++++++++++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100644 src/agent_term/pipeline.py diff --git a/src/agent_term/pipeline.py b/src/agent_term/pipeline.py new file mode 100644 index 0000000..62ab5f2 --- /dev/null +++ b/src/agent_term/pipeline.py @@ -0,0 +1,256 @@ +"""Operator dispatch pipeline. + +The pipeline wires existing AgentTerm scaffolds together without becoming a new +authority plane. It records every input, gate, decision, adapter result, and TUI +snapshot source event in the local event log. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Iterable + +from agent_term.adapters import AdapterResult, AgentTermAdapter +from agent_term.agent_registry import AgentRegistryAdapter +from agent_term.events import AgentTermEvent +from agent_term.matrix_adapter import MatrixAdapter +from agent_term.participants import PARTICIPANT_AGENT_IDS +from agent_term.policy_fabric import PolicyFabricAdapter, requires_admission +from agent_term.store import EventStore +from agent_term.tui_model import TuiSnapshot, TuiSnapshotBuilder + + +@dataclass(frozen=True) +class DispatchOutcome: + """Result of dispatching a single operator event.""" + + ok: bool + input_event: AgentTermEvent + persisted_events: tuple[AgentTermEvent, ...] + snapshot: TuiSnapshot + blocked_reason: str | None = None + adapter_key: str | None = None + metadata: dict[str, object] = field(default_factory=dict) + + +@dataclass(frozen=True) +class DispatchPipelineConfig: + """Pipeline policy switches.""" + + require_matrix_posture_for_sensitive_context: bool = True + require_agent_registry_for_participants: bool = True + require_policy_for_admitted_events: bool = True + + +class OperatorDispatchPipeline: + """Dispatches events through Matrix, Agent Registry, Policy Fabric, and adapters.""" + + def __init__( + self, + *, + store: EventStore, + matrix_adapter: MatrixAdapter | None = None, + agent_registry_adapter: AgentRegistryAdapter | None = None, + policy_fabric_adapter: PolicyFabricAdapter | None = None, + adapters: Iterable[AgentTermAdapter] = (), + snapshot_builder: TuiSnapshotBuilder | None = None, + config: DispatchPipelineConfig | None = None, + ) -> None: + self.store = store + self.matrix_adapter = matrix_adapter + self.agent_registry_adapter = agent_registry_adapter + self.policy_fabric_adapter = policy_fabric_adapter + self.adapters = tuple(adapters) + self.snapshot_builder = snapshot_builder or TuiSnapshotBuilder() + self.config = config or DispatchPipelineConfig() + + def dispatch(self, event: AgentTermEvent) -> DispatchOutcome: + persisted: list[AgentTermEvent] = [self.store.append(event)] + + matrix_event = self._matrix_gate(event) + if matrix_event is not None: + persisted.append(self.store.append(matrix_event)) + if _is_blocked(matrix_event): + return self._outcome(False, event, persisted, "matrix_posture_blocked") + + registry_events = self._agent_registry_gates(event) + for registry_event in registry_events: + persisted.append(self.store.append(registry_event)) + if _is_blocked(registry_event): + return self._outcome(False, event, persisted, _deny_reason(registry_event)) + + policy_event = self._policy_gate(event) + if policy_event is not None: + persisted.append(self.store.append(policy_event)) + if _is_blocked(policy_event): + return self._outcome(False, event, persisted, _deny_reason(policy_event)) + + adapter = self._select_adapter(event) + if adapter is None: + no_adapter = _result_event( + event, + AdapterResult( + ok=False, + source="pipeline", + kind="adapter_result", + body=f"No adapter found for {event.source}.{event.kind}", + metadata={"deny_reason": "no_adapter", "fail_closed": True}, + ), + ) + persisted.append(self.store.append(no_adapter)) + return self._outcome(False, event, persisted, "no_adapter") + + result_event = _result_event(event, adapter.handle(event)) + persisted.append(self.store.append(result_event)) + return self._outcome( + not _is_blocked(result_event), + event, + persisted, + _deny_reason(result_event) if _is_blocked(result_event) else None, + adapter_key=getattr(adapter, "key", None), + ) + + def _matrix_gate(self, event: AgentTermEvent) -> AgentTermEvent | None: + if not self.config.require_matrix_posture_for_sensitive_context: + return None + if self.matrix_adapter is None: + return None + if not event.metadata.get("sensitive_context"): + return None + if event.source == "matrix" and event.kind == "matrix_e2ee_posture_check": + return None + + gate_event = AgentTermEvent( + channel=event.channel, + sender="@agent-term", + kind="matrix_e2ee_posture_check", + source="matrix", + body="Check Matrix E2EE posture before sensitive context release.", + thread_id=event.thread_id, + metadata={ + "request_event_id": event.event_id, + "matrix_room_id": event.metadata.get("matrix_room_id"), + "matrix_room_alias": event.metadata.get("matrix_room_alias"), + "matrix_encrypted": event.metadata.get("matrix_encrypted"), + "matrix_e2ee_verified": event.metadata.get("matrix_e2ee_verified"), + "sensitive_context": True, + }, + ) + return _result_event(gate_event, self.matrix_adapter.handle(gate_event)) + + def _agent_registry_gates(self, event: AgentTermEvent) -> tuple[AgentTermEvent, ...]: + if not self.config.require_agent_registry_for_participants: + return () + if self.agent_registry_adapter is None: + return () + + agent_id = _agent_id(event) + if not agent_id: + return () + + identity_request = AgentTermEvent( + channel=event.channel, + sender="@agent-term", + kind="agent_identity", + source="agent-registry", + body=f"Resolve participant identity: {agent_id}", + thread_id=event.thread_id, + metadata={"request_event_id": event.event_id, "agent_id": agent_id}, + ) + identity_event = _result_event( + identity_request, + self.agent_registry_adapter.handle(identity_request), + ) + if _is_blocked(identity_event): + return (identity_event,) + + events = [identity_event] + tool = event.metadata.get("tool") or event.metadata.get("tool_name") + if tool: + grant_request = AgentTermEvent( + channel=event.channel, + sender="@agent-term", + kind="tool_grant", + source="agent-registry", + body=f"Resolve participant tool grant: {agent_id} -> {tool}", + thread_id=event.thread_id, + metadata={ + "request_event_id": event.event_id, + "agent_id": agent_id, + "tool": tool, + }, + ) + events.append( + _result_event(grant_request, self.agent_registry_adapter.handle(grant_request)) + ) + return tuple(events) + + def _policy_gate(self, event: AgentTermEvent) -> AgentTermEvent | None: + if not self.config.require_policy_for_admitted_events: + return None + if self.policy_fabric_adapter is None: + return None + if not requires_admission(event): + return None + if event.source == "policy-fabric": + return None + return _result_event(event, self.policy_fabric_adapter.handle(event)) + + def _select_adapter(self, event: AgentTermEvent) -> AgentTermAdapter | None: + for adapter in self.adapters: + if adapter.supports(event): + return adapter + return None + + def _outcome( + self, + ok: bool, + input_event: AgentTermEvent, + persisted: list[AgentTermEvent], + blocked_reason: str | None, + *, + adapter_key: str | None = None, + ) -> DispatchOutcome: + snapshot = self.snapshot_builder.build(self.store.tail(limit=500)) + return DispatchOutcome( + ok=ok, + input_event=input_event, + persisted_events=tuple(persisted), + snapshot=snapshot, + blocked_reason=blocked_reason, + adapter_key=adapter_key, + ) + + +def _result_event(request: AgentTermEvent, result: AdapterResult) -> AgentTermEvent: + return result.to_event(request) + + +def _agent_id(event: AgentTermEvent) -> str | None: + value = ( + event.metadata.get("agent_id") + or event.metadata.get("agentRegistryId") + or event.metadata.get("agent_registry_id") + or PARTICIPANT_AGENT_IDS.get(event.source) + ) + return str(value) if value else None + + +def _is_blocked(event: AgentTermEvent) -> bool: + metadata = event.metadata + if metadata.get("deny_reason"): + return True + if metadata.get("admission_status") == "denied": + return True + if metadata.get("registry_status") == "denied": + return True + if metadata.get("matrix_status") == "blocked": + return True + if metadata.get("fail_closed") and metadata.get("deny_reason"): + return True + return False + + +def _deny_reason(event: AgentTermEvent) -> str | None: + value = event.metadata.get("deny_reason") or event.metadata.get("matrix_status") + return str(value) if value else None From 4df0f1279caf5827e1937bb6e510c4ecfc583d7b Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 10:21:57 -0400 Subject: [PATCH 2/2] Add operator dispatch pipeline tests --- tests/test_pipeline.py | 243 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 243 insertions(+) create mode 100644 tests/test_pipeline.py diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..7610e80 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,243 @@ +from agent_term.agent_registry import AgentRegistration, AgentRegistryAdapter, InMemoryAgentRegistryBackend +from agent_term.adapters import AdapterResult +from agent_term.events import AgentTermEvent +from agent_term.matrix_adapter import MatrixAdapter +from agent_term.pipeline import OperatorDispatchPipeline +from agent_term.policy_fabric import ALLOW, DENY, InMemoryPolicyFabricBackend, PolicyDecision +from agent_term.policy_fabric import PolicyFabricAdapter +from agent_term.store import EventStore + + +class EchoAdapter: + key = "echo" + + def supports(self, event: AgentTermEvent) -> bool: + return event.source == self.key + + def handle(self, event: AgentTermEvent) -> AdapterResult: + return AdapterResult( + ok=True, + source=self.key, + kind="adapter_result", + body=f"echo: {event.body}", + metadata={"echoed": True}, + ) + + +def make_store(tmp_path) -> EventStore: + return EventStore(tmp_path / "events.sqlite3") + + +def make_pipeline( + tmp_path, + *, + agents=None, + decisions=None, + adapters=None, +) -> tuple[OperatorDispatchPipeline, EventStore]: + store = make_store(tmp_path) + pipeline = OperatorDispatchPipeline( + store=store, + matrix_adapter=MatrixAdapter(), + agent_registry_adapter=AgentRegistryAdapter(InMemoryAgentRegistryBackend(agents=agents or [])), + policy_fabric_adapter=PolicyFabricAdapter(InMemoryPolicyFabricBackend(decisions=decisions or [])), + adapters=adapters or (EchoAdapter(),), + ) + return pipeline, store + + +def test_pipeline_dispatches_successful_event_and_records_snapshot(tmp_path): + pipeline, store = make_pipeline(tmp_path) + try: + event = AgentTermEvent( + channel="!echo", + sender="@operator", + kind="message", + source="echo", + body="hello", + ) + + outcome = pipeline.dispatch(event) + finally: + store.close() + + assert outcome.ok is True + assert outcome.adapter_key == "echo" + assert len(outcome.persisted_events) == 2 + assert outcome.persisted_events[-1].metadata["echoed"] is True + assert "echo: hello" in outcome.snapshot.render_text() + + +def test_pipeline_blocks_unknown_registered_participant(tmp_path): + pipeline, store = make_pipeline(tmp_path) + try: + event = AgentTermEvent( + channel="!codex", + sender="@operator", + kind="agent_message", + source="codex", + body="do work", + ) + + outcome = pipeline.dispatch(event) + finally: + store.close() + + assert outcome.ok is False + assert outcome.blocked_reason == "unknown_agent" + assert len(outcome.persisted_events) == 2 + assert outcome.persisted_events[-1].source == "agent-registry" + assert outcome.persisted_events[-1].metadata["deny_reason"] == "unknown_agent" + + +def test_pipeline_allows_registered_participant_after_registry_gate(tmp_path): + pipeline, store = make_pipeline( + tmp_path, + agents=[ + AgentRegistration( + agent_id="agent.codex", + registry_ref="SocioProphet/agent-registry#agent.codex", + spec_version="v0.1", + ) + ], + adapters=(EchoAdapter(),), + ) + try: + event = AgentTermEvent( + channel="!codex", + sender="@operator", + kind="agent_message", + source="echo", + body="do work", + metadata={"agent_id": "agent.codex"}, + ) + + outcome = pipeline.dispatch(event) + finally: + store.close() + + assert outcome.ok is True + assert [item.source for item in outcome.persisted_events] == ["echo", "agent-registry", "echo"] + assert outcome.persisted_events[1].metadata["registry_status"] == "resolved" + + +def test_pipeline_blocks_policy_denial_before_adapter_dispatch(tmp_path): + pipeline, store = make_pipeline( + tmp_path, + decisions=[ + PolicyDecision( + decision_id="decision-deny", + action="echo.context_pack", + status=DENY, + policy_ref="SocioProphet/policy-fabric#deny", + reason="not authorized", + ) + ], + ) + try: + event = AgentTermEvent( + channel="!echo", + sender="@operator", + kind="context_pack", + source="echo", + body="sensitive", + metadata={"sensitive_context": True, "matrix_encrypted": False}, + ) + + outcome = pipeline.dispatch(event) + finally: + store.close() + + assert outcome.ok is False + assert outcome.blocked_reason == "not authorized" + assert outcome.persisted_events[-1].source == "policy-fabric" + assert outcome.persisted_events[-1].metadata["policy_decision_id"] == "decision-deny" + + +def test_pipeline_blocks_unverified_encrypted_matrix_context_before_policy(tmp_path): + pipeline, store = make_pipeline(tmp_path) + try: + event = AgentTermEvent( + channel="!echo", + sender="@operator", + kind="context_pack", + source="echo", + body="sensitive", + metadata={ + "sensitive_context": True, + "matrix_encrypted": True, + "matrix_e2ee_verified": False, + }, + ) + + outcome = pipeline.dispatch(event) + finally: + store.close() + + assert outcome.ok is False + assert outcome.blocked_reason == "matrix_posture_blocked" + assert outcome.persisted_events[-1].source == "matrix" + assert outcome.persisted_events[-1].metadata["matrix_status"] == "blocked" + + +def test_pipeline_fails_closed_when_no_adapter_matches(tmp_path): + pipeline, store = make_pipeline(tmp_path, adapters=()) + try: + event = AgentTermEvent( + channel="!unknown", + sender="@operator", + kind="message", + source="unknown", + body="hello", + ) + + outcome = pipeline.dispatch(event) + finally: + store.close() + + assert outcome.ok is False + assert outcome.blocked_reason == "no_adapter" + assert outcome.persisted_events[-1].source == "pipeline" + assert outcome.persisted_events[-1].metadata["deny_reason"] == "no_adapter" + + +def test_pipeline_allows_sensitive_context_when_matrix_and_policy_allow(tmp_path): + pipeline, store = make_pipeline( + tmp_path, + decisions=[ + PolicyDecision( + decision_id="decision-allow", + action="echo.context_pack", + status=ALLOW, + policy_ref="SocioProphet/policy-fabric#allow", + ) + ], + ) + try: + event = AgentTermEvent( + channel="!echo", + sender="@operator", + kind="context_pack", + source="echo", + body="sensitive", + metadata={ + "sensitive_context": True, + "matrix_encrypted": True, + "matrix_e2ee_verified": True, + }, + ) + + outcome = pipeline.dispatch(event) + finally: + store.close() + + assert outcome.ok is True + assert [item.source for item in outcome.persisted_events] == [ + "echo", + "matrix", + "policy-fabric", + "echo", + ] + assert outcome.persisted_events[1].metadata["matrix_status"] == "allowed" + assert outcome.persisted_events[2].metadata["admission_status"] == "admitted" + assert outcome.persisted_events[3].metadata["echoed"] is True