diff --git a/src/agent_term/matrix_cli.py b/src/agent_term/matrix_cli.py index 56a2852..e821276 100644 --- a/src/agent_term/matrix_cli.py +++ b/src/agent_term/matrix_cli.py @@ -12,6 +12,8 @@ from agent_term.dispatch_cli import build_pipeline from agent_term.events import AgentTermEvent from agent_term.matrix_service import normalize_sync_payload +from agent_term.matrix_state import DEFAULT_STATE_PATH, MatrixStateStore, resolve_matrix_room +from agent_term.matrix_state import rooms_from_sync_payload from agent_term.store import DEFAULT_DB_PATH, EventStore @@ -22,6 +24,7 @@ def build_parser() -> argparse.ArgumentParser: ) parser.add_argument("--config", help="Optional AgentTerm JSON config path.") parser.add_argument("--db", help="Path to local AgentTerm SQLite event log.") + parser.add_argument("--state", default=str(DEFAULT_STATE_PATH), help="Path to Matrix sync state JSON.") subparsers = parser.add_subparsers(dest="command", required=True) send = subparsers.add_parser("send", help="Send a Matrix message through the AgentTerm dispatch pipeline.") @@ -41,17 +44,25 @@ def build_parser() -> argparse.ArgumentParser: sync = subparsers.add_parser("normalize-sync", help="Normalize a Matrix /sync JSON payload.") sync.add_argument("payload", help="Path to a Matrix sync payload JSON file, or '-' for stdin.") sync.add_argument("--persist", action="store_true", help="Persist normalized events into EventStore.") + sync.add_argument("--save-state", action="store_true", help="Persist next_batch and room IDs into Matrix state.") sync.add_argument("--sender", default="@agent-term") sync.add_argument("--channel", default="!matrix-sync") + state = subparsers.add_parser("state", help="Show durable Matrix sync state.") + state.add_argument("--json", action="store_true", help="Print state as JSON.") + return parser def cmd_send(args: argparse.Namespace) -> int: config = load_config(args.config) db_path = Path(args.db or config.event_store.path or DEFAULT_DB_PATH) + state_store = MatrixStateStore(args.state) + state = state_store.load() + room_id = resolve_matrix_room(args.room, config, state) metadata: dict[str, object] = { - "matrix_room_id": args.room, + "matrix_room_id": room_id, + "matrix_room_alias": args.room if args.room != room_id else None, "msgtype": args.msgtype, "policy_action": args.policy_action, } @@ -64,7 +75,7 @@ def cmd_send(args: argparse.Namespace) -> int: metadata["matrix_e2ee_verified"] = bool(args.matrix_verified) event = AgentTermEvent( - channel=args.room, + channel=room_id, sender=args.sender, kind="matrix_service_send", source="matrix-service", @@ -90,6 +101,7 @@ def cmd_send(args: argparse.Namespace) -> int: outcome = build_pipeline(dispatch_args, event, store, config).dispatch(event) status = "ok" if outcome.ok else "blocked" print(f"matrix_send_status={status}") + print(f"matrix_room_id={room_id}") if outcome.blocked_reason: print(f"blocked_reason={outcome.blocked_reason}") print(f"persisted_events={len(outcome.persisted_events)}") @@ -120,11 +132,30 @@ def cmd_normalize_sync(args: argparse.Namespace) -> int: store.close() print(f"persisted_events={len(events)}") + if args.save_state: + state_store = MatrixStateStore(args.state) + state = state_store.update_rooms(rooms_from_sync_payload(payload)) + state = state_store.save(state.with_next_batch(batch.next_batch)) + print(f"matrix_state_next_batch={state.next_batch}") + print(f"matrix_state_rooms={len(state.rooms)}") + for event in events: print(f"{event.channel}\t{event.sender}\t{event.kind}\t{event.body}") return 0 +def cmd_state(args: argparse.Namespace) -> int: + state = MatrixStateStore(args.state).load() + if args.json: + print(json.dumps(state.to_dict(), indent=2, sort_keys=True)) + return 0 + print(f"matrix_state_next_batch={state.next_batch or ''}") + print(f"matrix_state_rooms={len(state.rooms)}") + for alias, room_id in sorted(state.rooms.items()): + print(f"{alias}\t{room_id}") + return 0 + + def _load_json_payload(path: str) -> dict[str, Any]: if path == "-": raw = sys.stdin.read() @@ -142,6 +173,8 @@ def main(argv: list[str] | None = None) -> int: return cmd_send(args) if args.command == "normalize-sync": return cmd_normalize_sync(args) + if args.command == "state": + return cmd_state(args) raise SystemExit(f"unknown command: {args.command}") diff --git a/src/agent_term/matrix_state.py b/src/agent_term/matrix_state.py new file mode 100644 index 0000000..9ed5d28 --- /dev/null +++ b/src/agent_term/matrix_state.py @@ -0,0 +1,119 @@ +"""Durable Matrix sync state and room resolution helpers.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path + +from agent_term.config import AgentTermConfig + + +DEFAULT_STATE_PATH = Path(".agent-term/matrix-state.json") + + +@dataclass(frozen=True) +class MatrixSyncState: + """Durable sync cursor and room-alias state for Matrix workflows.""" + + next_batch: str | None = None + rooms: dict[str, str] = field(default_factory=dict) + updated_at: str | None = None + + def to_dict(self) -> dict[str, object]: + return { + "next_batch": self.next_batch, + "rooms": self.rooms, + "updated_at": self.updated_at, + } + + @classmethod + def from_dict(cls, value: dict[str, object]) -> "MatrixSyncState": + rooms_raw = value.get("rooms") + rooms = rooms_raw if isinstance(rooms_raw, dict) else {} + return cls( + next_batch=_optional_str(value.get("next_batch")), + rooms={str(key): str(room_id) for key, room_id in rooms.items()}, + updated_at=_optional_str(value.get("updated_at")), + ) + + def with_next_batch(self, next_batch: str | None) -> "MatrixSyncState": + if next_batch is None: + return self + return MatrixSyncState( + next_batch=next_batch, + rooms=self.rooms, + updated_at=datetime.now(UTC).isoformat(), + ) + + def with_rooms(self, rooms: dict[str, str]) -> "MatrixSyncState": + merged = {**self.rooms, **rooms} + return MatrixSyncState( + next_batch=self.next_batch, + rooms=merged, + updated_at=datetime.now(UTC).isoformat(), + ) + + +class MatrixStateStore: + """JSON-backed Matrix sync state store.""" + + def __init__(self, path: Path | str = DEFAULT_STATE_PATH) -> None: + self.path = Path(path) + + def load(self) -> MatrixSyncState: + if not self.path.exists(): + return MatrixSyncState() + with self.path.open("r", encoding="utf-8") as handle: + raw = json.load(handle) + if not isinstance(raw, dict): + raise ValueError("Matrix state file must contain a JSON object") + return MatrixSyncState.from_dict(raw) + + def save(self, state: MatrixSyncState) -> MatrixSyncState: + self.path.parent.mkdir(parents=True, exist_ok=True) + with self.path.open("w", encoding="utf-8") as handle: + json.dump(state.to_dict(), handle, indent=2, sort_keys=True) + handle.write("\n") + return state + + def update_next_batch(self, next_batch: str | None) -> MatrixSyncState: + state = self.load().with_next_batch(next_batch) + return self.save(state) + + def update_rooms(self, rooms: dict[str, str]) -> MatrixSyncState: + state = self.load().with_rooms(rooms) + return self.save(state) + + +def resolve_matrix_room(room: str, config: AgentTermConfig, state: MatrixSyncState) -> str: + """Resolve a room alias/key to a Matrix room ID when possible.""" + + if room in state.rooms: + return state.rooms[room] + if room in config.matrix.rooms: + return config.matrix.rooms[room] + for alias, room_id in config.matrix.rooms.items(): + if room == alias or room == room_id: + return room_id + return room + + +def rooms_from_sync_payload(payload: dict[str, object]) -> dict[str, str]: + """Extract room ID mappings from a Matrix sync payload.""" + + rooms: dict[str, str] = {} + rooms_raw = payload.get("rooms") + if not isinstance(rooms_raw, dict): + return rooms + joined = rooms_raw.get("join") + if not isinstance(joined, dict): + return rooms + for room_id in joined: + rooms[str(room_id)] = str(room_id) + return rooms + + +def _optional_str(value: object) -> str | None: + return str(value) if value is not None else None diff --git a/tests/test_matrix_cli.py b/tests/test_matrix_cli.py index 37885d3..90d4e5a 100644 --- a/tests/test_matrix_cli.py +++ b/tests/test_matrix_cli.py @@ -24,6 +24,7 @@ def test_matrix_cli_send_dispatches_policy_admitted_message(tmp_path, capsys): captured = capsys.readouterr() assert exit_code == 0 assert "matrix_send_status=ok" in captured.out + assert "matrix_room_id=!room:example.org" in captured.out assert "persisted_events=3" in captured.out store = EventStore(db_path) @@ -40,6 +41,35 @@ def test_matrix_cli_send_dispatches_policy_admitted_message(tmp_path, capsys): assert events[-1].metadata["matrix_event_id"] == "$local-1" +def test_matrix_cli_send_resolves_room_from_config(tmp_path, capsys): + db_path = tmp_path / "events.sqlite3" + config_path = tmp_path / "agent-term.json" + config_path.write_text( + json.dumps( + { + "eventStore": {"driver": "sqlite", "path": str(db_path)}, + "matrix": {"rooms": {"sourceosOps": "!sourceos-ops:example.org"}}, + "localRuntime": {"allowPolicies": ["matrix-service.matrix_service_send"]}, + } + ), + encoding="utf-8", + ) + + exit_code = main(["--config", str(config_path), "send", "sourceosOps", "Hello"]) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "matrix_room_id=!sourceos-ops:example.org" in captured.out + + store = EventStore(db_path) + try: + events = store.tail(limit=10) + finally: + store.close() + assert events[0].channel == "!sourceos-ops:example.org" + assert events[0].metadata["matrix_room_alias"] == "sourceosOps" + + def test_matrix_cli_send_blocks_unverified_encrypted_sensitive_context(tmp_path, capsys): db_path = tmp_path / "events.sqlite3" @@ -146,3 +176,69 @@ def test_matrix_cli_normalize_sync_persists_events(tmp_path, capsys): assert len(events) == 1 assert events[0].kind == "matrix_membership" assert events[0].metadata["matrix_membership"] == "join" + + +def test_matrix_cli_normalize_sync_saves_state(tmp_path, capsys): + db_path = tmp_path / "events.sqlite3" + state_path = tmp_path / "matrix-state.json" + payload_path = tmp_path / "sync.json" + payload_path.write_text( + json.dumps( + { + "next_batch": "batch-3", + "rooms": { + "join": { + "!room:example.org": { + "timeline": { + "events": [ + { + "event_id": "$event1", + "sender": "@operator:example.org", + "type": "m.room.message", + "content": {"body": "hello"}, + } + ] + } + } + } + }, + } + ), + encoding="utf-8", + ) + + exit_code = main( + [ + "--db", + str(db_path), + "--state", + str(state_path), + "normalize-sync", + str(payload_path), + "--save-state", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "matrix_state_next_batch=batch-3" in captured.out + assert "matrix_state_rooms=1" in captured.out + state = json.loads(state_path.read_text(encoding="utf-8")) + assert state["next_batch"] == "batch-3" + assert state["rooms"] == {"!room:example.org": "!room:example.org"} + + +def test_matrix_cli_state_prints_state(tmp_path, capsys): + state_path = tmp_path / "matrix-state.json" + state_path.write_text( + json.dumps({"next_batch": "batch-4", "rooms": {"sourceosOps": "!room"}}), + encoding="utf-8", + ) + + exit_code = main(["--state", str(state_path), "state"]) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "matrix_state_next_batch=batch-4" in captured.out + assert "matrix_state_rooms=1" in captured.out + assert "sourceosOps\t!room" in captured.out diff --git a/tests/test_matrix_state.py b/tests/test_matrix_state.py new file mode 100644 index 0000000..c7cc117 --- /dev/null +++ b/tests/test_matrix_state.py @@ -0,0 +1,69 @@ +import json + +from agent_term.config import config_from_dict +from agent_term.matrix_state import MatrixStateStore, MatrixSyncState, resolve_matrix_room +from agent_term.matrix_state import rooms_from_sync_payload + + +def test_matrix_state_store_round_trips_next_batch_and_rooms(tmp_path): + state_path = tmp_path / "matrix-state.json" + store = MatrixStateStore(state_path) + + saved = store.save( + MatrixSyncState(next_batch="batch-1", rooms={"sourceosOps": "!room:example.org"}) + ) + loaded = store.load() + + assert saved.next_batch == "batch-1" + assert loaded.next_batch == "batch-1" + assert loaded.rooms == {"sourceosOps": "!room:example.org"} + assert state_path.exists() + + +def test_matrix_state_updates_next_batch_without_dropping_rooms(tmp_path): + store = MatrixStateStore(tmp_path / "matrix-state.json") + store.save(MatrixSyncState(next_batch="batch-1", rooms={"sourceosOps": "!room"})) + + updated = store.update_next_batch("batch-2") + + assert updated.next_batch == "batch-2" + assert updated.rooms == {"sourceosOps": "!room"} + assert updated.updated_at is not None + + +def test_resolve_matrix_room_prefers_state_then_config_then_literal(): + config = config_from_dict({"matrix": {"rooms": {"sourceosOps": "!config:example.org"}}}) + state = MatrixSyncState(rooms={"sourceosOps": "!state:example.org"}) + + assert resolve_matrix_room("sourceosOps", config, state) == "!state:example.org" + assert resolve_matrix_room("!config:example.org", config, MatrixSyncState()) == "!config:example.org" + assert resolve_matrix_room("unknown", config, state) == "unknown" + + +def test_rooms_from_sync_payload_extracts_joined_room_ids(): + rooms = rooms_from_sync_payload( + { + "rooms": { + "join": { + "!one:example.org": {"timeline": {"events": []}}, + "!two:example.org": {"timeline": {"events": []}}, + } + } + } + ) + + assert rooms == { + "!one:example.org": "!one:example.org", + "!two:example.org": "!two:example.org", + } + + +def test_state_file_is_json_object(tmp_path): + state_path = tmp_path / "matrix-state.json" + store = MatrixStateStore(state_path) + store.save(MatrixSyncState(next_batch="batch-1", rooms={"!room": "!room"})) + + raw = json.loads(state_path.read_text(encoding="utf-8")) + + assert raw["next_batch"] == "batch-1" + assert raw["rooms"] == {"!room": "!room"}