From a6aaf3b29c56e78aa3bed69d4404934551f3525d Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 12:03:41 -0400 Subject: [PATCH 1/6] Add Matrix service backend boundary --- src/agent_term/matrix_service.py | 199 +++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 src/agent_term/matrix_service.py diff --git a/src/agent_term/matrix_service.py b/src/agent_term/matrix_service.py new file mode 100644 index 0000000..19644e6 --- /dev/null +++ b/src/agent_term/matrix_service.py @@ -0,0 +1,199 @@ +"""Matrix service backend boundary. + +This module is the first live-service seam for Matrix. It keeps network I/O behind a +small protocol, keeps `matrix-nio` optional, and lets CI exercise Matrix send/sync +behavior without a live homeserver. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Any, Protocol + +from agent_term.matrix_adapter import MatrixRoomEvent, normalize_matrix_payload + + +@dataclass(frozen=True) +class MatrixSendRequest: + """A Matrix message send request.""" + + room_id: str + body: str + msgtype: str = "m.text" + thread_root_event_id: str | None = None + txn_id: str | None = None + metadata: dict[str, object] = field(default_factory=dict) + + def content(self) -> dict[str, object]: + content: dict[str, object] = {"msgtype": self.msgtype, "body": self.body} + if self.thread_root_event_id: + content["m.relates_to"] = { + "rel_type": "m.thread", + "event_id": self.thread_root_event_id, + } + return content + + +@dataclass(frozen=True) +class MatrixSendResult: + """Result of a Matrix message send attempt.""" + + ok: bool + room_id: str + event_id: str | None = None + status: str = "sent" + error: str | None = None + metadata: dict[str, object] = field(default_factory=dict) + + def to_metadata(self) -> dict[str, object]: + return { + "matrix_room_id": self.room_id, + "matrix_event_id": self.event_id, + "matrix_send_status": self.status, + "matrix_send_error": self.error, + **self.metadata, + } + + +@dataclass(frozen=True) +class MatrixSyncBatch: + """Normalized Matrix sync batch.""" + + events: tuple[MatrixRoomEvent, ...] + next_batch: str | None = None + metadata: dict[str, object] = field(default_factory=dict) + + +class MatrixServiceBackend(Protocol): + """Backend contract for Matrix service operations.""" + + def send_text(self, request: MatrixSendRequest) -> MatrixSendResult: + """Send a text event into a Matrix room.""" + + def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch: + """Normalize a Matrix sync payload into AgentTerm MatrixRoomEvents.""" + + +class InMemoryMatrixServiceBackend: + """Offline Matrix backend for tests and local development.""" + + def __init__(self) -> None: + self.sent: list[MatrixSendRequest] = [] + + def send_text(self, request: MatrixSendRequest) -> MatrixSendResult: + self.sent.append(request) + event_id = f"$local-{len(self.sent)}" + return MatrixSendResult( + ok=True, + room_id=request.room_id, + event_id=event_id, + status="sent", + metadata={"local_backend": True, "txn_id": request.txn_id}, + ) + + def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch: + return normalize_sync_payload(payload) + + +class NioMatrixServiceBackend: + """Optional matrix-nio backed Matrix service backend. + + The class imports `nio` lazily so the base package and CI do not require the + optional Matrix dependency. Callers should use the `matrix` extra to enable it. + """ + + def __init__( + self, + *, + homeserver_url: str, + user_id: str, + access_token: str, + device_name: str | None = None, + ) -> None: + self.homeserver_url = homeserver_url + self.user_id = user_id + self.access_token = access_token + self.device_name = device_name + + def send_text(self, request: MatrixSendRequest) -> MatrixSendResult: + return asyncio.run(self._send_text_async(request)) + + def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch: + return normalize_sync_payload(payload) + + async def _send_text_async(self, request: MatrixSendRequest) -> MatrixSendResult: + try: + from nio import AsyncClient, RoomSendError, RoomSendResponse + except ImportError as exc: + raise RuntimeError( + "matrix-nio is required for NioMatrixServiceBackend; install agent-term[matrix]" + ) from exc + + client = AsyncClient(self.homeserver_url, self.user_id, device_id=self.device_name) + client.access_token = self.access_token + try: + response = await client.room_send( + room_id=request.room_id, + message_type="m.room.message", + content=request.content(), + txn_id=request.txn_id, + ) + finally: + await client.close() + + if isinstance(response, RoomSendResponse): + return MatrixSendResult( + ok=True, + room_id=request.room_id, + event_id=response.event_id, + status="sent", + ) + if isinstance(response, RoomSendError): + return MatrixSendResult( + ok=False, + room_id=request.room_id, + status="error", + error=response.message, + ) + return MatrixSendResult( + ok=False, + room_id=request.room_id, + status="unknown_response", + error=repr(response), + ) + + +def normalize_sync_payload(payload: dict[str, Any]) -> MatrixSyncBatch: + """Normalize Matrix `/sync`-style payload into MatrixRoomEvents.""" + + events: list[MatrixRoomEvent] = [] + rooms = _dict(payload.get("rooms")) + joined = _dict(rooms.get("join")) + for room_id, room_payload_raw in joined.items(): + room_payload = _dict(room_payload_raw) + timeline = _dict(room_payload.get("timeline")) + for event_raw in _list(timeline.get("events")): + if not isinstance(event_raw, dict): + continue + event_payload = {**event_raw} + event_payload.setdefault("room_id", str(room_id)) + events.append(normalize_matrix_payload(event_payload)) + + return MatrixSyncBatch( + events=tuple(events), + next_batch=_optional_str(payload.get("next_batch")), + metadata={"matrix_sync_event_count": len(events)}, + ) + + +def _dict(value: object) -> dict[str, Any]: + return value if isinstance(value, dict) else {} + + +def _list(value: object) -> list[object]: + return value if isinstance(value, list) else [] + + +def _optional_str(value: object) -> str | None: + return str(value) if value is not None else None From cede8a5de08d662625c529a6a881aafd03e82c8b Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 12:04:45 -0400 Subject: [PATCH 2/6] Add Matrix service adapter and config factory --- src/agent_term/matrix_service.py | 142 ++++++++++++++++++++++++++++++- 1 file changed, 141 insertions(+), 1 deletion(-) diff --git a/src/agent_term/matrix_service.py b/src/agent_term/matrix_service.py index 19644e6..f8a3cc0 100644 --- a/src/agent_term/matrix_service.py +++ b/src/agent_term/matrix_service.py @@ -8,10 +8,14 @@ from __future__ import annotations import asyncio +import os from dataclasses import dataclass, field from typing import Any, Protocol -from agent_term.matrix_adapter import MatrixRoomEvent, normalize_matrix_payload +from agent_term.adapters import AdapterResult +from agent_term.config import AgentTermConfig +from agent_term.events import AgentTermEvent +from agent_term.matrix_adapter import MatrixRoomEvent, normalize_matrix_payload, posture_from_metadata @dataclass(frozen=True) @@ -65,6 +69,10 @@ class MatrixSyncBatch: metadata: dict[str, object] = field(default_factory=dict) +class MatrixServiceConfigError(RuntimeError): + """Raised when Matrix service config is insufficient for a live backend.""" + + class MatrixServiceBackend(Protocol): """Backend contract for Matrix service operations.""" @@ -164,6 +172,138 @@ async def _send_text_async(self, request: MatrixSendRequest) -> MatrixSendResult ) +class MatrixServiceAdapter: + """Adapter that performs Matrix service send/sync operations through a backend.""" + + key = "matrix-service" + + def __init__(self, backend: MatrixServiceBackend) -> None: + self.backend = backend + + def supports(self, event: AgentTermEvent) -> bool: + return event.source == self.key or event.kind in {"matrix_service_send", "matrix_sync"} + + def handle(self, event: AgentTermEvent) -> AdapterResult: + if event.kind == "matrix_service_send": + return self._send(event) + if event.kind == "matrix_sync": + return self._sync(event) + return AdapterResult( + ok=False, + source=self.key, + body=f"Unsupported Matrix service event kind: {event.kind}", + metadata={"matrix_service_status": "unsupported_kind", "fail_closed": True}, + ) + + def _send(self, event: AgentTermEvent) -> AdapterResult: + posture = posture_from_metadata(event.metadata) + if event.metadata.get("sensitive_context") and not posture.can_release_sensitive_context: + return AdapterResult( + ok=False, + source=self.key, + body="Matrix service send blocked by E2EE posture", + kind="matrix_service_send", + metadata={ + "request_event_id": event.event_id, + "matrix_service_status": "blocked", + "deny_reason": "matrix_posture_blocked", + "fail_closed": True, + **posture.to_metadata(), + }, + ) + + room_id = _optional_str(event.metadata.get("matrix_room_id") or event.channel) + if not room_id: + return AdapterResult( + ok=False, + source=self.key, + body="Matrix service send blocked: missing room ID", + kind="matrix_service_send", + metadata={"deny_reason": "missing_matrix_room_id", "fail_closed": True}, + ) + + result = self.backend.send_text( + MatrixSendRequest( + room_id=room_id, + body=event.body, + msgtype=str(event.metadata.get("msgtype") or "m.text"), + thread_root_event_id=_optional_str( + event.metadata.get("matrix_thread_root_event_id") or event.thread_id + ), + txn_id=_optional_str(event.metadata.get("txn_id")), + metadata={"request_event_id": event.event_id}, + ) + ) + return AdapterResult( + ok=result.ok, + source=self.key, + body=f"Matrix service send {result.status}: {room_id}", + kind="matrix_service_send", + metadata={ + "request_event_id": event.event_id, + "matrix_service_status": result.status, + **result.to_metadata(), + **posture.to_metadata(), + }, + ) + + def _sync(self, event: AgentTermEvent) -> AdapterResult: + payload = event.metadata.get("matrix_sync") or event.metadata.get("payload") + if not isinstance(payload, dict): + return AdapterResult( + ok=False, + source=self.key, + body="Matrix service sync blocked: missing sync payload", + kind="matrix_sync", + metadata={"deny_reason": "missing_sync_payload", "fail_closed": True}, + ) + batch = self.backend.normalize_sync(payload) + return AdapterResult( + ok=True, + source=self.key, + body=f"Matrix service normalized {len(batch.events)} sync events", + kind="matrix_sync", + metadata={ + "request_event_id": event.event_id, + "matrix_service_status": "synced", + "matrix_sync_event_count": len(batch.events), + "matrix_next_batch": batch.next_batch, + "matrix_events": [matrix_event.to_metadata() for matrix_event in batch.events], + }, + ) + + +def build_matrix_service_backend( + config: AgentTermConfig, + *, + access_token_env: str = "AGENT_TERM_MATRIX_ACCESS_TOKEN", +) -> MatrixServiceBackend: + """Build a Matrix backend from AgentTerm config. + + Disabled Matrix config returns the offline backend. Enabled Matrix config requires + homeserver URL, user ID, and an access token from the environment. We avoid storing + access tokens in JSON config. + """ + + if not config.matrix.enabled: + return InMemoryMatrixServiceBackend() + + token = os.environ.get(access_token_env) + if not token: + raise MatrixServiceConfigError(f"missing Matrix access token env var: {access_token_env}") + if not config.matrix.homeserver_url: + raise MatrixServiceConfigError("missing matrix.homeserverUrl") + if not config.matrix.user_id: + raise MatrixServiceConfigError("missing matrix.userId") + + return NioMatrixServiceBackend( + homeserver_url=config.matrix.homeserver_url, + user_id=config.matrix.user_id, + access_token=token, + device_name=config.matrix.device_name, + ) + + def normalize_sync_payload(payload: dict[str, Any]) -> MatrixSyncBatch: """Normalize Matrix `/sync`-style payload into MatrixRoomEvents.""" From 9ce04efde3db86933975a192ce3b601467b3d0c8 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 12:05:39 -0400 Subject: [PATCH 3/6] Treat Matrix service send as side-effecting policy-gated action --- src/agent_term/policy_fabric.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/agent_term/policy_fabric.py b/src/agent_term/policy_fabric.py index 729b3f7..9691e3b 100644 --- a/src/agent_term/policy_fabric.py +++ b/src/agent_term/policy_fabric.py @@ -40,6 +40,7 @@ "ci_retry", "tool_grant", "revocation", + "matrix_service_send", } ) From 98aa470a32d89f756cc5ab946a8b4ef554aef62d Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 12:07:21 -0400 Subject: [PATCH 4/6] Wire Matrix service adapter into dispatch CLI --- src/agent_term/dispatch_cli.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/agent_term/dispatch_cli.py b/src/agent_term/dispatch_cli.py index 19d9902..5afe562 100644 --- a/src/agent_term/dispatch_cli.py +++ b/src/agent_term/dispatch_cli.py @@ -28,6 +28,7 @@ SlashTopicsAdapter, ) from agent_term.matrix_adapter import MatrixAdapter +from agent_term.matrix_service import MatrixServiceAdapter, build_matrix_service_backend from agent_term.participants import InMemoryParticipantBackend, RegisteredParticipantAdapter from agent_term.pipeline import OperatorDispatchPipeline from agent_term.policy_fabric import ( @@ -186,6 +187,7 @@ def build_pipeline( adapters = ( MatrixAdapter(), + MatrixServiceAdapter(build_matrix_service_backend(config)), CloudShellFogAdapter(InMemoryCloudShellFogBackend()), AgentPlaneAdapter(InMemoryAgentPlaneBackend()), SociosphereAdapter(InMemorySociosphereBackend()), From 9661b56d79c8e61713b9de3dcc91ca79585e4f3b Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 12:08:19 -0400 Subject: [PATCH 5/6] Add Matrix service backend tests --- tests/test_matrix_service.py | 206 +++++++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 tests/test_matrix_service.py diff --git a/tests/test_matrix_service.py b/tests/test_matrix_service.py new file mode 100644 index 0000000..dd6952f --- /dev/null +++ b/tests/test_matrix_service.py @@ -0,0 +1,206 @@ +import pytest + +from agent_term.config import config_from_dict +from agent_term.events import AgentTermEvent +from agent_term.matrix_service import ( + InMemoryMatrixServiceBackend, + MatrixSendRequest, + MatrixServiceAdapter, + MatrixServiceConfigError, + NioMatrixServiceBackend, + build_matrix_service_backend, + normalize_sync_payload, +) + + +def test_matrix_send_request_content_preserves_thread_root(): + request = MatrixSendRequest( + room_id="!room:example.org", + body="hello", + thread_root_event_id="$root", + ) + + assert request.content() == { + "msgtype": "m.text", + "body": "hello", + "m.relates_to": {"rel_type": "m.thread", "event_id": "$root"}, + } + + +def test_in_memory_matrix_backend_sends_text(): + backend = InMemoryMatrixServiceBackend() + request = MatrixSendRequest(room_id="!room:example.org", body="hello", txn_id="txn-1") + + result = backend.send_text(request) + + assert result.ok is True + assert result.room_id == "!room:example.org" + assert result.event_id == "$local-1" + assert result.metadata["txn_id"] == "txn-1" + assert backend.sent == [request] + + +def test_normalize_sync_payload_preserves_room_event_metadata(): + batch = normalize_sync_payload( + { + "next_batch": "batch-2", + "rooms": { + "join": { + "!room:example.org": { + "timeline": { + "events": [ + { + "event_id": "$event1", + "sender": "@operator:example.org", + "type": "m.room.message", + "content": { + "body": "hello", + "m.relates_to": { + "rel_type": "m.thread", + "event_id": "$root", + }, + }, + } + ] + } + } + } + }, + } + ) + + assert batch.next_batch == "batch-2" + assert batch.metadata["matrix_sync_event_count"] == 1 + assert batch.events[0].room_id == "!room:example.org" + assert batch.events[0].event_id == "$event1" + assert batch.events[0].thread_root_event_id == "$root" + + +def test_matrix_service_adapter_send_uses_backend_and_preserves_metadata(): + backend = InMemoryMatrixServiceBackend() + adapter = MatrixServiceAdapter(backend) + event = AgentTermEvent( + channel="!room:example.org", + sender="@operator", + kind="matrix_service_send", + source="matrix-service", + body="hello", + thread_id="$root", + metadata={"txn_id": "txn-1"}, + ) + + result = adapter.handle(event) + + assert result.ok is True + assert result.kind == "matrix_service_send" + assert result.metadata["matrix_service_status"] == "sent" + assert result.metadata["matrix_event_id"] == "$local-1" + assert backend.sent[0].thread_root_event_id == "$root" + assert backend.sent[0].txn_id == "txn-1" + + +def test_matrix_service_adapter_blocks_unverified_encrypted_sensitive_send(): + adapter = MatrixServiceAdapter(InMemoryMatrixServiceBackend()) + event = AgentTermEvent( + channel="!room:example.org", + sender="@operator", + kind="matrix_service_send", + source="matrix-service", + body="sensitive", + metadata={ + "sensitive_context": True, + "matrix_encrypted": True, + "matrix_e2ee_verified": False, + }, + ) + + result = adapter.handle(event) + + assert result.ok is False + assert result.metadata["deny_reason"] == "matrix_posture_blocked" + assert result.metadata["fail_closed"] is True + + +def test_matrix_service_adapter_sync_normalizes_payload(): + adapter = MatrixServiceAdapter(InMemoryMatrixServiceBackend()) + event = AgentTermEvent( + channel="!room:example.org", + sender="@operator", + kind="matrix_sync", + source="matrix-service", + body="sync", + metadata={ + "matrix_sync": { + "next_batch": "batch-1", + "rooms": { + "join": { + "!room:example.org": { + "timeline": { + "events": [ + { + "event_id": "$event1", + "sender": "@operator:example.org", + "type": "m.room.member", + "content": {"membership": "join"}, + } + ] + } + } + } + }, + } + }, + ) + + result = adapter.handle(event) + + assert result.ok is True + assert result.metadata["matrix_sync_event_count"] == 1 + assert result.metadata["matrix_next_batch"] == "batch-1" + assert result.metadata["matrix_events"][0]["matrix_membership"] == "join" + + +def test_build_matrix_backend_defaults_to_in_memory_when_disabled(): + config = config_from_dict({"matrix": {"enabled": False}}) + + backend = build_matrix_service_backend(config) + + assert isinstance(backend, InMemoryMatrixServiceBackend) + + +def test_build_matrix_backend_requires_token_when_enabled(monkeypatch): + monkeypatch.delenv("AGENT_TERM_MATRIX_ACCESS_TOKEN", raising=False) + config = config_from_dict( + { + "matrix": { + "enabled": True, + "homeserverUrl": "https://matrix.example.org", + "userId": "@agent-term:example.org", + } + } + ) + + with pytest.raises(MatrixServiceConfigError, match="missing Matrix access token"): + build_matrix_service_backend(config) + + +def test_build_matrix_backend_returns_nio_backend_when_enabled_with_env(monkeypatch): + monkeypatch.setenv("AGENT_TERM_MATRIX_ACCESS_TOKEN", "token-redacted") + config = config_from_dict( + { + "matrix": { + "enabled": True, + "homeserverUrl": "https://matrix.example.org", + "userId": "@agent-term:example.org", + "deviceName": "agent-term-ci", + } + } + ) + + backend = build_matrix_service_backend(config) + + assert isinstance(backend, NioMatrixServiceBackend) + assert backend.homeserver_url == "https://matrix.example.org" + assert backend.user_id == "@agent-term:example.org" + assert backend.access_token == "token-redacted" + assert backend.device_name == "agent-term-ci" From fb42e8ca2efb2a002510189533891ca9516c8777 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 12:09:10 -0400 Subject: [PATCH 6/6] Add dispatch CLI Matrix service send test --- tests/test_dispatch_cli.py | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/test_dispatch_cli.py b/tests/test_dispatch_cli.py index c8f95fb..3d3343d 100644 --- a/tests/test_dispatch_cli.py +++ b/tests/test_dispatch_cli.py @@ -39,6 +39,45 @@ def test_dispatch_cli_success_persists_events_and_snapshot(tmp_path, capsys): assert [event.source for event in events] == ["memory-mesh", "policy-fabric", "memory-mesh"] +def test_dispatch_cli_matrix_service_send_is_policy_gated_and_persisted(tmp_path, capsys): + db_path = tmp_path / "events.sqlite3" + + exit_code = main( + [ + "matrix-service", + "matrix_service_send", + "!room:example.org", + "Hello Matrix", + "--db", + str(db_path), + "--policy-action", + "matrix-service.matrix_service_send", + "--allow-policy", + "matrix-service.matrix_service_send", + "--metadata-json", + '{"txn_id":"txn-1"}', + ] + ) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "dispatch_status=ok" in captured.out + assert "adapter=matrix-service" in captured.out + + store = EventStore(db_path) + try: + events = store.tail(limit=10) + finally: + store.close() + assert [event.source for event in events] == [ + "matrix-service", + "policy-fabric", + "matrix-service", + ] + assert events[-1].metadata["matrix_service_status"] == "sent" + assert events[-1].metadata["matrix_event_id"] == "$local-1" + + def test_dispatch_cli_uses_config_event_store_and_local_runtime_fixtures(tmp_path, capsys): db_path = tmp_path / "configured-events.sqlite3" config_path = tmp_path / "agent-term.json"