Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/integration/ops-history.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# OpsHistory Integration

Status: initial dry-run implementation slice.

AgentTerm is the operator surface for OpsHistory. This integration makes OpsHistory visible before any live local service, Matrix sync, Memory Mesh writeback, or AgentPlane run behavior is enabled.

## Commands

```bash
agent-term ops-history policy --profile active-multi-agent-room
agent-term ops-history replay --thread demo-search
agent-term ops-history context-pack --workroom pi-demo --topic urn:srcos:topic:professional-intelligence
agent-term ops-history redactions
```

All commands are dry-run only in this implementation slice. They emit deterministic JSON plans.

## Boundaries

- No live sync.
- No live Matrix access.
- No live Memory Mesh writeback.
- No AgentPlane execution.
- No browser or operational receipt export.

## Contract role

AgentTerm produces operator-visible plans for:

- OpsHistory sync-policy explanation;
- bounded replay planning;
- context-pack planning for Memory Mesh and AgentPlane;
- redaction posture.

Policy Fabric and Agent Registry references are represented as refs only. Runtime checks land in a later slice.
43 changes: 43 additions & 0 deletions src/agent_term/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Any

from agent_term.events import AgentTermEvent
from agent_term.ops_history import context_pack_plan, policy_explain, redactions_pending, replay_plan
from agent_term.planes import get_plane, iter_planes
from agent_term.store import DEFAULT_DB_PATH, EventStore

Expand Down Expand Up @@ -67,6 +68,27 @@ def build_parser() -> argparse.ArgumentParser:
show_plane = planes_sub.add_parser("show", help="Show one registered SourceOS plane.")
show_plane.add_argument("plane")

ops_history = subparsers.add_parser(
"ops-history",
help="Dry-run OpsHistory plans for policy, replay, context packs, and redactions.",
)
ops_sub = ops_history.add_subparsers(dest="ops_history_command", required=True)

ops_policy = ops_sub.add_parser("policy", help="Explain an OpsHistory sync-policy profile.")
ops_policy.add_argument("--profile", default="active-multi-agent-room")
ops_policy.add_argument("--dry-run", action="store_true", default=True)

ops_replay = ops_sub.add_parser("replay", help="Build an OpsHistory replay dry-run plan.")
ops_replay.add_argument("--thread", required=True)
ops_replay.add_argument("--dry-run", action="store_true", default=True)

ops_context = ops_sub.add_parser("context-pack", help="Build an OpsHistory context-pack dry-run plan.")
ops_context.add_argument("--workroom", required=True)
ops_context.add_argument("--topic")
ops_context.add_argument("--dry-run", action="store_true", default=True)

ops_sub.add_parser("redactions", help="Show pending OpsHistory redaction posture.")

request_shell = subparsers.add_parser(
"request-shell",
help="Record a governed cloudshell-fog shell-session request event.",
Expand Down Expand Up @@ -152,6 +174,11 @@ def parse_metadata(metadata_json: str) -> dict[str, Any]:
return value


def print_json(value: dict[str, Any]) -> int:
print(json.dumps(value, indent=2, sort_keys=True))
return 0


def format_event(event: AgentTermEvent) -> str:
thread = f" thread={event.thread_id}" if event.thread_id else ""
return (
Expand Down Expand Up @@ -304,6 +331,20 @@ def cmd_planes(args: argparse.Namespace) -> int:
raise SystemExit(f"unknown planes command: {args.planes_command}")


def cmd_ops_history(args: argparse.Namespace) -> int:
if not getattr(args, "dry_run", True):
raise SystemExit("OpsHistory commands are dry-run only in this implementation slice")
if args.ops_history_command == "policy":
return print_json(policy_explain(args.profile))
if args.ops_history_command == "replay":
return print_json(replay_plan(args.thread))
if args.ops_history_command == "context-pack":
return print_json(context_pack_plan(args.workroom, topic=args.topic))
if args.ops_history_command == "redactions":
return print_json(redactions_pending())
raise SystemExit(f"unknown ops-history command: {args.ops_history_command}")


def cmd_request_shell(store: EventStore, args: argparse.Namespace) -> int:
metadata = {
"profile": args.profile,
Expand Down Expand Up @@ -650,6 +691,8 @@ def main(argv: list[str] | None = None) -> int:

if args.command == "planes":
return cmd_planes(args)
if args.command == "ops-history":
return cmd_ops_history(args)

store = EventStore(db_path)
try:
Expand Down
189 changes: 189 additions & 0 deletions src/agent_term/ops_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Dry-run OpsHistory planning helpers for AgentTerm.

These helpers intentionally do not contact Matrix, Policy Fabric, Agent Registry,
Memory Mesh, AgentPlane, or any local service endpoint. They produce deterministic
contract-shaped plans that the operator can inspect before runtime integration.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any


DEFAULT_POLICY_REF = "urn:srcos:policy-decision:ops-history-hydrate-context-demo-0001"
DEFAULT_AGENT_GRANT_REF = "urn:srcos:agent-grant:ops-history-summarizer-demo"
DEFAULT_WORKROOM = "urn:srcos:workroom:professional-intelligence-demo"
DEFAULT_TOPIC = "urn:srcos:topic:professional-intelligence"


@dataclass(frozen=True)
class OpsHistoryScope:
"""Bounded scope for an OpsHistory dry-run plan."""

room_ref: str | None = None
thread_ref: str | None = None
workroom_ref: str = DEFAULT_WORKROOM
topic_ref: str | None = DEFAULT_TOPIC
session_ref: str | None = None

def as_dict(self) -> dict[str, str | None]:
return {
"roomRef": self.room_ref,
"threadRef": self.thread_ref,
"workroomRef": self.workroom_ref,
"topicRef": self.topic_ref,
"sessionRef": self.session_ref,
}


def _now() -> str:
return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def policy_explain(profile: str) -> dict[str, Any]:
"""Return an inspectable sync-policy explanation for a named profile."""

if profile == "active-multi-agent-room":
lanes = [
{
"lane": "control",
"priority": "high",
"cadence": [
{"count": 50, "intervalSeconds": 60},
{"count": 20, "intervalSeconds": 300},
{"count": 10, "intervalSeconds": 1800},
],
"payloadMode": "summary",
"requiresPolicyDecision": True,
},
{
"lane": "operational",
"priority": "normal",
"cadence": [
{"count": 50, "intervalSeconds": 180},
{"count": 20, "intervalSeconds": 600},
{"count": 10, "intervalSeconds": 3600},
],
"payloadMode": "metadata-only",
"requiresPolicyDecision": True,
},
{
"lane": "redaction",
"priority": "critical",
"cadence": [
{"count": 20, "intervalSeconds": 5},
{"count": 10, "intervalSeconds": 30},
],
"payloadMode": "redacted",
"requiresPolicyDecision": True,
},
]
else:
lanes = [
{
"lane": "control",
"priority": "maintenance",
"cadence": [{"count": 1, "intervalSeconds": 3600}],
"payloadMode": "metadata-only",
"requiresPolicyDecision": True,
},
{
"lane": "redaction",
"priority": "critical",
"cadence": [{"count": 5, "intervalSeconds": 5}],
"payloadMode": "redacted",
"requiresPolicyDecision": True,
},
]

return {
"planKind": "ops-history-policy-explain",
"dryRun": True,
"profile": profile,
"generatedAt": _now(),
"policyRef": "urn:srcos:ops-history-sync-policy:default-local-first-v1",
"defaultPayloadCapChars": 100000,
"defaultSyncWindowSeconds": 1209600,
"offlineBehavior": "queue-local",
"lanes": lanes,
"redactionPriority": {
"priority": "critical",
"targetPropagationSeconds": 30,
"invalidateContextPacks": True,
"invalidateMemoryWritebacks": True,
"invalidateArtifactExports": True,
},
"nonGoals": [
"no live sync",
"no live memory writeback",
"no raw sensitive payload release",
],
}


def replay_plan(thread_id: str, scope: OpsHistoryScope | None = None) -> dict[str, Any]:
"""Return a deterministic replay dry-run envelope."""

selected_scope = scope or OpsHistoryScope(thread_ref=f"urn:srcos:thread:{thread_id}")
return {
"planKind": "ops-history-replay",
"dryRun": True,
"generatedAt": _now(),
"threadId": thread_id,
"scope": selected_scope.as_dict(),
"sourceEventRefs": [f"urn:srcos:ops-history-event:{thread_id}-demo-0001"],
"policyDecisionRefs": [DEFAULT_POLICY_REF],
"agentRegistryRefs": [DEFAULT_AGENT_GRANT_REF],
"steps": [
"select bounded thread events",
"apply Policy Fabric hydration decision",
"apply Agent Registry grant constraints",
"exclude redacted refs",
"emit replay plan without side effects",
],
}


def context_pack_plan(workroom: str, topic: str | None = None) -> dict[str, Any]:
"""Return a context-pack dry-run envelope for Memory Mesh / AgentPlane handoff."""

topic_ref = topic or DEFAULT_TOPIC
workroom_ref = workroom if workroom.startswith("urn:") else f"urn:srcos:workroom:{workroom}"
return {
"planKind": "ops-history-context-pack",
"dryRun": True,
"generatedAt": _now(),
"contextPackRef": f"urn:srcos:context-pack:{workroom_ref.rsplit(':', 1)[-1]}-ops-history-demo",
"scope": OpsHistoryScope(workroom_ref=workroom_ref, topic_ref=topic_ref).as_dict(),
"payloadMode": "summary",
"sourceEventRefs": ["urn:srcos:ops-history-event:demo-agentterm-0001"],
"policyDecisionRefs": [DEFAULT_POLICY_REF],
"agentRegistryRefs": [DEFAULT_AGENT_GRANT_REF],
"retention": {
"mode": "ephemeral",
"ttlSeconds": 604800,
"writebackAllowed": False,
"promotionRequiresPolicyDecision": True,
},
"targetConsumers": ["memory-mesh", "agentplane"],
}


def redactions_pending() -> dict[str, Any]:
"""Return a deterministic pending-redactions dry-run posture."""

return {
"planKind": "ops-history-redactions-pending",
"dryRun": True,
"generatedAt": _now(),
"pending": [],
"policy": {
"redactionPriority": "critical",
"targetPropagationSeconds": 30,
"invalidateContextPacks": True,
"invalidateMemoryWritebacks": True,
"invalidateArtifactExports": True,
},
}
60 changes: 60 additions & 0 deletions tests/test_ops_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json

from agent_term.cli import main
from agent_term.ops_history import context_pack_plan, policy_explain, redactions_pending, replay_plan


def test_policy_explain_active_multi_agent_room():
plan = policy_explain("active-multi-agent-room")

assert plan["dryRun"] is True
assert plan["profile"] == "active-multi-agent-room"
assert any(lane["lane"] == "redaction" for lane in plan["lanes"])
assert plan["redactionPriority"]["invalidateContextPacks"] is True


def test_replay_plan_is_dry_run():
plan = replay_plan("demo-search")

assert plan["dryRun"] is True
assert plan["threadId"] == "demo-search"
assert plan["policyDecisionRefs"]
assert "emit replay plan without side effects" in plan["steps"]


def test_context_pack_plan_disables_writeback():
plan = context_pack_plan("pi-demo", topic="urn:srcos:topic:professional-intelligence")

assert plan["dryRun"] is True
assert plan["contextPackRef"] == "urn:srcos:context-pack:pi-demo-ops-history-demo"
assert plan["retention"]["writebackAllowed"] is False
assert "memory-mesh" in plan["targetConsumers"]
assert "agentplane" in plan["targetConsumers"]


def test_redactions_pending_is_empty_but_configured():
plan = redactions_pending()

assert plan["dryRun"] is True
assert plan["pending"] == []
assert plan["policy"]["redactionPriority"] == "critical"


def test_cli_ops_history_policy(capsys):
exit_code = main(["ops-history", "policy", "--profile", "active-multi-agent-room"])

captured = capsys.readouterr()
assert exit_code == 0
payload = json.loads(captured.out)
assert payload["planKind"] == "ops-history-policy-explain"
assert payload["dryRun"] is True


def test_cli_ops_history_context_pack(capsys):
exit_code = main(["ops-history", "context-pack", "--workroom", "pi-demo"])

captured = capsys.readouterr()
assert exit_code == 0
payload = json.loads(captured.out)
assert payload["planKind"] == "ops-history-context-pack"
assert payload["retention"]["writebackAllowed"] is False
Loading