diff --git a/src/agent_term/agent_registry.py b/src/agent_term/agent_registry.py new file mode 100644 index 0000000..b9e56e8 --- /dev/null +++ b/src/agent_term/agent_registry.py @@ -0,0 +1,269 @@ +"""Agent Registry adapter primitives. + +AgentTerm is not the authority for agent identity. This module provides a small, +fakeable adapter boundary so AgentTerm can resolve agent identity, session state, +tool grants, and revocation posture before dispatching non-human participants. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Protocol + +from agent_term.adapters import AdapterResult +from agent_term.events import AgentTermEvent + + +ACTIVE_STATUSES = {"registered", "active"} + + +@dataclass(frozen=True) +class AgentRegistration: + """Resolved Agent Registry record for one non-human participant.""" + + agent_id: str + registry_ref: str + spec_version: str + runtime_authority: str = "agent-registry" + status: str = "registered" + session_id: str | None = None + tool_grants: frozenset[str] = field(default_factory=frozenset) + revoked: bool = False + metadata: dict[str, object] = field(default_factory=dict) + + @property + def is_enabled(self) -> bool: + return self.status in ACTIVE_STATUSES and not self.revoked + + def to_metadata(self) -> dict[str, object]: + return { + "agent_id": self.agent_id, + "agent_registry_ref": self.registry_ref, + "agent_spec_version": self.spec_version, + "runtime_authority": self.runtime_authority, + "agent_status": self.status, + "session_id": self.session_id, + "tool_grants": sorted(self.tool_grants), + "revoked": self.revoked, + **self.metadata, + } + + +@dataclass(frozen=True) +class ToolGrant: + """Resolved tool grant for an agent participant.""" + + grant_id: str + agent_id: str + tool: str + status: str = "active" + metadata: dict[str, object] = field(default_factory=dict) + + @property + def is_active(self) -> bool: + return self.status == "active" + + def to_metadata(self) -> dict[str, object]: + return { + "grant_id": self.grant_id, + "agent_id": self.agent_id, + "tool": self.tool, + "grant_status": self.status, + **self.metadata, + } + + +class AgentRegistryBackend(Protocol): + """Backend contract for Agent Registry lookups.""" + + def resolve_agent(self, agent_id: str) -> AgentRegistration | None: + """Return the agent registration if known.""" + + def resolve_tool_grant(self, agent_id: str, tool: str) -> ToolGrant | None: + """Return a tool grant if active or known.""" + + +class InMemoryAgentRegistryBackend: + """Test/development backend for Agent Registry lookups. + + This backend is intentionally explicit. It should not be used as production + authority; it exists so AgentTerm can validate fail-closed behavior in CI. + """ + + def __init__( + self, + agents: list[AgentRegistration] | None = None, + grants: list[ToolGrant] | None = None, + ) -> None: + self._agents = {agent.agent_id: agent for agent in agents or []} + self._grants = {(grant.agent_id, grant.tool): grant for grant in grants or []} + + def resolve_agent(self, agent_id: str) -> AgentRegistration | None: + return self._agents.get(agent_id) + + def resolve_tool_grant(self, agent_id: str, tool: str) -> ToolGrant | None: + return self._grants.get((agent_id, tool)) + + +class AgentRegistryAdapter: + """Adapter that resolves agent identity, grants, and revocation posture.""" + + key = "agent-registry" + + def __init__(self, backend: AgentRegistryBackend) -> None: + self.backend = backend + + def supports(self, event: AgentTermEvent) -> bool: + return event.source == self.key or event.kind in { + "agent_identity", + "validate_agent_registration", + "tool_grant", + "revocation_check", + } + + def handle(self, event: AgentTermEvent) -> AdapterResult: + if event.kind in {"agent_identity", "validate_agent_registration"}: + return self._resolve_identity(event) + if event.kind == "tool_grant": + return self._resolve_tool_grant(event) + if event.kind == "revocation_check": + return self._check_revocation(event) + return AdapterResult( + ok=False, + source=self.key, + body=f"Unsupported Agent Registry event kind: {event.kind}", + metadata=self._base_metadata(event, status="unsupported_kind"), + ) + + def _resolve_identity(self, event: AgentTermEvent) -> AdapterResult: + agent_id = self._agent_id(event) + if not agent_id: + return self._deny(event, "missing_agent_id") + + registration = self.backend.resolve_agent(agent_id) + if registration is None: + return self._deny(event, "unknown_agent", agent_id=agent_id) + if not registration.is_enabled: + return self._deny( + event, + "agent_not_enabled", + agent_id=agent_id, + extra=registration.to_metadata(), + ) + + return AdapterResult( + ok=True, + source=self.key, + body=f"Agent Registry resolved {agent_id}", + metadata={ + **self._base_metadata(event, status="resolved"), + **registration.to_metadata(), + }, + ) + + def _resolve_tool_grant(self, event: AgentTermEvent) -> AdapterResult: + agent_id = self._agent_id(event) + tool = self._tool(event) + if not agent_id: + return self._deny(event, "missing_agent_id") + if not tool: + return self._deny(event, "missing_tool", agent_id=agent_id) + + registration = self.backend.resolve_agent(agent_id) + if registration is None: + return self._deny(event, "unknown_agent", agent_id=agent_id) + if not registration.is_enabled: + return self._deny( + event, + "agent_not_enabled", + agent_id=agent_id, + extra=registration.to_metadata(), + ) + + grant = self.backend.resolve_tool_grant(agent_id, tool) + if grant is None or not grant.is_active: + return self._deny( + event, + "tool_grant_not_active", + agent_id=agent_id, + extra={"tool": tool}, + ) + + return AdapterResult( + ok=True, + source=self.key, + body=f"Agent Registry granted {agent_id} tool access: {tool}", + metadata={ + **self._base_metadata(event, status="tool_granted"), + **registration.to_metadata(), + **grant.to_metadata(), + }, + ) + + def _check_revocation(self, event: AgentTermEvent) -> AdapterResult: + agent_id = self._agent_id(event) + if not agent_id: + return self._deny(event, "missing_agent_id") + + registration = self.backend.resolve_agent(agent_id) + if registration is None: + return self._deny(event, "unknown_agent", agent_id=agent_id) + if registration.revoked: + return self._deny( + event, + "agent_revoked", + agent_id=agent_id, + extra=registration.to_metadata(), + ) + + return AdapterResult( + ok=True, + source=self.key, + body=f"Agent Registry revocation check passed for {agent_id}", + metadata={ + **self._base_metadata(event, status="not_revoked"), + **registration.to_metadata(), + }, + ) + + def _deny( + self, + event: AgentTermEvent, + reason: str, + *, + agent_id: str | None = None, + extra: dict[str, object] | None = None, + ) -> AdapterResult: + metadata = self._base_metadata(event, status="denied") + metadata["deny_reason"] = reason + if agent_id: + metadata["agent_id"] = agent_id + if extra: + metadata.update(extra) + return AdapterResult( + ok=False, + source=self.key, + body=f"Agent Registry denied request: {reason}", + metadata=metadata, + ) + + def _base_metadata(self, event: AgentTermEvent, *, status: str) -> dict[str, object]: + return { + "request_event_id": event.event_id, + "registry_status": status, + "revocation_check_at": datetime.now(UTC).isoformat(), + "fail_closed": True, + } + + def _agent_id(self, event: AgentTermEvent) -> str | None: + value = ( + event.metadata.get("agent_id") + or event.metadata.get("agentRegistryId") + or event.metadata.get("agent_registry_id") + ) + return str(value) if value else None + + def _tool(self, event: AgentTermEvent) -> str | None: + value = event.metadata.get("tool") or event.metadata.get("tool_name") + return str(value) if value else None diff --git a/tests/test_agent_registry.py b/tests/test_agent_registry.py new file mode 100644 index 0000000..b261a13 --- /dev/null +++ b/tests/test_agent_registry.py @@ -0,0 +1,140 @@ +from agent_term.agent_registry import ( + AgentRegistration, + AgentRegistryAdapter, + InMemoryAgentRegistryBackend, + ToolGrant, +) +from agent_term.events import AgentTermEvent + + +def make_event(kind: str, metadata: dict[str, object]) -> AgentTermEvent: + return AgentTermEvent( + channel="!agent-registry", + sender="@operator", + kind=kind, + source="agent-registry", + body="registry test event", + metadata=metadata, + ) + + +def test_resolves_registered_agent_identity(): + backend = InMemoryAgentRegistryBackend( + agents=[ + AgentRegistration( + agent_id="agent.codex", + registry_ref="SocioProphet/agent-registry#agent.codex", + spec_version="v0.1", + session_id="session-1", + ) + ] + ) + adapter = AgentRegistryAdapter(backend) + + result = adapter.handle(make_event("agent_identity", {"agent_id": "agent.codex"})) + + assert result.ok is True + assert result.metadata["agent_id"] == "agent.codex" + assert result.metadata["registry_status"] == "resolved" + assert result.metadata["session_id"] == "session-1" + + +def test_unknown_agent_fails_closed(): + adapter = AgentRegistryAdapter(InMemoryAgentRegistryBackend()) + + result = adapter.handle(make_event("agent_identity", {"agent_id": "agent.unknown"})) + + assert result.ok is False + assert result.metadata["fail_closed"] is True + assert result.metadata["deny_reason"] == "unknown_agent" + + +def test_revoked_agent_fails_closed(): + backend = InMemoryAgentRegistryBackend( + agents=[ + AgentRegistration( + agent_id="agent.revoked", + registry_ref="SocioProphet/agent-registry#agent.revoked", + spec_version="v0.1", + revoked=True, + ) + ] + ) + adapter = AgentRegistryAdapter(backend) + + result = adapter.handle(make_event("agent_identity", {"agent_id": "agent.revoked"})) + + assert result.ok is False + assert result.metadata["deny_reason"] == "agent_not_enabled" + assert result.metadata["revoked"] is True + + +def test_active_tool_grant_passes(): + backend = InMemoryAgentRegistryBackend( + agents=[ + AgentRegistration( + agent_id="agent.claude-code", + registry_ref="SocioProphet/agent-registry#agent.claude-code", + spec_version="v0.1", + tool_grants=frozenset({"grant.repo-write"}), + ) + ], + grants=[ + ToolGrant( + grant_id="grant.repo-write", + agent_id="agent.claude-code", + tool="repo-write", + ) + ], + ) + adapter = AgentRegistryAdapter(backend) + + result = adapter.handle( + make_event("tool_grant", {"agent_id": "agent.claude-code", "tool": "repo-write"}) + ) + + assert result.ok is True + assert result.metadata["registry_status"] == "tool_granted" + assert result.metadata["grant_id"] == "grant.repo-write" + assert result.metadata["tool"] == "repo-write" + + +def test_missing_tool_grant_fails_closed(): + backend = InMemoryAgentRegistryBackend( + agents=[ + AgentRegistration( + agent_id="agent.openclaw", + registry_ref="SocioProphet/agent-registry#agent.openclaw", + spec_version="v0.1", + ) + ] + ) + adapter = AgentRegistryAdapter(backend) + + result = adapter.handle( + make_event("tool_grant", {"agent_id": "agent.openclaw", "tool": "memory-write"}) + ) + + assert result.ok is False + assert result.metadata["deny_reason"] == "tool_grant_not_active" + assert result.metadata["tool"] == "memory-write" + + +def test_result_can_be_converted_to_agentterm_event(): + backend = InMemoryAgentRegistryBackend( + agents=[ + AgentRegistration( + agent_id="agent.hermes", + registry_ref="SocioProphet/agent-registry#agent.hermes", + spec_version="v0.1", + ) + ] + ) + adapter = AgentRegistryAdapter(backend) + request = make_event("agent_identity", {"agent_id": "agent.hermes"}) + + result_event = adapter.handle(request).to_event(request) + + assert result_event.source == "agent-registry" + assert result_event.metadata["request_event_id"] == request.event_id + assert result_event.metadata["agent_id"] == "agent.hermes"