Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
358 changes: 330 additions & 28 deletions packages/agent-os/src/agent_os/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,34 +1036,267 @@ def cmd_install_hooks(args: argparse.Namespace) -> int:
return 1


def cmd_validate(args: argparse.Namespace) -> int:
"""Validate policy YAML files."""
output_format = get_output_format(args)
files = args.files or list(Path(".agents").glob("*.yaml"))
def _load_json_schema() -> "dict | None":
"""Load the bundled policy JSON schema, returning None if unavailable."""
schema_path = Path(__file__).parent.parent / "policies" / "policy_schema.json"
if schema_path.exists():
return json.loads(schema_path.read_text(encoding="utf-8"))
return None

if not files:
if output_format == "json":
print(json.dumps({"status": "error", "message": "No policy files found"}, indent=2))

def _validate_yaml_with_line_numbers(filepath: Path, content: dict, strict: bool) -> "tuple[list, list]":
"""Validate a parsed YAML policy dict and return (errors, warnings).

Performs three validation passes in order:
1. JSON Schema validation via ``jsonschema`` (best-effort, skipped if not installed).
2. Required-field checks (``version``, ``name``).
3. Rule structure checks and strict-mode unknown-field warnings.

Args:
filepath: Path to the source YAML file (used in error messages).
content: Parsed YAML content as a plain dict.
strict: When True, unknown top-level fields are reported as warnings.

Returns:
A tuple of (errors, warnings) where each element is a list of
human-readable strings prefixed with the filepath and location.
"""
errors: list[str] = []
warnings: list[str] = []

# ── Pass 1: JSON Schema validation (best-effort) ──────────────────────
schema = _load_json_schema()
if schema is not None:
try:
import jsonschema # type: ignore[import-untyped]

validator = jsonschema.Draft7Validator(schema)
for ve in sorted(validator.iter_errors(content), key=lambda e: list(e.absolute_path)):
# Build a human-readable location string from the JSON path
location = " -> ".join(str(p) for p in ve.absolute_path) or "<root>"
errors.append(f"{filepath}: [{location}] {ve.message}")
except ImportError:
pass # jsonschema not installed — fall through to manual checks

# ── Pass 2: Required field checks ────────────────────────────────────
REQUIRED_FIELDS = ["version", "name"]
for field in REQUIRED_FIELDS:
if field not in content:
errors.append(f"{filepath}: Missing required field: '{field}'")

# Validate version format
if "version" in content:
version = str(content["version"])
if not re.match(r"^\d+(\.\d+)*$", version):
warnings.append(
f"{filepath}: Version '{version}' should be numeric (e.g., '1.0')"
)

# ── Pass 3: Rule structure checks ────────────────────────────────────
VALID_RULE_TYPES = ["allow", "deny", "audit", "require"]
VALID_ACTIONS = ["allow", "deny", "audit", "block"]

if "rules" in content:
rules = content["rules"]
if not isinstance(rules, list):
errors.append(f"{filepath}: 'rules' must be a list, got {type(rules).__name__}")
else:
print(format_error("No policy files found to validate"))
return 1
for i, rule in enumerate(rules):
rule_ref = f"rules[{i + 1}]"
if not isinstance(rule, dict):
errors.append(f"{filepath}: {rule_ref} must be a mapping, got {type(rule).__name__}")
continue
# name is required per schema
if "name" not in rule:
errors.append(f"{filepath}: {rule_ref} missing required field 'name'")
# action must be a valid value
if "action" in rule and rule["action"] not in VALID_ACTIONS:
errors.append(
f"{filepath}: {rule_ref} invalid action '{rule['action']}' "
f"(valid: {VALID_ACTIONS})"
)
# legacy 'type' field warning
if "type" in rule and rule["type"] not in VALID_RULE_TYPES:
warnings.append(
f"{filepath}: {rule_ref} unknown type '{rule['type']}' "
f"(valid: {VALID_RULE_TYPES})"
)

# ── Pass 4: Strict mode — unknown top-level fields ───────────────────
if strict:
KNOWN_FIELDS = [
"version", "name", "description", "rules", "defaults",
"constraints", "signals", "allowed_actions", "blocked_actions",
"a2a_conversation_policy",
]
for field in content.keys():
if field not in KNOWN_FIELDS:
warnings.append(f"{filepath}: Unknown top-level field '{field}'")

results = []
all_valid = True
return errors, warnings

for f in files:
# Simple simulated validation
is_valid = True
results.append({"file": str(f), "valid": is_valid})

if output_format == "json":
print(json.dumps({"status": "success" if all_valid else "error", "files": results}, indent=2))
def cmd_validate(args: argparse.Namespace) -> int:
"""Validate policy YAML files against the policy schema.

Parses each file, runs JSON Schema and structural validation, and
reports errors with field locations. Exits with a non-zero code when
any file fails validation (CI-friendly).

Args:
args: Parsed CLI arguments. Expects ``args.files`` (list of paths)
and ``args.strict`` (bool).

Returns:
0 if all files are valid, 1 if any errors were found.
"""
import yaml

print(f"\n{Colors.BOLD}🔍 Validating Policy Files{Colors.RESET}\n")

# ── Discover files ────────────────────────────────────────────────────
files_to_check: list[Path] = []
if args.files:
# Support both direct file paths and glob-style patterns
for f in args.files:
p = Path(f)
if "*" in f or "?" in f:
files_to_check.extend(sorted(Path(".").glob(f)))
else:
files_to_check.append(p)
else:
for r in results:
mark = f"{Colors.GREEN}\u2713{Colors.RESET}" if r["valid"] else f"{Colors.RED}\u2717{Colors.RESET}"
print(f"{mark} {r['file']}")
# Default: validate all YAML files in .agents/
agents_dir = Path(".agents")
if agents_dir.exists():
files_to_check = (
sorted(agents_dir.glob("*.yaml")) + sorted(agents_dir.glob("*.yml"))
)
if not files_to_check:
print(f"{Colors.YELLOW}No policy files found.{Colors.RESET}")
print("Run 'agentos init' to create default policies, or specify files directly.")
return 0

all_errors: list[str] = []
all_warnings: list[str] = []
valid_count = 0

for filepath in files_to_check:
if not filepath.exists():
all_errors.append(f"{filepath}: File not found")
print(f" {Colors.RED}✗{Colors.RESET} {filepath} — not found")
continue

print(f" Checking {filepath}...", end=" ", flush=True)

try:
# ── Step 1: Parse YAML (captures syntax errors with line numbers)
with open(filepath, encoding="utf-8") as f:
raw_text = f.read()

try:
content = yaml.safe_load(raw_text)
except yaml.YAMLError as exc:
# yaml.YAMLError includes line/column info in its string repr
msg = f"{filepath}: YAML syntax error — {exc}"
all_errors.append(msg)
print(f"{Colors.RED}PARSE ERROR{Colors.RESET}")
continue

if content is None:
all_errors.append(f"{filepath}: File is empty")
print(f"{Colors.RED}EMPTY{Colors.RESET}")
continue

if not isinstance(content, dict):
all_errors.append(
f"{filepath}: Top-level value must be a mapping, got {type(content).__name__}"
)
print(f"{Colors.RED}INVALID{Colors.RESET}")
continue

# ── Step 2: Schema + structural validation ─────────────────────
file_errors, file_warnings = _validate_yaml_with_line_numbers(
filepath, content, strict=getattr(args, "strict", False)
)

if file_errors:
all_errors.extend(file_errors)
print(f"{Colors.RED}INVALID{Colors.RESET}")
elif file_warnings:
all_warnings.extend(file_warnings)
print(f"{Colors.YELLOW}OK (warnings){Colors.RESET}")
valid_count += 1
else:
print(f"{Colors.GREEN}OK{Colors.RESET}")
valid_count += 1

except Exception as exc:
all_errors.append(f"{filepath}: Unexpected error — {exc}")
print(f"{Colors.RED}ERROR{Colors.RESET}")

print()

# ── Summary output ────────────────────────────────────────────────────
if all_warnings:
print(f"{Colors.YELLOW}Warnings:{Colors.RESET}")
for w in all_warnings:
print(f" ⚠️ {w}")
print()

if all_errors:
print(f"{Colors.RED}Errors:{Colors.RESET}")
for e in all_errors:
print(f" ❌ {e}")
print()
print(
f"{Colors.RED}Validation failed.{Colors.RESET} "
f"{valid_count}/{len(files_to_check)} file(s) valid."
)
return 1


print(f"{Colors.GREEN}✓ All {valid_count} policy file(s) valid.{Colors.RESET}")
return 0


def cmd_policy(args: argparse.Namespace) -> int:
"""Dispatch 'agentos policy <subcommand>' to the policies CLI.

Routes ``agentos policy validate <file>`` and related subcommands
to :mod:`agent_os.policies.cli`, which provides full JSON-Schema
validation and Pydantic model validation in a single pass.

Args:
args: Parsed CLI arguments. Expects ``args.policy_command`` and
any subcommand-specific attributes set by the policy subparser.

Returns:
Exit code from the delegated command (0 = success, 1 = failure,
2 = runtime error).
"""
from agent_os.policies import cli as policies_cli # type: ignore[import]

sub = getattr(args, "policy_command", None)
if sub == "validate":
return policies_cli.cmd_validate(args)
if sub == "test":
return policies_cli.cmd_test(args)
if sub == "diff":
return policies_cli.cmd_diff(args)

# No subcommand given — print help
print("Usage: agentos policy <validate|test|diff>")
print()
print(" validate <file> Validate a policy YAML/JSON file")
print(" test <policy> <scenarios> Run scenario tests against a policy")
print(" diff <file1> <file2> Show differences between two policies")
return 0


# ============================================================================
# HTTP API Server (agentos serve)
# ============================================================================

return 0 if all_valid else 1


def cmd_metrics(args: argparse.Namespace) -> int:
Expand Down Expand Up @@ -1193,22 +1426,91 @@ def main() -> int:
validate_parser = subparsers.add_parser("validate", help="Validate policy YAML files")
validate_parser.add_argument("files", nargs="*", help="Files to validate")
validate_parser.add_argument("--json", action="store_true", help="Output in JSON format")
validate_parser.add_argument("--strict", action="store_true", help="Strict mode: treat warnings as errors")

# metrics
metrics_parser = subparsers.add_parser("metrics", help="Output Prometheus metrics")
metrics_parser.add_argument("--json", action="store_true", help="Output in JSON format")

# policy command — 'agentos policy validate <file>' with full JSON-Schema support
policy_parser = subparsers.add_parser(
"policy",
help="Policy-as-code tools: validate, test, and diff governance policies",
)
policy_subparsers = policy_parser.add_subparsers(dest="policy_command")

# agentos policy validate <file>
pol_validate = policy_subparsers.add_parser(
"validate",
help="Validate a policy YAML/JSON file against the schema",
)
pol_validate.add_argument("path", help="Path to the policy file to validate")

# agentos policy test <policy> <scenarios>
pol_test = policy_subparsers.add_parser(
"test",
help="Test a policy against a set of YAML scenarios",
)
pol_test.add_argument("policy_path", help="Path to the policy file")
pol_test.add_argument("test_scenarios_path", help="Path to the test scenarios YAML")

# agentos policy diff <file1> <file2>
pol_diff = policy_subparsers.add_parser(
"diff",
help="Show differences between two policy files",
)
pol_diff.add_argument("path1", help="First policy file")
pol_diff.add_argument("path2", help="Second policy file")

# serve command
serve_parser = subparsers.add_parser(
"serve",
help="Start the HTTP API server for Agent OS",
description="Launch an HTTP server exposing health, status, agents, and "
"execution endpoints for programmatic access to the kernel.",
)
serve_parser.add_argument(
"--port", type=int, default=8080, help="Port to listen on (default: 8080)"
)
serve_parser.add_argument(
"--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)"
)

# health
health_parser = subparsers.add_parser("health", help="Check system health")
health_parser.add_argument("--json", action="store_true", help="Output in JSON format")

# serve (not updated for JSON as it is a server)
serve_parser = subparsers.add_parser("serve", help="Start HTTP API server")
serve_parser.add_argument("--port", type=int, default=8000, help="Port to listen on")
# metrics
metrics_parser = subparsers.add_parser("metrics", help="Output Prometheus metrics")
metrics_parser.add_argument("--json", action="store_true", help="Output in JSON format")

args = parser.parse_args()

if not args.command:
# Handle CI mode
if hasattr(args, 'ci') and args.ci:
Colors.disable()

if args.version:
try:
from agent_os import __version__
print(f"agentos {__version__}")
except Exception:
print("agentos (version unknown)")
return 0

commands = {
"init": cmd_init,
"secure": cmd_secure,
"audit": cmd_audit,
"status": cmd_status,
"check": cmd_check,
"review": cmd_review,
"install-hooks": cmd_install_hooks,
"validate": cmd_validate,
"policy": cmd_policy,
"metrics": cmd_metrics,
"health": cmd_health,
}

handler = commands.get(args.command)
if handler is None:
parser.print_help()
return 0

Expand Down
Loading