From ce53dc46ab95ceeb7fd4080b01e1d0b25eeed240 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Thu, 30 Apr 2026 13:14:16 -0400 Subject: [PATCH 1/2] Add Policy Fabric admission adapter primitives --- src/agent_term/policy_fabric.py | 200 ++++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 src/agent_term/policy_fabric.py diff --git a/src/agent_term/policy_fabric.py b/src/agent_term/policy_fabric.py new file mode 100644 index 0000000..729b3f7 --- /dev/null +++ b/src/agent_term/policy_fabric.py @@ -0,0 +1,200 @@ +"""Policy Fabric admission adapter primitives. + +AgentTerm is not the authority for policy. This module provides a small, +fakeable adapter boundary so side-effecting actions and sensitive context release +can be admitted or denied by a Policy Fabric-compatible backend before dispatch. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Protocol + +from agent_term.adapters import AdapterResult +from agent_term.events import AgentTermEvent + + +ALLOW = "allow" +DENY = "deny" +PENDING = "pending" +UNKNOWN = "unknown" + +SIDE_EFFECTING_KINDS = frozenset( + { + "shell_session", + "shell_attach", + "workspace_materialization", + "context_pack", + "memory_recall", + "memory_write", + "semantic_membrane", + "investigation", + "search_packet", + "graph_view", + "graph_diffusion", + "graph_artifact", + "run", + "replay", + "github_mutation", + "ci_retry", + "tool_grant", + "revocation", + } +) + +SENSITIVE_CONTEXT_KINDS = frozenset( + { + "context_pack", + "memory_recall", + "memory_write", + "semantic_thread", + "claim", + "citation", + "investigation", + "search_packet", + "synthesis", + } +) + + +@dataclass(frozen=True) +class PolicyDecision: + """Resolved policy decision for an AgentTerm event.""" + + decision_id: str + action: str + status: str + policy_ref: str + reason: str | None = None + obligations: tuple[str, ...] = () + metadata: dict[str, object] = field(default_factory=dict) + + @property + def is_allowed(self) -> bool: + return self.status == ALLOW + + def to_metadata(self) -> dict[str, object]: + return { + "policy_decision_id": self.decision_id, + "policy_action": self.action, + "policy_status": self.status, + "policy_ref": self.policy_ref, + "policy_reason": self.reason, + "policy_obligations": list(self.obligations), + **self.metadata, + } + + +class PolicyFabricBackend(Protocol): + """Backend contract for Policy Fabric decision lookup.""" + + def evaluate(self, event: AgentTermEvent) -> PolicyDecision | None: + """Return the admission decision for an event, or None if unknown.""" + + +class InMemoryPolicyFabricBackend: + """Test/development backend for Policy Fabric decisions.""" + + def __init__(self, decisions: list[PolicyDecision] | None = None) -> None: + self._decisions = {decision.action: decision for decision in decisions or []} + + def evaluate(self, event: AgentTermEvent) -> PolicyDecision | None: + action = action_for_event(event) + return self._decisions.get(action) + + +class PolicyFabricAdapter: + """Adapter that enforces policy admission before sensitive or side-effecting work.""" + + key = "policy-fabric" + + def __init__(self, backend: PolicyFabricBackend) -> None: + self.backend = backend + + def supports(self, event: AgentTermEvent) -> bool: + return event.source == self.key or requires_admission(event) + + def handle(self, event: AgentTermEvent) -> AdapterResult: + action = action_for_event(event) + if not requires_admission(event): + return AdapterResult( + ok=True, + source=self.key, + body=f"Policy Fabric admission not required for {action}", + kind="policy_check", + metadata=self._base_metadata(event, action=action, status="not_required"), + ) + + decision = self.backend.evaluate(event) + if decision is None: + return self._deny(event, action, "no_policy_decision") + if decision.status == PENDING: + return self._deny(event, action, "policy_decision_pending", decision=decision) + if decision.status != ALLOW: + reason = decision.reason or "policy_denied" + return self._deny(event, action, reason, decision=decision) + + return AdapterResult( + ok=True, + source=self.key, + body=f"Policy Fabric admitted {action}", + kind="decision", + metadata={ + **self._base_metadata(event, action=action, status="admitted"), + **decision.to_metadata(), + }, + ) + + def _deny( + self, + event: AgentTermEvent, + action: str, + reason: str, + *, + decision: PolicyDecision | None = None, + ) -> AdapterResult: + metadata = self._base_metadata(event, action=action, status="denied") + metadata["deny_reason"] = reason + if decision: + metadata.update(decision.to_metadata()) + return AdapterResult( + ok=False, + source=self.key, + body=f"Policy Fabric denied {action}: {reason}", + kind="decision", + metadata=metadata, + ) + + def _base_metadata( + self, + event: AgentTermEvent, + *, + action: str, + status: str, + ) -> dict[str, object]: + return { + "request_event_id": event.event_id, + "policy_action": action, + "admission_status": status, + "policy_check_at": datetime.now(UTC).isoformat(), + "requires_admission": requires_admission(event), + "fail_closed": True, + } + + +def action_for_event(event: AgentTermEvent) -> str: + explicit_action = event.metadata.get("policy_action") or event.metadata.get("action") + if explicit_action: + return str(explicit_action) + return f"{event.source}.{event.kind}" + + +def requires_admission(event: AgentTermEvent) -> bool: + if bool(event.metadata.get("requires_policy_admission")): + return True + if bool(event.metadata.get("sensitive_context")): + return True + if bool(event.metadata.get("approval_required")): + return True + return event.kind in SIDE_EFFECTING_KINDS or event.kind in SENSITIVE_CONTEXT_KINDS From 8043c32dfd1505f2cee2ead08d8d2d0aa34f9770 Mon Sep 17 00:00:00 2001 From: mdheller <21163552+mdheller@users.noreply.github.com> Date: Thu, 30 Apr 2026 13:14:49 -0400 Subject: [PATCH 2/2] Add Policy Fabric adapter tests --- tests/test_policy_fabric.py | 126 ++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 tests/test_policy_fabric.py diff --git a/tests/test_policy_fabric.py b/tests/test_policy_fabric.py new file mode 100644 index 0000000..ede002c --- /dev/null +++ b/tests/test_policy_fabric.py @@ -0,0 +1,126 @@ +from agent_term.events import AgentTermEvent +from agent_term.policy_fabric import ( + ALLOW, + DENY, + PENDING, + InMemoryPolicyFabricBackend, + PolicyDecision, + PolicyFabricAdapter, + requires_admission, +) + + +def make_event(kind: str, source: str, metadata: dict[str, object] | None = None) -> AgentTermEvent: + return AgentTermEvent( + channel="!policyfabric", + sender="@operator", + kind=kind, + source=source, + body="policy test event", + metadata=metadata or {}, + ) + + +def test_non_sensitive_message_does_not_require_admission(): + event = make_event("message", "local") + adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend()) + + result = adapter.handle(event) + + assert requires_admission(event) is False + assert result.ok is True + assert result.metadata["admission_status"] == "not_required" + + +def test_side_effecting_event_without_decision_fails_closed(): + event = make_event("shell_session", "cloudshell-fog") + adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend()) + + result = adapter.handle(event) + + assert requires_admission(event) is True + assert result.ok is False + assert result.metadata["fail_closed"] is True + assert result.metadata["deny_reason"] == "no_policy_decision" + + +def test_allow_decision_admits_event(): + decision = PolicyDecision( + decision_id="decision-allow-shell", + action="cloudshell-fog.shell_session", + status=ALLOW, + policy_ref="SocioProphet/policy-fabric#shell-session", + obligations=("record-audit",), + ) + adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision])) + + result = adapter.handle(make_event("shell_session", "cloudshell-fog")) + + assert result.ok is True + assert result.kind == "decision" + assert result.metadata["admission_status"] == "admitted" + assert result.metadata["policy_decision_id"] == "decision-allow-shell" + assert result.metadata["policy_obligations"] == ["record-audit"] + + +def test_deny_decision_blocks_event(): + decision = PolicyDecision( + decision_id="decision-deny-memory", + action="memory-mesh.memory_recall", + status=DENY, + policy_ref="SocioProphet/policy-fabric#memory", + reason="context release not authorized", + ) + adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision])) + + result = adapter.handle(make_event("memory_recall", "memory-mesh")) + + assert result.ok is False + assert result.metadata["deny_reason"] == "context release not authorized" + assert result.metadata["policy_status"] == DENY + + +def test_pending_decision_fails_closed(): + decision = PolicyDecision( + decision_id="decision-pending-graph", + action="meshrush.graph_view", + status=PENDING, + policy_ref="SocioProphet/policy-fabric#graph", + ) + adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision])) + + result = adapter.handle(make_event("graph_view", "meshrush")) + + assert result.ok is False + assert result.metadata["deny_reason"] == "policy_decision_pending" + assert result.metadata["policy_status"] == PENDING + + +def test_explicit_policy_action_is_used(): + decision = PolicyDecision( + decision_id="decision-explicit", + action="custom.context.release", + status=ALLOW, + policy_ref="SocioProphet/policy-fabric#custom", + ) + adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision])) + event = make_event( + "message", + "local", + { + "policy_action": "custom.context.release", + "requires_policy_admission": True, + }, + ) + + result = adapter.handle(event) + + assert result.ok is True + assert result.metadata["policy_action"] == "custom.context.release" + assert result.metadata["policy_decision_id"] == "decision-explicit" + + +def test_sensitive_context_flag_requires_admission(): + event = make_event("message", "local", {"sensitive_context": True}) + + assert requires_admission(event) is True