diff --git a/src/agent_term/matrix_cli.py b/src/agent_term/matrix_cli.py index e821276..a3fe460 100644 --- a/src/agent_term/matrix_cli.py +++ b/src/agent_term/matrix_cli.py @@ -11,6 +11,7 @@ from agent_term.config import load_config from agent_term.dispatch_cli import build_pipeline from agent_term.events import AgentTermEvent +from agent_term.matrix_service import MatrixServiceAdapter, build_matrix_service_backend from agent_term.matrix_service import normalize_sync_payload from agent_term.matrix_state import DEFAULT_STATE_PATH, MatrixStateStore, resolve_matrix_room from agent_term.matrix_state import rooms_from_sync_payload @@ -48,6 +49,13 @@ def build_parser() -> argparse.ArgumentParser: sync.add_argument("--sender", default="@agent-term") sync.add_argument("--channel", default="!matrix-sync") + live_sync = subparsers.add_parser("sync", help="Run incremental Matrix sync through the configured backend.") + live_sync.add_argument("--persist", action="store_true", help="Persist normalized events into EventStore.") + live_sync.add_argument("--save-state", action="store_true", default=True, help="Persist next_batch and room IDs into Matrix state.") + live_sync.add_argument("--no-save-state", dest="save_state", action="store_false") + live_sync.add_argument("--timeout-ms", type=int, default=0) + live_sync.add_argument("--full-state", action="store_true") + state = subparsers.add_parser("state", help="Show durable Matrix sync state.") state.add_argument("--json", action="store_true", help="Print state as JSON.") @@ -118,12 +126,77 @@ def cmd_normalize_sync(args: argparse.Namespace) -> int: payload = _load_json_payload(args.payload) batch = normalize_sync_payload(payload) + print_sync_batch(batch, db_path=db_path, state_path=Path(args.state), persist=args.persist, save_state=args.save_state, payload=payload) + return 0 + + +def cmd_incremental_sync(args: argparse.Namespace) -> int: + config = load_config(args.config) + db_path = Path(args.db or config.event_store.path or DEFAULT_DB_PATH) + state_store = MatrixStateStore(args.state) + state = state_store.load() + event = AgentTermEvent( + channel="!matrix-sync", + sender="@agent-term", + kind="matrix_sync", + source="matrix-service", + body="Run Matrix incremental sync.", + metadata={ + "since": state.next_batch, + "timeout_ms": args.timeout_ms, + "full_state": args.full_state, + }, + ) + result = MatrixServiceAdapter(build_matrix_service_backend(config)).handle(event) + if not result.ok: + print("matrix_sync_status=blocked") + if result.metadata.get("deny_reason"): + print(f"blocked_reason={result.metadata['deny_reason']}") + return 1 + + events_metadata = result.metadata.get("matrix_events") + events = [] + if isinstance(events_metadata, list): + for metadata in events_metadata: + if not isinstance(metadata, dict): + continue + events.append( + AgentTermEvent( + channel=str(metadata.get("matrix_room_alias") or metadata.get("matrix_room_id") or "!matrix-sync"), + sender=str(metadata.get("matrix_sender_mxid") or "@agent-term"), + kind="matrix_room_event", + source="matrix", + body="", + thread_id=_optional_str(metadata.get("matrix_thread_root_event_id")), + metadata=metadata, + ) + ) + + print(f"matrix_sync_events={len(events)}") + next_batch = _optional_str(result.metadata.get("matrix_next_batch")) + if next_batch: + print(f"matrix_next_batch={next_batch}") + if args.persist: + store = EventStore(db_path) + try: + for event in events: + store.append(event) + finally: + store.close() + print(f"persisted_events={len(events)}") + if args.save_state: + state = state_store.update_next_batch(next_batch) + print(f"matrix_state_next_batch={state.next_batch}") + return 0 + + +def print_sync_batch(batch, *, db_path: Path, state_path: Path, persist: bool, save_state: bool, payload: dict[str, Any]) -> None: print(f"matrix_sync_events={len(batch.events)}") if batch.next_batch: print(f"matrix_next_batch={batch.next_batch}") events = [matrix_event.to_agentterm_event() for matrix_event in batch.events] - if args.persist: + if persist: store = EventStore(db_path) try: for event in events: @@ -132,8 +205,8 @@ def cmd_normalize_sync(args: argparse.Namespace) -> int: store.close() print(f"persisted_events={len(events)}") - if args.save_state: - state_store = MatrixStateStore(args.state) + if save_state: + state_store = MatrixStateStore(state_path) state = state_store.update_rooms(rooms_from_sync_payload(payload)) state = state_store.save(state.with_next_batch(batch.next_batch)) print(f"matrix_state_next_batch={state.next_batch}") @@ -141,7 +214,6 @@ def cmd_normalize_sync(args: argparse.Namespace) -> int: for event in events: print(f"{event.channel}\t{event.sender}\t{event.kind}\t{event.body}") - return 0 def cmd_state(args: argparse.Namespace) -> int: @@ -167,12 +239,18 @@ def _load_json_payload(path: str) -> dict[str, Any]: return value +def _optional_str(value: object) -> str | None: + return str(value) if value is not None else None + + def main(argv: list[str] | None = None) -> int: args = build_parser().parse_args(argv) if args.command == "send": return cmd_send(args) if args.command == "normalize-sync": return cmd_normalize_sync(args) + if args.command == "sync": + return cmd_incremental_sync(args) if args.command == "state": return cmd_state(args) raise SystemExit(f"unknown command: {args.command}") diff --git a/src/agent_term/matrix_service.py b/src/agent_term/matrix_service.py index f8a3cc0..9a15748 100644 --- a/src/agent_term/matrix_service.py +++ b/src/agent_term/matrix_service.py @@ -60,6 +60,16 @@ def to_metadata(self) -> dict[str, object]: } +@dataclass(frozen=True) +class MatrixSyncRequest: + """A Matrix incremental sync request.""" + + since: str | None = None + timeout_ms: int = 0 + full_state: bool = False + metadata: dict[str, object] = field(default_factory=dict) + + @dataclass(frozen=True) class MatrixSyncBatch: """Normalized Matrix sync batch.""" @@ -79,6 +89,9 @@ class MatrixServiceBackend(Protocol): def send_text(self, request: MatrixSendRequest) -> MatrixSendResult: """Send a text event into a Matrix room.""" + def sync(self, request: MatrixSyncRequest) -> MatrixSyncBatch: + """Run an incremental Matrix sync.""" + def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch: """Normalize a Matrix sync payload into AgentTerm MatrixRoomEvents.""" @@ -86,8 +99,10 @@ def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch: class InMemoryMatrixServiceBackend: """Offline Matrix backend for tests and local development.""" - def __init__(self) -> None: + def __init__(self, sync_payloads: list[dict[str, Any]] | None = None) -> None: self.sent: list[MatrixSendRequest] = [] + self.sync_requests: list[MatrixSyncRequest] = [] + self._sync_payloads = list(sync_payloads or []) def send_text(self, request: MatrixSendRequest) -> MatrixSendResult: self.sent.append(request) @@ -100,6 +115,22 @@ def send_text(self, request: MatrixSendRequest) -> MatrixSendResult: metadata={"local_backend": True, "txn_id": request.txn_id}, ) + def sync(self, request: MatrixSyncRequest) -> MatrixSyncBatch: + self.sync_requests.append(request) + if not self._sync_payloads: + return MatrixSyncBatch( + events=(), + next_batch=request.since, + metadata={"local_backend": True, "empty_sync": True}, + ) + payload = self._sync_payloads.pop(0) + batch = normalize_sync_payload(payload) + return MatrixSyncBatch( + events=batch.events, + next_batch=batch.next_batch, + metadata={**batch.metadata, "local_backend": True, "since": request.since}, + ) + def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch: return normalize_sync_payload(payload) @@ -127,6 +158,9 @@ def __init__( def send_text(self, request: MatrixSendRequest) -> MatrixSendResult: return asyncio.run(self._send_text_async(request)) + def sync(self, request: MatrixSyncRequest) -> MatrixSyncBatch: + return asyncio.run(self._sync_async(request)) + def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch: return normalize_sync_payload(payload) @@ -171,6 +205,46 @@ async def _send_text_async(self, request: MatrixSendRequest) -> MatrixSendResult error=repr(response), ) + async def _sync_async(self, request: MatrixSyncRequest) -> MatrixSyncBatch: + try: + from nio import AsyncClient, SyncError, SyncResponse + except ImportError as exc: + raise RuntimeError( + "matrix-nio is required for NioMatrixServiceBackend; install agent-term[matrix]" + ) from exc + + client = AsyncClient(self.homeserver_url, self.user_id, device_id=self.device_name) + client.access_token = self.access_token + try: + response = await client.sync( + timeout=request.timeout_ms, + since=request.since, + full_state=request.full_state, + ) + finally: + await client.close() + + if isinstance(response, SyncResponse): + payload = getattr(response, "source", None) + if isinstance(payload, dict): + return normalize_sync_payload(payload) + return MatrixSyncBatch( + events=(), + next_batch=response.next_batch, + metadata={"matrix_sync_response": "nio_without_source_payload"}, + ) + if isinstance(response, SyncError): + return MatrixSyncBatch( + events=(), + next_batch=request.since, + metadata={"matrix_sync_error": response.message}, + ) + return MatrixSyncBatch( + events=(), + next_batch=request.since, + metadata={"matrix_sync_error": repr(response)}, + ) + class MatrixServiceAdapter: """Adapter that performs Matrix service send/sync operations through a backend.""" @@ -248,16 +322,19 @@ def _send(self, event: AgentTermEvent) -> AdapterResult: ) def _sync(self, event: AgentTermEvent) -> AdapterResult: - payload = event.metadata.get("matrix_sync") or event.metadata.get("payload") - if not isinstance(payload, dict): - return AdapterResult( - ok=False, - source=self.key, - body="Matrix service sync blocked: missing sync payload", - kind="matrix_sync", - metadata={"deny_reason": "missing_sync_payload", "fail_closed": True}, + if isinstance(event.metadata.get("matrix_sync"), dict) or isinstance(event.metadata.get("payload"), dict): + payload = event.metadata.get("matrix_sync") or event.metadata.get("payload") + batch = self.backend.normalize_sync(payload) # type: ignore[arg-type] + else: + timeout_ms = int(event.metadata.get("timeout_ms") or 0) + batch = self.backend.sync( + MatrixSyncRequest( + since=_optional_str(event.metadata.get("since") or event.metadata.get("next_batch")), + timeout_ms=timeout_ms, + full_state=bool(event.metadata.get("full_state")), + metadata={"request_event_id": event.event_id}, + ) ) - batch = self.backend.normalize_sync(payload) return AdapterResult( ok=True, source=self.key, @@ -269,6 +346,7 @@ def _sync(self, event: AgentTermEvent) -> AdapterResult: "matrix_sync_event_count": len(batch.events), "matrix_next_batch": batch.next_batch, "matrix_events": [matrix_event.to_metadata() for matrix_event in batch.events], + **batch.metadata, }, ) diff --git a/tests/test_matrix_cli.py b/tests/test_matrix_cli.py index 90d4e5a..7b2bbfe 100644 --- a/tests/test_matrix_cli.py +++ b/tests/test_matrix_cli.py @@ -228,6 +228,38 @@ def test_matrix_cli_normalize_sync_saves_state(tmp_path, capsys): assert state["rooms"] == {"!room:example.org": "!room:example.org"} +def test_matrix_cli_incremental_sync_uses_stored_next_batch_and_updates_state(tmp_path, capsys): + db_path = tmp_path / "events.sqlite3" + state_path = tmp_path / "matrix-state.json" + state_path.write_text(json.dumps({"next_batch": "batch-1", "rooms": {}}), encoding="utf-8") + + exit_code = main( + [ + "--db", + str(db_path), + "--state", + str(state_path), + "sync", + "--persist", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 0 + assert "matrix_sync_events=0" in captured.out + assert "matrix_next_batch=batch-1" in captured.out + assert "matrix_state_next_batch=batch-1" in captured.out + state = json.loads(state_path.read_text(encoding="utf-8")) + assert state["next_batch"] == "batch-1" + + store = EventStore(db_path) + try: + events = store.tail(limit=10) + finally: + store.close() + assert events == [] + + def test_matrix_cli_state_prints_state(tmp_path, capsys): state_path = tmp_path / "matrix-state.json" state_path.write_text( diff --git a/tests/test_matrix_service.py b/tests/test_matrix_service.py index dd6952f..7fec0e8 100644 --- a/tests/test_matrix_service.py +++ b/tests/test_matrix_service.py @@ -7,6 +7,7 @@ MatrixSendRequest, MatrixServiceAdapter, MatrixServiceConfigError, + MatrixSyncRequest, NioMatrixServiceBackend, build_matrix_service_backend, normalize_sync_payload, @@ -40,6 +41,40 @@ def test_in_memory_matrix_backend_sends_text(): assert backend.sent == [request] +def test_in_memory_matrix_backend_incremental_sync_uses_since_token(): + backend = InMemoryMatrixServiceBackend( + sync_payloads=[ + { + "next_batch": "batch-2", + "rooms": { + "join": { + "!room:example.org": { + "timeline": { + "events": [ + { + "event_id": "$event1", + "sender": "@operator:example.org", + "type": "m.room.message", + "content": {"body": "hello"}, + } + ] + } + } + } + }, + } + ] + ) + + batch = backend.sync(MatrixSyncRequest(since="batch-1", timeout_ms=1000)) + + assert backend.sync_requests[0].since == "batch-1" + assert backend.sync_requests[0].timeout_ms == 1000 + assert batch.next_batch == "batch-2" + assert batch.events[0].event_id == "$event1" + assert batch.metadata["since"] == "batch-1" + + def test_normalize_sync_payload_preserves_room_event_metadata(): batch = normalize_sync_payload( { @@ -160,6 +195,27 @@ def test_matrix_service_adapter_sync_normalizes_payload(): assert result.metadata["matrix_events"][0]["matrix_membership"] == "join" +def test_matrix_service_adapter_incremental_sync_uses_backend(): + backend = InMemoryMatrixServiceBackend(sync_payloads=[{"next_batch": "batch-2"}]) + adapter = MatrixServiceAdapter(backend) + event = AgentTermEvent( + channel="!matrix-sync", + sender="@operator", + kind="matrix_sync", + source="matrix-service", + body="sync", + metadata={"since": "batch-1", "timeout_ms": 500, "full_state": True}, + ) + + result = adapter.handle(event) + + assert result.ok is True + assert backend.sync_requests[0].since == "batch-1" + assert backend.sync_requests[0].timeout_ms == 500 + assert backend.sync_requests[0].full_state is True + assert result.metadata["matrix_next_batch"] == "batch-2" + + def test_build_matrix_backend_defaults_to_in_memory_when_disabled(): config = config_from_dict({"matrix": {"enabled": False}})