Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/agent_term/matrix_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from agent_term.dispatch_cli import build_pipeline
from agent_term.events import AgentTermEvent
from agent_term.matrix_service import normalize_sync_payload
from agent_term.matrix_state import DEFAULT_STATE_PATH, MatrixStateStore, resolve_matrix_room
from agent_term.matrix_state import rooms_from_sync_payload
from agent_term.store import DEFAULT_DB_PATH, EventStore


Expand All @@ -22,6 +24,7 @@ def build_parser() -> argparse.ArgumentParser:
)
parser.add_argument("--config", help="Optional AgentTerm JSON config path.")
parser.add_argument("--db", help="Path to local AgentTerm SQLite event log.")
parser.add_argument("--state", default=str(DEFAULT_STATE_PATH), help="Path to Matrix sync state JSON.")
subparsers = parser.add_subparsers(dest="command", required=True)

send = subparsers.add_parser("send", help="Send a Matrix message through the AgentTerm dispatch pipeline.")
Expand All @@ -41,17 +44,25 @@ def build_parser() -> argparse.ArgumentParser:
sync = subparsers.add_parser("normalize-sync", help="Normalize a Matrix /sync JSON payload.")
sync.add_argument("payload", help="Path to a Matrix sync payload JSON file, or '-' for stdin.")
sync.add_argument("--persist", action="store_true", help="Persist normalized events into EventStore.")
sync.add_argument("--save-state", action="store_true", help="Persist next_batch and room IDs into Matrix state.")
sync.add_argument("--sender", default="@agent-term")
sync.add_argument("--channel", default="!matrix-sync")

state = subparsers.add_parser("state", help="Show durable Matrix sync state.")
state.add_argument("--json", action="store_true", help="Print state as JSON.")

return parser


def cmd_send(args: argparse.Namespace) -> int:
config = load_config(args.config)
db_path = Path(args.db or config.event_store.path or DEFAULT_DB_PATH)
state_store = MatrixStateStore(args.state)
state = state_store.load()
room_id = resolve_matrix_room(args.room, config, state)
metadata: dict[str, object] = {
"matrix_room_id": args.room,
"matrix_room_id": room_id,
"matrix_room_alias": args.room if args.room != room_id else None,
"msgtype": args.msgtype,
"policy_action": args.policy_action,
}
Expand All @@ -64,7 +75,7 @@ def cmd_send(args: argparse.Namespace) -> int:
metadata["matrix_e2ee_verified"] = bool(args.matrix_verified)

event = AgentTermEvent(
channel=args.room,
channel=room_id,
sender=args.sender,
kind="matrix_service_send",
source="matrix-service",
Expand All @@ -90,6 +101,7 @@ def cmd_send(args: argparse.Namespace) -> int:
outcome = build_pipeline(dispatch_args, event, store, config).dispatch(event)
status = "ok" if outcome.ok else "blocked"
print(f"matrix_send_status={status}")
print(f"matrix_room_id={room_id}")
if outcome.blocked_reason:
print(f"blocked_reason={outcome.blocked_reason}")
print(f"persisted_events={len(outcome.persisted_events)}")
Expand Down Expand Up @@ -120,11 +132,30 @@ def cmd_normalize_sync(args: argparse.Namespace) -> int:
store.close()
print(f"persisted_events={len(events)}")

if args.save_state:
state_store = MatrixStateStore(args.state)
state = state_store.update_rooms(rooms_from_sync_payload(payload))
state = state_store.save(state.with_next_batch(batch.next_batch))
print(f"matrix_state_next_batch={state.next_batch}")
print(f"matrix_state_rooms={len(state.rooms)}")

for event in events:
print(f"{event.channel}\t{event.sender}\t{event.kind}\t{event.body}")
return 0


def cmd_state(args: argparse.Namespace) -> int:
state = MatrixStateStore(args.state).load()
if args.json:
print(json.dumps(state.to_dict(), indent=2, sort_keys=True))
return 0
print(f"matrix_state_next_batch={state.next_batch or ''}")
print(f"matrix_state_rooms={len(state.rooms)}")
for alias, room_id in sorted(state.rooms.items()):
print(f"{alias}\t{room_id}")
return 0


def _load_json_payload(path: str) -> dict[str, Any]:
if path == "-":
raw = sys.stdin.read()
Expand All @@ -142,6 +173,8 @@ def main(argv: list[str] | None = None) -> int:
return cmd_send(args)
if args.command == "normalize-sync":
return cmd_normalize_sync(args)
if args.command == "state":
return cmd_state(args)
raise SystemExit(f"unknown command: {args.command}")


Expand Down
119 changes: 119 additions & 0 deletions src/agent_term/matrix_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Durable Matrix sync state and room resolution helpers."""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path

from agent_term.config import AgentTermConfig


DEFAULT_STATE_PATH = Path(".agent-term/matrix-state.json")


@dataclass(frozen=True)
class MatrixSyncState:
"""Durable sync cursor and room-alias state for Matrix workflows."""

next_batch: str | None = None
rooms: dict[str, str] = field(default_factory=dict)
updated_at: str | None = None

def to_dict(self) -> dict[str, object]:
return {
"next_batch": self.next_batch,
"rooms": self.rooms,
"updated_at": self.updated_at,
}

@classmethod
def from_dict(cls, value: dict[str, object]) -> "MatrixSyncState":
rooms_raw = value.get("rooms")
rooms = rooms_raw if isinstance(rooms_raw, dict) else {}
return cls(
next_batch=_optional_str(value.get("next_batch")),
rooms={str(key): str(room_id) for key, room_id in rooms.items()},
updated_at=_optional_str(value.get("updated_at")),
)

def with_next_batch(self, next_batch: str | None) -> "MatrixSyncState":
if next_batch is None:
return self
return MatrixSyncState(
next_batch=next_batch,
rooms=self.rooms,
updated_at=datetime.now(UTC).isoformat(),
)

def with_rooms(self, rooms: dict[str, str]) -> "MatrixSyncState":
merged = {**self.rooms, **rooms}
return MatrixSyncState(
next_batch=self.next_batch,
rooms=merged,
updated_at=datetime.now(UTC).isoformat(),
)


class MatrixStateStore:
"""JSON-backed Matrix sync state store."""

def __init__(self, path: Path | str = DEFAULT_STATE_PATH) -> None:
self.path = Path(path)

def load(self) -> MatrixSyncState:
if not self.path.exists():
return MatrixSyncState()
with self.path.open("r", encoding="utf-8") as handle:
raw = json.load(handle)
if not isinstance(raw, dict):
raise ValueError("Matrix state file must contain a JSON object")
return MatrixSyncState.from_dict(raw)

def save(self, state: MatrixSyncState) -> MatrixSyncState:
self.path.parent.mkdir(parents=True, exist_ok=True)
with self.path.open("w", encoding="utf-8") as handle:
json.dump(state.to_dict(), handle, indent=2, sort_keys=True)
handle.write("\n")
return state

def update_next_batch(self, next_batch: str | None) -> MatrixSyncState:
state = self.load().with_next_batch(next_batch)
return self.save(state)

def update_rooms(self, rooms: dict[str, str]) -> MatrixSyncState:
state = self.load().with_rooms(rooms)
return self.save(state)


def resolve_matrix_room(room: str, config: AgentTermConfig, state: MatrixSyncState) -> str:
"""Resolve a room alias/key to a Matrix room ID when possible."""

if room in state.rooms:
return state.rooms[room]
if room in config.matrix.rooms:
return config.matrix.rooms[room]
for alias, room_id in config.matrix.rooms.items():
if room == alias or room == room_id:
return room_id
return room


def rooms_from_sync_payload(payload: dict[str, object]) -> dict[str, str]:
"""Extract room ID mappings from a Matrix sync payload."""

rooms: dict[str, str] = {}
rooms_raw = payload.get("rooms")
if not isinstance(rooms_raw, dict):
return rooms
joined = rooms_raw.get("join")
if not isinstance(joined, dict):
return rooms
for room_id in joined:
rooms[str(room_id)] = str(room_id)
return rooms


def _optional_str(value: object) -> str | None:
return str(value) if value is not None else None
96 changes: 96 additions & 0 deletions tests/test_matrix_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def test_matrix_cli_send_dispatches_policy_admitted_message(tmp_path, capsys):
captured = capsys.readouterr()
assert exit_code == 0
assert "matrix_send_status=ok" in captured.out
assert "matrix_room_id=!room:example.org" in captured.out
assert "persisted_events=3" in captured.out

store = EventStore(db_path)
Expand All @@ -40,6 +41,35 @@ def test_matrix_cli_send_dispatches_policy_admitted_message(tmp_path, capsys):
assert events[-1].metadata["matrix_event_id"] == "$local-1"


def test_matrix_cli_send_resolves_room_from_config(tmp_path, capsys):
db_path = tmp_path / "events.sqlite3"
config_path = tmp_path / "agent-term.json"
config_path.write_text(
json.dumps(
{
"eventStore": {"driver": "sqlite", "path": str(db_path)},
"matrix": {"rooms": {"sourceosOps": "!sourceos-ops:example.org"}},
"localRuntime": {"allowPolicies": ["matrix-service.matrix_service_send"]},
}
),
encoding="utf-8",
)

exit_code = main(["--config", str(config_path), "send", "sourceosOps", "Hello"])

captured = capsys.readouterr()
assert exit_code == 0
assert "matrix_room_id=!sourceos-ops:example.org" in captured.out

store = EventStore(db_path)
try:
events = store.tail(limit=10)
finally:
store.close()
assert events[0].channel == "!sourceos-ops:example.org"
assert events[0].metadata["matrix_room_alias"] == "sourceosOps"


def test_matrix_cli_send_blocks_unverified_encrypted_sensitive_context(tmp_path, capsys):
db_path = tmp_path / "events.sqlite3"

Expand Down Expand Up @@ -146,3 +176,69 @@ def test_matrix_cli_normalize_sync_persists_events(tmp_path, capsys):
assert len(events) == 1
assert events[0].kind == "matrix_membership"
assert events[0].metadata["matrix_membership"] == "join"


def test_matrix_cli_normalize_sync_saves_state(tmp_path, capsys):
db_path = tmp_path / "events.sqlite3"
state_path = tmp_path / "matrix-state.json"
payload_path = tmp_path / "sync.json"
payload_path.write_text(
json.dumps(
{
"next_batch": "batch-3",
"rooms": {
"join": {
"!room:example.org": {
"timeline": {
"events": [
{
"event_id": "$event1",
"sender": "@operator:example.org",
"type": "m.room.message",
"content": {"body": "hello"},
}
]
}
}
}
},
}
),
encoding="utf-8",
)

exit_code = main(
[
"--db",
str(db_path),
"--state",
str(state_path),
"normalize-sync",
str(payload_path),
"--save-state",
]
)

captured = capsys.readouterr()
assert exit_code == 0
assert "matrix_state_next_batch=batch-3" in captured.out
assert "matrix_state_rooms=1" in captured.out
state = json.loads(state_path.read_text(encoding="utf-8"))
assert state["next_batch"] == "batch-3"
assert state["rooms"] == {"!room:example.org": "!room:example.org"}


def test_matrix_cli_state_prints_state(tmp_path, capsys):
state_path = tmp_path / "matrix-state.json"
state_path.write_text(
json.dumps({"next_batch": "batch-4", "rooms": {"sourceosOps": "!room"}}),
encoding="utf-8",
)

exit_code = main(["--state", str(state_path), "state"])

captured = capsys.readouterr()
assert exit_code == 0
assert "matrix_state_next_batch=batch-4" in captured.out
assert "matrix_state_rooms=1" in captured.out
assert "sourceosOps\t!room" in captured.out
Loading
Loading