From 073c0173b5ec0fafd84d44f5683c740c17e90703 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 16:40:36 -0400 Subject: [PATCH 1/5] Add Matrix helper CLI --- src/agent_term/matrix_cli.py | 147 +++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 src/agent_term/matrix_cli.py diff --git a/src/agent_term/matrix_cli.py b/src/agent_term/matrix_cli.py new file mode 100644 index 0000000..0108b36 --- /dev/null +++ b/src/agent_term/matrix_cli.py @@ -0,0 +1,147 @@ +"""CLI helpers for Matrix send and sync normalization workflows.""" + +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path +from typing import Any + +from agent_term.config import load_config +from agent_term.dispatch_cli import build_pipeline +from agent_term.events import AgentTermEvent +from agent_term.matrix_service import normalize_sync_payload +from agent_term.store import DEFAULT_DB_PATH, EventStore + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="agent-term-matrix", + description="Matrix helper workflows for AgentTerm send and sync normalization.", + ) + parser.add_argument("--config", help="Optional AgentTerm JSON config path.") + parser.add_argument("--db", help="Path to local AgentTerm SQLite event log.") + subparsers = parser.add_subparsers(dest="command", required=True) + + send = subparsers.add_parser("send", help="Send a Matrix message through the AgentTerm dispatch pipeline.") + send.add_argument("room") + send.add_argument("body") + send.add_argument("--sender", default="@operator") + send.add_argument("--thread-id") + send.add_argument("--txn-id") + send.add_argument("--msgtype", default="m.text") + send.add_argument("--policy-action", default="matrix-service.matrix_service_send") + send.add_argument("--allow-policy", action="append", default=[]) + send.add_argument("--sensitive-context", action="store_true") + send.add_argument("--matrix-encrypted", action="store_true") + send.add_argument("--matrix-verified", action="store_true") + send.add_argument("--show-snapshot", action="store_true") + + sync = subparsers.add_parser("normalize-sync", help="Normalize a Matrix /sync JSON payload.") + sync.add_argument("payload", help="Path to a Matrix sync payload JSON file, or '-' for stdin.") + sync.add_argument("--persist", action="store_true", help="Persist normalized events into EventStore.") + sync.add_argument("--sender", default="@agent-term") + sync.add_argument("--channel", default="!matrix-sync") + + return parser + + +def cmd_send(args: argparse.Namespace) -> int: + config = load_config(args.config) + db_path = Path(args.db or config.event_store.path or DEFAULT_DB_PATH) + metadata: dict[str, object] = { + "matrix_room_id": args.room, + "msgtype": args.msgtype, + "policy_action": args.policy_action, + } + if args.txn_id: + metadata["txn_id"] = args.txn_id + if args.sensitive_context: + metadata["sensitive_context"] = True + if args.matrix_encrypted: + metadata["matrix_encrypted"] = True + metadata["matrix_e2ee_verified"] = bool(args.matrix_verified) + + event = AgentTermEvent( + channel=args.room, + sender=args.sender, + kind="matrix_service_send", + source="matrix-service", + body=args.body, + thread_id=args.thread_id, + metadata=metadata, + ) + + store = EventStore(db_path) + try: + dispatch_args = argparse.Namespace( + agent_id=None, + register_agent=[], + grant=[], + allow_policy=args.allow_policy, + deny_policy=[], + pending_policy=[], + policy_action=args.policy_action, + policy_ref="local://policy-fabric/matrix-cli", + ) + outcome = build_pipeline(dispatch_args, event, store, config).dispatch(event) + status = "ok" if outcome.ok else "blocked" + print(f"matrix_send_status={status}") + if outcome.blocked_reason: + print(f"blocked_reason={outcome.blocked_reason}") + print(f"persisted_events={len(outcome.persisted_events)}") + if args.show_snapshot: + print(outcome.snapshot.render_text()) + return 0 if outcome.ok else 1 + finally: + store.close() + + +def cmd_normalize_sync(args: argparse.Namespace) -> int: + config = load_config(args.config) + db_path = Path(args.db or config.event_store.path or DEFAULT_DB_PATH) + payload = _load_json_payload(args.payload) + batch = normalize_sync_payload(payload) + + print(f"matrix_sync_events={len(batch.events)}") + if batch.next_batch: + print(f"matrix_next_batch={batch.next_batch}") + + events = [matrix_event.to_agentterm_event() for matrix_event in batch.events] + if args.persist: + store = EventStore(db_path) + try: + for event in events: + store.append(event) + finally: + store.close() + print(f"persisted_events={len(events)}") + + for event in events: + print(f"{event.channel}\t{event.sender}\t{event.kind}\t{event.body}") + return 0 + + +def _load_json_payload(path: str) -> dict[str, Any]: + if path == "-": + raw = sys.stdin.read() + else: + raw = Path(path).read_text(encoding="utf-8") + value = json.loads(raw) + if not isinstance(value, dict): + raise SystemExit("Matrix sync payload must be a JSON object") + return value + + +def main(argv: list[str] | None = None) -> int: + args = build_parser().parse_args(argv) + if args.command == "send": + return cmd_send(args) + if args.command == "normalize-sync": + return cmd_normalize_sync(args) + raise SystemExit(f"unknown command: {args.command}") + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) From 9caa7d1e20e92ff434ae7f604f0d8277c4118284 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 16:43:41 -0400 Subject: [PATCH 2/5] Add matrix helper console script --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index f9d85a1..01cd14b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ matrix = [ [project.scripts] agent-term = "agent_term.cli:main" agent-term-dispatch = "agent_term.dispatch_cli:main" +agent-term-matrix = "agent_term.matrix_cli:main" agent-term-snapshot = "agent_term.snapshot_cli:main" [tool.setuptools.packages.find] From b14574a6088be5b5e6bdfe40c03137ecc2b69cec Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 17:31:01 -0400 Subject: [PATCH 3/5] Add Matrix helper CLI tests --- tests/test_matrix_cli.py | 148 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 tests/test_matrix_cli.py diff --git a/tests/test_matrix_cli.py b/tests/test_matrix_cli.py new file mode 100644 index 0000000..37885d3 --- /dev/null +++ b/tests/test_matrix_cli.py @@ -0,0 +1,148 @@ +import json + +from agent_term.matrix_cli import main +from agent_term.store import EventStore + + +def test_matrix_cli_send_dispatches_policy_admitted_message(tmp_path, capsys): + db_path = tmp_path / "events.sqlite3" + + exit_code = main( + [ + "--db", + str(db_path), + "send", + "!room:example.org", + "Hello Matrix", + "--allow-policy", + "matrix-service.matrix_service_send", + "--txn-id", + "txn-1", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "matrix_send_status=ok" in captured.out + assert "persisted_events=3" in captured.out + + store = EventStore(db_path) + try: + events = store.tail(limit=10) + finally: + store.close() + assert [event.source for event in events] == [ + "matrix-service", + "policy-fabric", + "matrix-service", + ] + assert events[-1].metadata["matrix_send_status"] == "sent" + assert events[-1].metadata["matrix_event_id"] == "$local-1" + + +def test_matrix_cli_send_blocks_unverified_encrypted_sensitive_context(tmp_path, capsys): + db_path = tmp_path / "events.sqlite3" + + exit_code = main( + [ + "--db", + str(db_path), + "send", + "!room:example.org", + "Sensitive Matrix", + "--sensitive-context", + "--matrix-encrypted", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 1 + assert "matrix_send_status=blocked" in captured.out + assert "blocked_reason=matrix_posture_blocked" in captured.out + + store = EventStore(db_path) + try: + events = store.tail(limit=10) + finally: + store.close() + assert events[-1].source == "matrix" + assert events[-1].metadata["matrix_status"] == "blocked" + + +def test_matrix_cli_normalize_sync_prints_events(tmp_path, capsys): + payload_path = tmp_path / "sync.json" + payload_path.write_text( + json.dumps( + { + "next_batch": "batch-2", + "rooms": { + "join": { + "!room:example.org": { + "timeline": { + "events": [ + { + "event_id": "$event1", + "sender": "@operator:example.org", + "type": "m.room.message", + "content": {"body": "hello"}, + } + ] + } + } + } + }, + } + ), + encoding="utf-8", + ) + + exit_code = main(["normalize-sync", str(payload_path)]) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "matrix_sync_events=1" in captured.out + assert "matrix_next_batch=batch-2" in captured.out + assert "!room:example.org\t@operator:example.org\tmatrix_room_event\thello" in captured.out + + +def test_matrix_cli_normalize_sync_persists_events(tmp_path, capsys): + db_path = tmp_path / "events.sqlite3" + payload_path = tmp_path / "sync.json" + payload_path.write_text( + json.dumps( + { + "rooms": { + "join": { + "!room:example.org": { + "timeline": { + "events": [ + { + "event_id": "$member", + "sender": "@user:example.org", + "type": "m.room.member", + "content": {"membership": "join"}, + } + ] + } + } + } + } + } + ), + encoding="utf-8", + ) + + exit_code = main(["--db", str(db_path), "normalize-sync", str(payload_path), "--persist"]) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "persisted_events=1" in captured.out + + store = EventStore(db_path) + try: + events = store.tail(limit=10) + finally: + store.close() + assert len(events) == 1 + assert events[0].kind == "matrix_membership" + assert events[0].metadata["matrix_membership"] == "join" From 5092e042bf15c6435f9c541de680b438fd85a6e5 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 17:43:14 -0400 Subject: [PATCH 4/5] Include source in matrix CLI dispatch namespace --- src/agent_term/matrix_cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/agent_term/matrix_cli.py b/src/agent_term/matrix_cli.py index 0108b36..fc36582 100644 --- a/src/agent_term/matrix_cli.py +++ b/src/agent_term/matrix_cli.py @@ -76,6 +76,7 @@ def cmd_send(args: argparse.Namespace) -> int: store = EventStore(db_path) try: dispatch_args = argparse.Namespace( + source="matrix-service", agent_id=None, register_agent=[], grant=[], From 6e75e7e941135c91cdf473fd6c955490f81be4df Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Sat, 2 May 2026 17:50:17 -0400 Subject: [PATCH 5/5] Include sensitive_context in matrix CLI dispatch namespace --- src/agent_term/matrix_cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/agent_term/matrix_cli.py b/src/agent_term/matrix_cli.py index fc36582..56a2852 100644 --- a/src/agent_term/matrix_cli.py +++ b/src/agent_term/matrix_cli.py @@ -85,6 +85,7 @@ def cmd_send(args: argparse.Namespace) -> int: pending_policy=[], policy_action=args.policy_action, policy_ref="local://policy-fabric/matrix-cli", + sensitive_context=args.sensitive_context, ) outcome = build_pipeline(dispatch_args, event, store, config).dispatch(event) status = "ok" if outcome.ok else "blocked"