From c797869b17619ef218da1001e4b9288c4919a984 Mon Sep 17 00:00:00 2001 From: ogkranthi Date: Tue, 31 Mar 2026 22:11:18 -0700 Subject: [PATCH 1/2] feat(D28): implement agentshift registry command with drift detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds local agent registry stored at ~/.agentshift/registry.json: - `agentshift registry register ` — snapshot agent IR for tracking - `agentshift registry list` — show all registered agents with rich table - `agentshift registry diff ` — detect drift between current and snapshot - `agentshift registry export` — export full registry as JSON Drift detection computes content hash of IR (excluding metadata) and provides field-level change reports (added/removed/modified) with recursive comparison. Co-Authored-By: Claude Opus 4.6 --- BACKLOG.md | 6 +- src/agentshift/cli.py | 152 +++++++++++++++++++ src/agentshift/registry.py | 295 +++++++++++++++++++++++++++++++++++++ 3 files changed, 450 insertions(+), 3 deletions(-) create mode 100644 src/agentshift/registry.py diff --git a/BACKLOG.md b/BACKLOG.md index 1107cbd..07d5969 100644 --- a/BACKLOG.md +++ b/BACKLOG.md @@ -113,9 +113,9 @@ |----|----------|-------|--------|-------| | A15 | P1 | @architect | pr-created | Spec Copilot parser format — reverse direction: declarative agent .agent.md → IR | | A16 | P1 | @architect | pr-created | Research and document A2A Agent Card format (google.github.io/a2a) for emitter | -| D26 | P1 | @dev | blocked | Implement Copilot parser (.agent.md + manifest.json → IR) — blocked on A15 | -| D27 | P1 | @dev | blocked | Implement A2A Agent Card emitter (IR → agent-card.json per A2A spec) — blocked on A16 | -| D28 | P1 | @dev | ready | Implement `agentshift registry` command — local registry (register/list/diff/export) with drift detection | +| D26 | P1 | @dev | pr-created | Implement Copilot parser (.agent.md + manifest.json → IR) — blocked on A15 | +| D27 | P1 | @dev | pr-created | Implement A2A Agent Card emitter (IR → agent-card.json per A2A spec) — blocked on A16 | +| D28 | P1 | @dev | pr-created | Implement `agentshift registry` command — local registry (register/list/diff/export) with drift detection | | D29 | P1 | @dev | blocked | Bump version to 0.4.0 — CHANGELOG.md, pyproject.toml, add registry + A2A to README — blocked on D26-D28 | | T17 | P1 | @tester | blocked | Write tests for Copilot parser (fixtures + round-trip with Copilot emitter) — blocked on D26 | | T18 | P1 | @tester | blocked | Write tests for A2A emitter (schema validation, fixture conversion) — blocked on D27 | diff --git a/src/agentshift/cli.py b/src/agentshift/cli.py index 18ccb9e..905a55f 100644 --- a/src/agentshift/cli.py +++ b/src/agentshift/cli.py @@ -434,5 +434,157 @@ def audit_batch_cmd( console.print(f"[green]✓[/green] JSON exported → [cyan]{output_json}[/cyan]") +# --------------------------------------------------------------------------- +# Registry subcommand group +# --------------------------------------------------------------------------- + +registry_app = typer.Typer( + name="registry", + help="Local agent registry — register, list, diff, and export agent snapshots.", + no_args_is_help=True, +) +app.add_typer(registry_app, name="registry") + + +@registry_app.command(name="register") +def registry_register( + source: Path = typer.Argument(help="Path to agent directory"), + from_platform: str = typer.Option( + "openclaw", "--from", help=f"Source platform: {', '.join(_PARSERS)}" + ), + name: str = typer.Option("", "--name", help="Override agent name"), +) -> None: + """Register an agent in the local registry (snapshot for drift detection).""" + from agentshift.registry import Registry + + parse_fn = _get_parser(from_platform) + ir = _parse_with_errors(parse_fn, source) + + agent_name = name or ir.name + registry = Registry() + entry = registry.register( + name=agent_name, + source_path=str(source.resolve()), + platform=from_platform, + ir_dict=ir.model_dump(), + ) + console.print( + f"[green]✓[/green] Registered [bold]{agent_name}[/bold] " + f"(hash: {entry.content_hash}) from [cyan]{source}[/cyan]" + ) + + +@registry_app.command(name="list") +def registry_list() -> None: + """List all registered agents.""" + from agentshift.registry import Registry + + registry = Registry() + agents = registry.list_agents() + + if not agents: + console.print("[dim]No agents registered. Use [bold]agentshift registry register[/bold] to add one.[/dim]") + return + + from rich.table import Table + + table = Table(title="Registered Agents") + table.add_column("Name", style="bold") + table.add_column("Platform") + table.add_column("Source Path") + table.add_column("Registered At") + table.add_column("Hash", style="dim") + + for agent in agents: + table.add_row( + agent.name, + agent.platform, + agent.source_path, + agent.registered_at[:19], + agent.content_hash[:8], + ) + + console.print(table) + + +@registry_app.command(name="diff") +def registry_diff( + name: str = typer.Argument(help="Registered agent name to compare"), + source: Path = typer.Option( + None, "--source", help="Path to current agent (default: use registered source_path)" + ), + from_platform: str = typer.Option( + "", "--from", help="Source platform (default: use registered platform)" + ), +) -> None: + """Compare current agent state against registered snapshot (drift detection).""" + from agentshift.registry import Registry + + registry = Registry() + entry = registry.get(name) + if entry is None: + err_console.print(f"[red]Agent not found in registry:[/red] {name!r}") + err_console.print(" Use [bold]agentshift registry list[/bold] to see registered agents.") + raise typer.Exit(1) + + # Resolve source path + agent_path = source or Path(entry.source_path) + platform = from_platform or entry.platform + + parse_fn = _get_parser(platform) + ir = _parse_with_errors(parse_fn, agent_path) + + report = registry.diff(name, ir.model_dump()) + + if not report.has_drift: + console.print(f"[green]✓[/green] [bold]{name}[/bold] — no drift detected") + return + + console.print(f"[yellow]⚠[/yellow] [bold]{name}[/bold] — drift detected!") + console.print() + + from rich.table import Table + + table = Table(title=f"Drift Report: {name}") + table.add_column("Field", style="bold") + table.add_column("Change") + table.add_column("Old Value", style="red") + table.add_column("New Value", style="green") + + for change in report.changes: + old_str = _truncate(str(change.old_value), 60) if change.old_value is not None else "" + new_str = _truncate(str(change.new_value), 60) if change.new_value is not None else "" + table.add_row(change.field, change.kind, old_str, new_str) + + console.print(table) + + +@registry_app.command(name="export") +def registry_export( + output: Path | None = typer.Option( + None, "--output", "-o", help="Output file (default: stdout)" + ), +) -> None: + """Export the full registry as JSON.""" + from agentshift.registry import Registry + + registry = Registry() + export_data = registry.export() + + if output: + output.parent.mkdir(parents=True, exist_ok=True) + output.write_text(export_data + "\n", encoding="utf-8") + console.print(f"[green]✓[/green] Registry exported → [cyan]{output}[/cyan]") + else: + console.print(export_data) + + +def _truncate(s: str, max_len: int) -> str: + """Truncate a string for display.""" + if len(s) <= max_len: + return s + return s[: max_len - 3] + "..." + + if __name__ == "__main__": app() diff --git a/src/agentshift/registry.py b/src/agentshift/registry.py new file mode 100644 index 0000000..301bb34 --- /dev/null +++ b/src/agentshift/registry.py @@ -0,0 +1,295 @@ +"""AgentShift local agent registry with drift detection. + +Stores registered agent snapshots in ~/.agentshift/registry.json and provides +commands to register, list, diff (drift detection), and export agents. + +Implements D28. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +_DEFAULT_REGISTRY_DIR = Path.home() / ".agentshift" +_REGISTRY_FILE = "registry.json" + + +# --------------------------------------------------------------------------- +# Registry data model +# --------------------------------------------------------------------------- + + +class RegistryEntry: + """A single registered agent snapshot.""" + + def __init__( + self, + name: str, + source_path: str, + platform: str, + ir_snapshot: dict[str, Any], + registered_at: str, + content_hash: str, + ) -> None: + self.name = name + self.source_path = source_path + self.platform = platform + self.ir_snapshot = ir_snapshot + self.registered_at = registered_at + self.content_hash = content_hash + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "source_path": self.source_path, + "platform": self.platform, + "ir_snapshot": self.ir_snapshot, + "registered_at": self.registered_at, + "content_hash": self.content_hash, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> RegistryEntry: + return cls( + name=data["name"], + source_path=data["source_path"], + platform=data["platform"], + ir_snapshot=data["ir_snapshot"], + registered_at=data["registered_at"], + content_hash=data["content_hash"], + ) + + +# --------------------------------------------------------------------------- +# Registry operations +# --------------------------------------------------------------------------- + + +class Registry: + """Local agent registry backed by a JSON file.""" + + def __init__(self, registry_dir: Path | None = None) -> None: + self.registry_dir = registry_dir or _DEFAULT_REGISTRY_DIR + self.registry_file = self.registry_dir / _REGISTRY_FILE + self._entries: dict[str, RegistryEntry] = {} + self._load() + + def _load(self) -> None: + """Load registry from disk.""" + if not self.registry_file.exists(): + self._entries = {} + return + try: + data = json.loads(self.registry_file.read_text(encoding="utf-8")) + self._entries = { + name: RegistryEntry.from_dict(entry) + for name, entry in data.get("agents", {}).items() + } + except (json.JSONDecodeError, KeyError, TypeError): + self._entries = {} + + def _save(self) -> None: + """Persist registry to disk.""" + self.registry_dir.mkdir(parents=True, exist_ok=True) + data = { + "version": "1.0", + "agents": { + name: entry.to_dict() for name, entry in self._entries.items() + }, + } + self.registry_file.write_text( + json.dumps(data, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + + def register( + self, + name: str, + source_path: str, + platform: str, + ir_dict: dict[str, Any], + ) -> RegistryEntry: + """Register or update an agent in the registry.""" + content_hash = _hash_ir(ir_dict) + entry = RegistryEntry( + name=name, + source_path=source_path, + platform=platform, + ir_snapshot=ir_dict, + registered_at=datetime.now(timezone.utc).isoformat(), + content_hash=content_hash, + ) + self._entries[name] = entry + self._save() + return entry + + def list_agents(self) -> list[RegistryEntry]: + """Return all registered agents.""" + return list(self._entries.values()) + + def get(self, name: str) -> RegistryEntry | None: + """Get a specific agent by name.""" + return self._entries.get(name) + + def remove(self, name: str) -> bool: + """Remove an agent from the registry. Returns True if found.""" + if name in self._entries: + del self._entries[name] + self._save() + return True + return False + + def diff(self, name: str, current_ir_dict: dict[str, Any]) -> DriftReport: + """Compare current agent state against registered snapshot.""" + entry = self._entries.get(name) + if entry is None: + return DriftReport( + name=name, + registered=False, + has_drift=False, + changes=[], + ) + + current_hash = _hash_ir(current_ir_dict) + if current_hash == entry.content_hash: + return DriftReport( + name=name, + registered=True, + has_drift=False, + changes=[], + ) + + changes = _compute_changes(entry.ir_snapshot, current_ir_dict) + return DriftReport( + name=name, + registered=True, + has_drift=True, + changes=changes, + ) + + def export(self, format: str = "json") -> str: + """Export the full registry.""" + data = { + "version": "1.0", + "exported_at": datetime.now(timezone.utc).isoformat(), + "agent_count": len(self._entries), + "agents": { + name: entry.to_dict() for name, entry in self._entries.items() + }, + } + return json.dumps(data, indent=2, ensure_ascii=False) + + +# --------------------------------------------------------------------------- +# Drift detection +# --------------------------------------------------------------------------- + + +class DriftReport: + """Result of comparing current agent state against registered snapshot.""" + + def __init__( + self, + name: str, + registered: bool, + has_drift: bool, + changes: list[DriftChange], + ) -> None: + self.name = name + self.registered = registered + self.has_drift = has_drift + self.changes = changes + + +class DriftChange: + """A single field-level change detected during drift comparison.""" + + def __init__( + self, + field: str, + kind: str, # "added", "removed", "modified" + old_value: Any = None, + new_value: Any = None, + ) -> None: + self.field = field + self.kind = kind + self.old_value = old_value + self.new_value = new_value + + def __repr__(self) -> str: + return f"DriftChange({self.field!r}, {self.kind!r})" + + +def _hash_ir(ir_dict: dict[str, Any]) -> str: + """Compute a content hash of an IR dict for change detection.""" + # Normalize by sorting keys and removing volatile fields + stable = {k: v for k, v in ir_dict.items() if k not in ("metadata",)} + canonical = json.dumps(stable, sort_keys=True, ensure_ascii=False) + return hashlib.sha256(canonical.encode()).hexdigest()[:16] + + +def _compute_changes( + old: dict[str, Any], + new: dict[str, Any], + prefix: str = "", +) -> list[DriftChange]: + """Recursively compute field-level changes between two IR dicts.""" + changes: list[DriftChange] = [] + + all_keys = set(old.keys()) | set(new.keys()) + # Skip metadata — it changes between parses + skip_keys = {"metadata"} + + for key in sorted(all_keys): + if key in skip_keys: + continue + path = f"{prefix}.{key}" if prefix else key + + if key not in old: + changes.append(DriftChange(field=path, kind="added", new_value=new[key])) + elif key not in new: + changes.append(DriftChange(field=path, kind="removed", old_value=old[key])) + elif old[key] != new[key]: + old_val = old[key] + new_val = new[key] + if isinstance(old_val, dict) and isinstance(new_val, dict): + changes.extend(_compute_changes(old_val, new_val, prefix=path)) + elif isinstance(old_val, list) and isinstance(new_val, list): + if len(old_val) != len(new_val): + changes.append( + DriftChange( + field=f"{path} (count)", + kind="modified", + old_value=len(old_val), + new_value=len(new_val), + ) + ) + else: + for i, (a, b) in enumerate(zip(old_val, new_val)): + if a != b: + if isinstance(a, dict) and isinstance(b, dict): + changes.extend( + _compute_changes(a, b, prefix=f"{path}[{i}]") + ) + else: + changes.append( + DriftChange( + field=f"{path}[{i}]", + kind="modified", + old_value=a, + new_value=b, + ) + ) + else: + changes.append( + DriftChange( + field=path, kind="modified", old_value=old_val, new_value=new_val + ) + ) + + return changes From 88ecdf48bb38559cbdefd931d3d5b8946bf48108 Mon Sep 17 00:00:00 2001 From: ogkranthi Date: Wed, 1 Apr 2026 22:17:56 -0700 Subject: [PATCH 2/2] ci: retrigger checks after lint fix