From c1861f37baee811be139cff379c80210ad4fc65a Mon Sep 17 00:00:00 2001 From: Kanish Tyagi Date: Tue, 31 Mar 2026 19:27:37 -0500 Subject: [PATCH 1/2] feat: add policy YAML validation command to CLI --- .../agent-os/src/agent_os/cli/__init__.py | 342 +++++++++++++++++- 1 file changed, 324 insertions(+), 18 deletions(-) diff --git a/packages/agent-os/src/agent_os/cli/__init__.py b/packages/agent-os/src/agent_os/cli/__init__.py index a8daf946..c53da66a 100644 --- a/packages/agent-os/src/agent_os/cli/__init__.py +++ b/packages/agent-os/src/agent_os/cli/__init__.py @@ -1036,16 +1036,222 @@ def cmd_install_hooks(args: argparse.Namespace) -> int: return 1 -def cmd_validate(args: argparse.Namespace) -> int: - """Validate policy YAML files.""" - output_format = get_output_format(args) - files = args.files or list(Path(".agents").glob("*.yaml")) +def _load_json_schema() -> "dict | None": + """Load the bundled policy JSON schema, returning None if unavailable.""" + schema_path = Path(__file__).parent.parent / "policies" / "policy_schema.json" + if schema_path.exists(): + return json.loads(schema_path.read_text(encoding="utf-8")) + return None - if not files: - if output_format == "json": - print(json.dumps({"status": "error", "message": "No policy files found"}, indent=2)) + +def _validate_yaml_with_line_numbers(filepath: Path, content: dict, strict: bool) -> "tuple[list, list]": + """Validate a parsed YAML policy dict and return (errors, warnings). + + Performs three validation passes in order: + 1. JSON Schema validation via ``jsonschema`` (best-effort, skipped if not installed). + 2. Required-field checks (``version``, ``name``). + 3. Rule structure checks and strict-mode unknown-field warnings. + + Args: + filepath: Path to the source YAML file (used in error messages). + content: Parsed YAML content as a plain dict. + strict: When True, unknown top-level fields are reported as warnings. + + Returns: + A tuple of (errors, warnings) where each element is a list of + human-readable strings prefixed with the filepath and location. + """ + errors: list[str] = [] + warnings: list[str] = [] + + # ── Pass 1: JSON Schema validation (best-effort) ────────────────────── + schema = _load_json_schema() + if schema is not None: + try: + import jsonschema # type: ignore[import-untyped] + + validator = jsonschema.Draft7Validator(schema) + for ve in sorted(validator.iter_errors(content), key=lambda e: list(e.absolute_path)): + # Build a human-readable location string from the JSON path + location = " -> ".join(str(p) for p in ve.absolute_path) or "" + errors.append(f"{filepath}: [{location}] {ve.message}") + except ImportError: + pass # jsonschema not installed — fall through to manual checks + + # ── Pass 2: Required field checks ──────────────────────────────────── + REQUIRED_FIELDS = ["version", "name"] + for field in REQUIRED_FIELDS: + if field not in content: + errors.append(f"{filepath}: Missing required field: '{field}'") + + # Validate version format + if "version" in content: + version = str(content["version"]) + if not re.match(r"^\d+(\.\d+)*$", version): + warnings.append( + f"{filepath}: Version '{version}' should be numeric (e.g., '1.0')" + ) + + # ── Pass 3: Rule structure checks ──────────────────────────────────── + VALID_RULE_TYPES = ["allow", "deny", "audit", "require"] + VALID_ACTIONS = ["allow", "deny", "audit", "block"] + + if "rules" in content: + rules = content["rules"] + if not isinstance(rules, list): + errors.append(f"{filepath}: 'rules' must be a list, got {type(rules).__name__}") else: - print(format_error("No policy files found to validate")) + for i, rule in enumerate(rules): + rule_ref = f"rules[{i + 1}]" + if not isinstance(rule, dict): + errors.append(f"{filepath}: {rule_ref} must be a mapping, got {type(rule).__name__}") + continue + # name is required per schema + if "name" not in rule: + errors.append(f"{filepath}: {rule_ref} missing required field 'name'") + # action must be a valid value + if "action" in rule and rule["action"] not in VALID_ACTIONS: + errors.append( + f"{filepath}: {rule_ref} invalid action '{rule['action']}' " + f"(valid: {VALID_ACTIONS})" + ) + # legacy 'type' field warning + if "type" in rule and rule["type"] not in VALID_RULE_TYPES: + warnings.append( + f"{filepath}: {rule_ref} unknown type '{rule['type']}' " + f"(valid: {VALID_RULE_TYPES})" + ) + + # ── Pass 4: Strict mode — unknown top-level fields ─────────────────── + if strict: + KNOWN_FIELDS = [ + "version", "name", "description", "rules", "defaults", + "constraints", "signals", "allowed_actions", "blocked_actions", + "a2a_conversation_policy", + ] + for field in content.keys(): + if field not in KNOWN_FIELDS: + warnings.append(f"{filepath}: Unknown top-level field '{field}'") + + return errors, warnings + + +def cmd_validate(args: argparse.Namespace) -> int: + """Validate policy YAML files against the policy schema. + + Parses each file, runs JSON Schema and structural validation, and + reports errors with field locations. Exits with a non-zero code when + any file fails validation (CI-friendly). + + Args: + args: Parsed CLI arguments. Expects ``args.files`` (list of paths) + and ``args.strict`` (bool). + + Returns: + 0 if all files are valid, 1 if any errors were found. + """ + import yaml + + print(f"\n{Colors.BOLD}🔍 Validating Policy Files{Colors.RESET}\n") + + # ── Discover files ──────────────────────────────────────────────────── + files_to_check: list[Path] = [] + if args.files: + # Support both direct file paths and glob-style patterns + for f in args.files: + p = Path(f) + if "*" in f or "?" in f: + files_to_check.extend(sorted(Path(".").glob(f))) + else: + files_to_check.append(p) + else: + # Default: validate all YAML files in .agents/ + agents_dir = Path(".agents") + if agents_dir.exists(): + files_to_check = ( + sorted(agents_dir.glob("*.yaml")) + sorted(agents_dir.glob("*.yml")) + ) + if not files_to_check: + print(f"{Colors.YELLOW}No policy files found.{Colors.RESET}") + print("Run 'agentos init' to create default policies, or specify files directly.") + return 0 + + all_errors: list[str] = [] + all_warnings: list[str] = [] + valid_count = 0 + + for filepath in files_to_check: + if not filepath.exists(): + all_errors.append(f"{filepath}: File not found") + print(f" {Colors.RED}✗{Colors.RESET} {filepath} — not found") + continue + + print(f" Checking {filepath}...", end=" ", flush=True) + + try: + # ── Step 1: Parse YAML (captures syntax errors with line numbers) + with open(filepath, encoding="utf-8") as f: + raw_text = f.read() + + try: + content = yaml.safe_load(raw_text) + except yaml.YAMLError as exc: + # yaml.YAMLError includes line/column info in its string repr + msg = f"{filepath}: YAML syntax error — {exc}" + all_errors.append(msg) + print(f"{Colors.RED}PARSE ERROR{Colors.RESET}") + continue + + if content is None: + all_errors.append(f"{filepath}: File is empty") + print(f"{Colors.RED}EMPTY{Colors.RESET}") + continue + + if not isinstance(content, dict): + all_errors.append( + f"{filepath}: Top-level value must be a mapping, got {type(content).__name__}" + ) + print(f"{Colors.RED}INVALID{Colors.RESET}") + continue + + # ── Step 2: Schema + structural validation ───────────────────── + file_errors, file_warnings = _validate_yaml_with_line_numbers( + filepath, content, strict=getattr(args, "strict", False) + ) + + if file_errors: + all_errors.extend(file_errors) + print(f"{Colors.RED}INVALID{Colors.RESET}") + elif file_warnings: + all_warnings.extend(file_warnings) + print(f"{Colors.YELLOW}OK (warnings){Colors.RESET}") + valid_count += 1 + else: + print(f"{Colors.GREEN}OK{Colors.RESET}") + valid_count += 1 + + except Exception as exc: + all_errors.append(f"{filepath}: Unexpected error — {exc}") + print(f"{Colors.RED}ERROR{Colors.RESET}") + + print() + + # ── Summary output ──────────────────────────────────────────────────── + if all_warnings: + print(f"{Colors.YELLOW}Warnings:{Colors.RESET}") + for w in all_warnings: + print(f" ⚠️ {w}") + print() + + if all_errors: + print(f"{Colors.RED}Errors:{Colors.RESET}") + for e in all_errors: + print(f" ❌ {e}") + print() + print( + f"{Colors.RED}Validation failed.{Colors.RESET} " + f"{valid_count}/{len(files_to_check)} file(s) valid." + ) return 1 results = [] @@ -1056,12 +1262,43 @@ def cmd_validate(args: argparse.Namespace) -> int: is_valid = True results.append({"file": str(f), "valid": is_valid}) - if output_format == "json": - print(json.dumps({"status": "success" if all_valid else "error", "files": results}, indent=2)) - else: - for r in results: - mark = f"{Colors.GREEN}\u2713{Colors.RESET}" if r["valid"] else f"{Colors.RED}\u2717{Colors.RESET}" - print(f"{mark} {r['file']}") +def cmd_policy(args: argparse.Namespace) -> int: + """Dispatch 'agentos policy ' to the policies CLI. + + Routes ``agentos policy validate `` and related subcommands + to :mod:`agent_os.policies.cli`, which provides full JSON-Schema + validation and Pydantic model validation in a single pass. + + Args: + args: Parsed CLI arguments. Expects ``args.policy_command`` and + any subcommand-specific attributes set by the policy subparser. + + Returns: + Exit code from the delegated command (0 = success, 1 = failure, + 2 = runtime error). + """ + from agent_os.policies import cli as policies_cli # type: ignore[import] + + sub = getattr(args, "policy_command", None) + if sub == "validate": + return policies_cli.cmd_validate(args) + if sub == "test": + return policies_cli.cmd_test(args) + if sub == "diff": + return policies_cli.cmd_diff(args) + + # No subcommand given — print help + print("Usage: agentos policy ") + print() + print(" validate Validate a policy YAML/JSON file") + print(" test Run scenario tests against a policy") + print(" diff Show differences between two policies") + return 0 + + +# ============================================================================ +# HTTP API Server (agentos serve) +# ============================================================================ return 0 if all_valid else 1 @@ -1194,9 +1431,50 @@ def main() -> int: validate_parser.add_argument("files", nargs="*", help="Files to validate") validate_parser.add_argument("--json", action="store_true", help="Output in JSON format") - # metrics - metrics_parser = subparsers.add_parser("metrics", help="Output Prometheus metrics") - metrics_parser.add_argument("--json", action="store_true", help="Output in JSON format") + + # policy command — 'agentos policy validate ' with full JSON-Schema support + policy_parser = subparsers.add_parser( + "policy", + help="Policy-as-code tools: validate, test, and diff governance policies", + ) + policy_subparsers = policy_parser.add_subparsers(dest="policy_command") + + # agentos policy validate + pol_validate = policy_subparsers.add_parser( + "validate", + help="Validate a policy YAML/JSON file against the schema", + ) + pol_validate.add_argument("path", help="Path to the policy file to validate") + + # agentos policy test + pol_test = policy_subparsers.add_parser( + "test", + help="Test a policy against a set of YAML scenarios", + ) + pol_test.add_argument("policy_path", help="Path to the policy file") + pol_test.add_argument("test_scenarios_path", help="Path to the test scenarios YAML") + + # agentos policy diff + pol_diff = policy_subparsers.add_parser( + "diff", + help="Show differences between two policy files", + ) + pol_diff.add_argument("path1", help="First policy file") + pol_diff.add_argument("path2", help="Second policy file") + + # serve command + serve_parser = subparsers.add_parser( + "serve", + help="Start the HTTP API server for Agent OS", + description="Launch an HTTP server exposing health, status, agents, and " + "execution endpoints for programmatic access to the kernel.", + ) + serve_parser.add_argument( + "--port", type=int, default=8080, help="Port to listen on (default: 8080)" + ) + serve_parser.add_argument( + "--host", default="0.0.0.0", help="Host to bind to (default: 0.0.0.0)" + ) # health health_parser = subparsers.add_parser("health", help="Check system health") @@ -1208,7 +1486,35 @@ def main() -> int: args = parser.parse_args() - if not args.command: + # Handle CI mode + if hasattr(args, 'ci') and args.ci: + Colors.disable() + + if args.version: + try: + from agent_os import __version__ + print(f"agentos {__version__}") + except Exception: + print("agentos (version unknown)") + return 0 + + commands = { + "init": cmd_init, + "secure": cmd_secure, + "audit": cmd_audit, + "status": cmd_status, + "check": cmd_check, + "review": cmd_review, + "install-hooks": cmd_install_hooks, + "validate": cmd_validate, + "policy": cmd_policy, + "serve": cmd_serve, + "metrics": cmd_metrics, + "health": cmd_health, + } + + handler = commands.get(args.command) + if handler is None: parser.print_help() return 0 From 4acbfabfb3454dd9aee69b30abcd9ff29f2199aa Mon Sep 17 00:00:00 2001 From: Kanish Tyagi Date: Thu, 2 Apr 2026 21:58:56 -0500 Subject: [PATCH 2/2] fix: resolve blocking issues from maintainer review --- packages/agent-os/src/agent_os/cli/__init__.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/packages/agent-os/src/agent_os/cli/__init__.py b/packages/agent-os/src/agent_os/cli/__init__.py index c53da66a..db99df28 100644 --- a/packages/agent-os/src/agent_os/cli/__init__.py +++ b/packages/agent-os/src/agent_os/cli/__init__.py @@ -1254,13 +1254,10 @@ def cmd_validate(args: argparse.Namespace) -> int: ) return 1 - results = [] - all_valid = True - for f in files: - # Simple simulated validation - is_valid = True - results.append({"file": str(f), "valid": is_valid}) + print(f"{Colors.GREEN}✓ All {valid_count} policy file(s) valid.{Colors.RESET}") + return 0 + def cmd_policy(args: argparse.Namespace) -> int: """Dispatch 'agentos policy ' to the policies CLI. @@ -1300,7 +1297,6 @@ def cmd_policy(args: argparse.Namespace) -> int: # HTTP API Server (agentos serve) # ============================================================================ - return 0 if all_valid else 1 def cmd_metrics(args: argparse.Namespace) -> int: @@ -1430,6 +1426,7 @@ def main() -> int: validate_parser = subparsers.add_parser("validate", help="Validate policy YAML files") validate_parser.add_argument("files", nargs="*", help="Files to validate") validate_parser.add_argument("--json", action="store_true", help="Output in JSON format") + validate_parser.add_argument("--strict", action="store_true", help="Strict mode: treat warnings as errors") # policy command — 'agentos policy validate ' with full JSON-Schema support @@ -1480,9 +1477,9 @@ def main() -> int: health_parser = subparsers.add_parser("health", help="Check system health") health_parser.add_argument("--json", action="store_true", help="Output in JSON format") - # serve (not updated for JSON as it is a server) - serve_parser = subparsers.add_parser("serve", help="Start HTTP API server") - serve_parser.add_argument("--port", type=int, default=8000, help="Port to listen on") + # metrics + metrics_parser = subparsers.add_parser("metrics", help="Output Prometheus metrics") + metrics_parser.add_argument("--json", action="store_true", help="Output in JSON format") args = parser.parse_args() @@ -1508,7 +1505,6 @@ def main() -> int: "install-hooks": cmd_install_hooks, "validate": cmd_validate, "policy": cmd_policy, - "serve": cmd_serve, "metrics": cmd_metrics, "health": cmd_health, }