Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions src/agent_term/policy_fabric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
"""Policy Fabric admission adapter primitives.

AgentTerm is not the authority for policy. This module provides a small,
fakeable adapter boundary so side-effecting actions and sensitive context release
can be admitted or denied by a Policy Fabric-compatible backend before dispatch.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Protocol

from agent_term.adapters import AdapterResult
from agent_term.events import AgentTermEvent


ALLOW = "allow"
DENY = "deny"
PENDING = "pending"
UNKNOWN = "unknown"

SIDE_EFFECTING_KINDS = frozenset(
{
"shell_session",
"shell_attach",
"workspace_materialization",
"context_pack",
"memory_recall",
"memory_write",
"semantic_membrane",
"investigation",
"search_packet",
"graph_view",
"graph_diffusion",
"graph_artifact",
"run",
"replay",
"github_mutation",
"ci_retry",
"tool_grant",
"revocation",
}
)

SENSITIVE_CONTEXT_KINDS = frozenset(
{
"context_pack",
"memory_recall",
"memory_write",
"semantic_thread",
"claim",
"citation",
"investigation",
"search_packet",
"synthesis",
}
)


@dataclass(frozen=True)
class PolicyDecision:
"""Resolved policy decision for an AgentTerm event."""

decision_id: str
action: str
status: str
policy_ref: str
reason: str | None = None
obligations: tuple[str, ...] = ()
metadata: dict[str, object] = field(default_factory=dict)

@property
def is_allowed(self) -> bool:
return self.status == ALLOW

def to_metadata(self) -> dict[str, object]:
return {
"policy_decision_id": self.decision_id,
"policy_action": self.action,
"policy_status": self.status,
"policy_ref": self.policy_ref,
"policy_reason": self.reason,
"policy_obligations": list(self.obligations),
**self.metadata,
}


class PolicyFabricBackend(Protocol):
"""Backend contract for Policy Fabric decision lookup."""

def evaluate(self, event: AgentTermEvent) -> PolicyDecision | None:
"""Return the admission decision for an event, or None if unknown."""


class InMemoryPolicyFabricBackend:
"""Test/development backend for Policy Fabric decisions."""

def __init__(self, decisions: list[PolicyDecision] | None = None) -> None:
self._decisions = {decision.action: decision for decision in decisions or []}

def evaluate(self, event: AgentTermEvent) -> PolicyDecision | None:
action = action_for_event(event)
return self._decisions.get(action)


class PolicyFabricAdapter:
"""Adapter that enforces policy admission before sensitive or side-effecting work."""

key = "policy-fabric"

def __init__(self, backend: PolicyFabricBackend) -> None:
self.backend = backend

def supports(self, event: AgentTermEvent) -> bool:
return event.source == self.key or requires_admission(event)

def handle(self, event: AgentTermEvent) -> AdapterResult:
action = action_for_event(event)
if not requires_admission(event):
return AdapterResult(
ok=True,
source=self.key,
body=f"Policy Fabric admission not required for {action}",
kind="policy_check",
metadata=self._base_metadata(event, action=action, status="not_required"),
)

decision = self.backend.evaluate(event)
if decision is None:
return self._deny(event, action, "no_policy_decision")
if decision.status == PENDING:
return self._deny(event, action, "policy_decision_pending", decision=decision)
if decision.status != ALLOW:
reason = decision.reason or "policy_denied"
return self._deny(event, action, reason, decision=decision)

return AdapterResult(
ok=True,
source=self.key,
body=f"Policy Fabric admitted {action}",
kind="decision",
metadata={
**self._base_metadata(event, action=action, status="admitted"),
**decision.to_metadata(),
},
)

def _deny(
self,
event: AgentTermEvent,
action: str,
reason: str,
*,
decision: PolicyDecision | None = None,
) -> AdapterResult:
metadata = self._base_metadata(event, action=action, status="denied")
metadata["deny_reason"] = reason
if decision:
metadata.update(decision.to_metadata())
return AdapterResult(
ok=False,
source=self.key,
body=f"Policy Fabric denied {action}: {reason}",
kind="decision",
metadata=metadata,
)

def _base_metadata(
self,
event: AgentTermEvent,
*,
action: str,
status: str,
) -> dict[str, object]:
return {
"request_event_id": event.event_id,
"policy_action": action,
"admission_status": status,
"policy_check_at": datetime.now(UTC).isoformat(),
"requires_admission": requires_admission(event),
"fail_closed": True,
}


def action_for_event(event: AgentTermEvent) -> str:
explicit_action = event.metadata.get("policy_action") or event.metadata.get("action")
if explicit_action:
return str(explicit_action)
return f"{event.source}.{event.kind}"


def requires_admission(event: AgentTermEvent) -> bool:
if bool(event.metadata.get("requires_policy_admission")):
return True
if bool(event.metadata.get("sensitive_context")):
return True
if bool(event.metadata.get("approval_required")):
return True
return event.kind in SIDE_EFFECTING_KINDS or event.kind in SENSITIVE_CONTEXT_KINDS
126 changes: 126 additions & 0 deletions tests/test_policy_fabric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from agent_term.events import AgentTermEvent
from agent_term.policy_fabric import (
ALLOW,
DENY,
PENDING,
InMemoryPolicyFabricBackend,
PolicyDecision,
PolicyFabricAdapter,
requires_admission,
)


def make_event(kind: str, source: str, metadata: dict[str, object] | None = None) -> AgentTermEvent:
return AgentTermEvent(
channel="!policyfabric",
sender="@operator",
kind=kind,
source=source,
body="policy test event",
metadata=metadata or {},
)


def test_non_sensitive_message_does_not_require_admission():
event = make_event("message", "local")
adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend())

result = adapter.handle(event)

assert requires_admission(event) is False
assert result.ok is True
assert result.metadata["admission_status"] == "not_required"


def test_side_effecting_event_without_decision_fails_closed():
event = make_event("shell_session", "cloudshell-fog")
adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend())

result = adapter.handle(event)

assert requires_admission(event) is True
assert result.ok is False
assert result.metadata["fail_closed"] is True
assert result.metadata["deny_reason"] == "no_policy_decision"


def test_allow_decision_admits_event():
decision = PolicyDecision(
decision_id="decision-allow-shell",
action="cloudshell-fog.shell_session",
status=ALLOW,
policy_ref="SocioProphet/policy-fabric#shell-session",
obligations=("record-audit",),
)
adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision]))

result = adapter.handle(make_event("shell_session", "cloudshell-fog"))

assert result.ok is True
assert result.kind == "decision"
assert result.metadata["admission_status"] == "admitted"
assert result.metadata["policy_decision_id"] == "decision-allow-shell"
assert result.metadata["policy_obligations"] == ["record-audit"]


def test_deny_decision_blocks_event():
decision = PolicyDecision(
decision_id="decision-deny-memory",
action="memory-mesh.memory_recall",
status=DENY,
policy_ref="SocioProphet/policy-fabric#memory",
reason="context release not authorized",
)
adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision]))

result = adapter.handle(make_event("memory_recall", "memory-mesh"))

assert result.ok is False
assert result.metadata["deny_reason"] == "context release not authorized"
assert result.metadata["policy_status"] == DENY


def test_pending_decision_fails_closed():
decision = PolicyDecision(
decision_id="decision-pending-graph",
action="meshrush.graph_view",
status=PENDING,
policy_ref="SocioProphet/policy-fabric#graph",
)
adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision]))

result = adapter.handle(make_event("graph_view", "meshrush"))

assert result.ok is False
assert result.metadata["deny_reason"] == "policy_decision_pending"
assert result.metadata["policy_status"] == PENDING


def test_explicit_policy_action_is_used():
decision = PolicyDecision(
decision_id="decision-explicit",
action="custom.context.release",
status=ALLOW,
policy_ref="SocioProphet/policy-fabric#custom",
)
adapter = PolicyFabricAdapter(InMemoryPolicyFabricBackend([decision]))
event = make_event(
"message",
"local",
{
"policy_action": "custom.context.release",
"requires_policy_admission": True,
},
)

result = adapter.handle(event)

assert result.ok is True
assert result.metadata["policy_action"] == "custom.context.release"
assert result.metadata["policy_decision_id"] == "decision-explicit"


def test_sensitive_context_flag_requires_admission():
event = make_event("message", "local", {"sensitive_context": True})

assert requires_admission(event) is True
Loading