diff --git a/policy_scan.py b/policy_scan.py new file mode 100644 index 000000000..faa00b29c --- /dev/null +++ b/policy_scan.py @@ -0,0 +1,526 @@ +from concurrent.futures import ThreadPoolExecutor, as_completed +import argparse +import subprocess +import sys +from pathlib import Path + + +POLICIES_ROOT = Path("policies") +INPUTS_ROOT = Path("inputs") + + +def run_command(command: list[str]) -> subprocess.CompletedProcess: + return subprocess.run( + command, + capture_output=True, + text=True, + shell=False, + ) + + +def run_existing_branch_linter() -> None: + result = run_command(["python", "scripts/linters/check_branch_name.py"]) + + if result.stdout: + print(result.stdout) + + if result.stderr: + print(result.stderr, file=sys.stderr) + + if result.returncode != 0: + print("Local scan stopped because the branch name does not follow the naming convention.") + sys.exit(result.returncode) + + +def run_existing_service_linter(provider: str, service: str) -> tuple[bool, str | None]: + if provider != "gcp": + print(f"Skipping linter: current linter only supports GCP, but provider is '{provider}'.") + return True, None + + result = subprocess.run( + ["python", "scripts/linters/linter.py", "--gcp", service], + text=True, + ) + + if result.returncode != 0: + reason = f"Service linter failed for {provider}/{service}" + print(reason) + return False, reason + + return True, None + + +def build_opa_query(provider: str, service: str, resource: str, policy: str, output_type: str) -> str: + return ( + f"data.terraform.{provider}.security." 
+ f"{service}.{resource}.{policy}.{output_type}" + ) + + +def build_plan_path(provider: str, service: str, resource: str, policy: str) -> Path: + return INPUTS_ROOT / provider / service / resource / policy / "plan.json" + + +def build_policy_file_path(provider: str, service: str, resource: str, policy: str) -> Path: + return POLICIES_ROOT / provider / service / resource / policy / "policy.rego" + + +def get_service_dirs(provider: str, service: str | None = None) -> list[Path]: + provider_input_dir = INPUTS_ROOT / provider + + if not provider_input_dir.exists(): + print(f"Error: provider input directory not found: {provider_input_dir}", file=sys.stderr) + sys.exit(1) + + if service: + service_dir = provider_input_dir / service + + if not service_dir.exists(): + print(f"Error: service input directory not found: {service_dir}", file=sys.stderr) + sys.exit(1) + + return [service_dir] + + return sorted(path for path in provider_input_dir.iterdir() if path.is_dir()) + + +def get_resource_dirs(provider: str, service: str, resource: str | None = None) -> list[Path]: + service_input_dir = INPUTS_ROOT / provider / service + + if not service_input_dir.exists(): + print(f"Error: service input directory not found: {service_input_dir}", file=sys.stderr) + sys.exit(1) + + if resource: + resource_dir = service_input_dir / resource + + if not resource_dir.exists(): + print(f"Error: resource input directory not found: {resource_dir}", file=sys.stderr) + sys.exit(1) + + return [resource_dir] + + return sorted(path for path in service_input_dir.iterdir() if path.is_dir()) + + +def get_policy_dirs(resource_dir: Path, policy: str | None = None) -> list[Path]: + if policy: + policy_dir = resource_dir / policy + + if not policy_dir.exists(): + print(f"Error: policy input directory not found: {policy_dir}", file=sys.stderr) + sys.exit(1) + + return [policy_dir] + + return sorted(path for path in resource_dir.iterdir() if path.is_dir()) + + +def run_opa_eval_with_logs( + provider: str, + 
plan_path: Path, + query: str, + output_format: str, + logs: list[str], +) -> int: + policies_provider_root = POLICIES_ROOT / provider + policies_helpers_root = POLICIES_ROOT / "_helpers" + + cmd = [ + "opa", + "eval", + "--data", + str(policies_provider_root), + "--data", + str(policies_helpers_root), + "--input", + str(plan_path), + query, + "--format", + output_format, + ] + + result = run_command(cmd) + + if result.stdout: + logs.append(result.stdout) + + if result.stderr: + logs.append(result.stderr) + + return result.returncode + + +def generate_plan_json(input_dir: Path, logs: list[str]) -> tuple[bool, str | None]: + plan_file = input_dir / "plan" + plan_json_file = input_dir / "plan.json" + + logs.append(f"Generating Terraform plan in: {input_dir}") + + commands = [ + ["terraform", "init"], + ["terraform", "plan", "--out=plan"], + ] + + for command in commands: + result = subprocess.run( + command, + cwd=input_dir, + capture_output=True, + text=True, + shell=False, + ) + + if result.returncode != 0: + reason = f"Terraform command failed: {' '.join(command)}" + logs.append(reason) + + if result.stdout: + logs.append(result.stdout) + + if result.stderr: + logs.append(result.stderr) + + return False, reason + + show_result = subprocess.run( + ["terraform", "show", "-json", str(plan_file.name)], + cwd=input_dir, + capture_output=True, + shell=False, + ) + + if show_result.returncode != 0: + reason = "Terraform command failed: terraform show -json plan" + logs.append(reason) + + if show_result.stderr: + logs.append(show_result.stderr.decode(errors="replace")) + + return False, reason + + plan_json_file.write_bytes(show_result.stdout) + + logs.append(f"Generated plan.json at: {plan_json_file}") + return True, None + + +def prepare_plan_json( + provider: str, + service: str, + resource: str, + policy: str, +) -> tuple[bool, str | None, str]: + logs = [] + plan_path = build_plan_path(provider, service, resource, policy) + + generated, reason = 
generate_plan_json(plan_path.parent, logs) + + if not generated: + return False, reason or f"plan.json could not be generated at {plan_path}", "\n".join(logs) + + return True, None, "\n".join(logs) + + +def scan_policy( + provider: str, + service: str, + resource: str, + policy: str, + output_type: str, + output_format: str, +) -> tuple[int, bool, str | None, str]: + logs = [] + + plan_path = build_plan_path(provider, service, resource, policy) + policy_file_path = build_policy_file_path(provider, service, resource, policy) + + logs.append("=" * 90) + logs.append(f"Provider : {provider}") + logs.append(f"Service : {service}") + logs.append(f"Resource : {resource}") + logs.append(f"Policy : {policy}") + logs.append("=" * 90) + + if not policy_file_path.exists(): + reason = f"policy.rego not found at {policy_file_path}" + logs.append(f"Skipping: {reason}") + return 0, False, reason, "\n".join(logs) + + if not plan_path.exists(): + reason = f"plan.json not found at {plan_path}. Terraform generation stage did not create it." + logs.append(f"Skipping: {reason}") + return 0, False, reason, "\n".join(logs) + + query = build_opa_query( + provider=provider, + service=service, + resource=resource, + policy=policy, + output_type=output_type, + ) + + exit_code = run_opa_eval_with_logs( + provider=provider, + plan_path=plan_path, + query=query, + output_format=output_format, + logs=logs, + ) + + if exit_code != 0: + return exit_code, True, f"OPA evaluation failed for {provider}/{service}/{resource}/{policy}", "\n".join(logs) + + return exit_code, True, None, "\n".join(logs) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Run local OPA policy scans by provider, service, resource, or policy." + ) + + parser.add_argument( + "-p", + "--provider", + required=True, + help="Cloud provider to scan. Example: gcp", + ) + + parser.add_argument( + "-s", + "--service", + required=False, + help="Optional service name. 
Example: artifact_registry", + ) + + parser.add_argument( + "-r", + "--resource", + required=False, + help="Optional Terraform resource name. Example: google_artifact_registry_repository", + ) + + parser.add_argument( + "-po", + "--policy", + required=False, + help="Optional policy folder name. Example: approved_formats", + ) + + output_group = parser.add_mutually_exclusive_group(required=False) + + output_group.add_argument( + "-m", + "--message", + action="store_true", + help="Show policy message output. This is the default option.", + ) + + output_group.add_argument( + "-d", + "--details", + action="store_true", + help="Show policy details output.", + ) + + parser.add_argument( + "-f", + "--format", + default="pretty", + choices=["pretty", "json", "raw"], + help="OPA output format. Default: pretty", + ) + + parser.add_argument( + "--terraform-workers", + type=int, + default=2, + help="Number of concurrent Terraform plan generations to run. Default: 2", + ) + + parser.add_argument( + "--opa-workers", + type=int, + default=4, + help="Number of concurrent OPA policy scans to run. 
Default: 4", + ) + + args = parser.parse_args() + + if args.terraform_workers < 1: + print("Error: --terraform-workers must be at least 1.", file=sys.stderr) + sys.exit(1) + + if args.opa_workers < 1: + print("Error: --opa-workers must be at least 1.", file=sys.stderr) + sys.exit(1) + + provider = args.provider + service = args.service + + if args.resource and not service: + print( + "Error: --resource cannot be used without --service because resources are inside service folders.", + file=sys.stderr, + ) + sys.exit(1) + + if args.policy and not args.resource: + print( + "Error: --policy cannot be used without --resource because policies are inside resource folders.", + file=sys.stderr, + ) + sys.exit(1) + + if args.details: + output_type = "details" + else: + output_type = "message" + + run_existing_branch_linter() + + service_dirs = get_service_dirs( + provider=provider, + service=service, + ) + + final_exit_code = 0 + successful_policies = [] + failed_checks = [] + + scan_targets = [] + + for service_dir in service_dirs: + service_name = service_dir.name + + linter_ok, linter_problem = run_existing_service_linter(provider, service_name) + + if not linter_ok: + failed_checks.append((f"{provider}/{service_name}", linter_problem or "Service linter failed")) + final_exit_code = 1 + continue + + resource_dirs = get_resource_dirs( + provider=provider, + service=service_name, + resource=args.resource, + ) + + for resource_dir in resource_dirs: + resource_name = resource_dir.name + + policy_dirs = get_policy_dirs( + resource_dir=resource_dir, + policy=args.policy, + ) + + for policy_dir in policy_dirs: + policy_name = policy_dir.name + policy_ref = f"{provider}/{service_name}/{resource_name}/{policy_name}" + + scan_targets.append( + { + "provider": provider, + "service": service_name, + "resource": resource_name, + "policy": policy_name, + "policy_ref": policy_ref, + } + ) + + print(f"\n\nFound {len(scan_targets)} policies to scan.\n") + + targets_needing_plan = 
scan_targets + print( + f"Starting Terraform generation for {len(targets_needing_plan)} policies " + f"with {args.terraform_workers} workers...\n" + ) + + with ThreadPoolExecutor(max_workers=args.terraform_workers) as executor: + future_to_policy = { + executor.submit( + prepare_plan_json, + target["provider"], + target["service"], + target["resource"], + target["policy"], + ): target["policy_ref"] + for target in targets_needing_plan + } + + for future in as_completed(future_to_policy): + policy_ref = future_to_policy[future] + + try: + plan_ready, problem, log_output = future.result() + print(log_output) + except Exception as error: + failed_checks.append((policy_ref, f"Unexpected Terraform generation error: {error}")) + final_exit_code = 1 + continue + + if not plan_ready: + failed_checks.append((policy_ref, problem or "Terraform plan generation failed")) + final_exit_code = 1 + + plan_failed_refs = {check_ref for check_ref, _ in failed_checks} + scan_ready_targets = [ + target + for target in scan_targets + if target["policy_ref"] not in plan_failed_refs + ] + + print( + f"\nStarting OPA scans for {len(scan_ready_targets)} policies.\n" + ) + + with ThreadPoolExecutor(max_workers=args.opa_workers) as executor: + future_to_policy = { + executor.submit( + scan_policy, + target["provider"], + target["service"], + target["resource"], + target["policy"], + output_type, + args.format, + ): target["policy_ref"] + for target in scan_ready_targets + } + + for future in as_completed(future_to_policy): + policy_ref = future_to_policy[future] + + try: + exit_code, scanned, problem, log_output = future.result() + print(log_output) + except Exception as error: + failed_checks.append((policy_ref, f"Unexpected OPA scan error: {error}")) + final_exit_code = 1 + continue + + if scanned and exit_code == 0: + successful_policies.append(policy_ref) + else: + reason = problem or "OPA evaluation failed" + failed_checks.append((policy_ref, reason)) + final_exit_code = 1 + + print("\n" 
+ "=" * 90) + print(f"Successful policies : {len(successful_policies)}") + print(f"Failed checks : {len(failed_checks)}") + + + if failed_checks: + print("\nFailed check details:") + for check_ref, reason in failed_checks: + print(f" - {check_ref}") + print(f" Reason: {reason}") + + print("=" * 90) + + sys.exit(final_exit_code) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/policy_scan/README.md b/scripts/policy_scan/README.md new file mode 100644 index 000000000..b9720fe6b --- /dev/null +++ b/scripts/policy_scan/README.md @@ -0,0 +1,256 @@ +# Local Policy Scan Script + +## Overview + +`policy_scan.py` is a helper script for running local Terraform plan generation and OPA/Rego policy checks before raising a pull request. + +Instead of manually running long Terraform and `opa eval` commands, students can use one script to: + +- check the branch name +- run the service linter +- generate or refresh `plan.json` +- scan policies with OPA +- show a clear success and failure summary + +The script now runs Terraform plan generation for every selected policy, even when a `plan.json` file already exists. This helps make sure the scan uses the latest Terraform configuration. + +--- + +## Basic Command Format + +```bash +python policy_scan.py --provider +``` + +Example: + +```bash +python policy_scan.py --provider gcp +``` + +This scans all services under the selected provider. + +--- + +## Example Commands + +### 1. Scan All Services for a Provider + +```bash +python policy_scan.py --provider +``` + +Example: + +```bash +python policy_scan.py --provider gcp +``` + +This scans all services, resources, and policies under the selected provider. + +--- + +### 2. Scan One Service + +```bash +python policy_scan.py --provider --service +``` + +Example: + +```bash +python policy_scan.py --provider gcp --service artifact_registry +``` + +This scans all resources and policies inside the selected service. + +--- + +### 3. 
Scan One Resource Only + +```bash +python policy_scan.py --provider <provider> --service <service> --resource <resource> +``` + +Example: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository +``` + +This scans only the selected Terraform resource. + +--- + +### 4. Scan One Specific Policy + +```bash +python policy_scan.py --provider <provider> --service <service> --resource <resource> --policy <policy> +``` + +Example: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository --policy approved_formats +``` + +This scans only one policy for the selected resource. + +--- + +### 5. Show Policy Details Output + +```bash +python policy_scan.py --provider gcp --service artifact_registry --details +``` + +This shows the policy `details` output instead of the message. + +--- + +### 6. Change OPA Output Format + +```bash +python policy_scan.py --provider gcp --service artifact_registry --format json +``` + +Supported formats are: + +```text +pretty +json +raw +``` + +The default format is: + +```text +pretty +``` + +--- + +### 7. Change Worker Counts + +The script uses concurrent workers to speed up Terraform plan generation and OPA scans. + +```bash +python policy_scan.py --provider gcp --service artifact_registry --terraform-workers 2 --opa-workers 4 +``` + +Default values: + +```text +Terraform workers : 2 +OPA workers : 4 +``` + +Use lower worker counts if your computer becomes slow or if Terraform commands fail because too many tasks are running at the same time. + +--- + +## What Happens When the Script Runs + +When the script runs, it follows these steps: + +1. Checks whether the branch name follows the project naming rule. +2. Runs the existing service linter. +3. Finds the selected provider, service, resource, and policy folders. +4. Runs Terraform commands for each selected policy folder: + + ```bash + terraform init + terraform plan --out=plan + terraform show -json plan > plan.json + ``` + +5. 
Regenerates `plan.json` even if it already exists. +6. Checks whether each policy has a matching `policy.rego` file. +7. Runs the OPA policy check using the generated `plan.json`. +8. Shows a final summary of successful policies and failed checks. + +--- + +## Important Notes + +`--provider` is required. + +Correct: + +```bash +python policy_scan.py --provider gcp +``` + +Incorrect: + +```bash +python policy_scan.py +``` + +`--resource` cannot be used without `--service`, because resources are inside service folders. + +Correct: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository +``` + +Incorrect: + +```bash +python policy_scan.py --provider gcp --resource google_artifact_registry_repository +``` + +`--policy` cannot be used without `--resource`, because policies are inside resource folders. + +Correct: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository --policy approved_formats +``` + +Incorrect: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --policy approved_formats +``` + +At the moment, the service linter only supports GCP. If another provider is used, the script skips the service linter. + +--- + +## Available Flags + +The script supports both full flag names and short-hand flags. Students can use either format. 
+ +| Purpose | Full flag | Short-hand flag | Example value | +|---|---|---|---| +| Select provider | `--provider` | `-p` | `gcp` | +| Select service | `--service` | `-s` | `artifact_registry` | +| Select resource | `--resource` | `-r` | `google_artifact_registry_repository` | +| Select policy | `--policy` | `-po` | `approved_formats` | +| Show message output | `--message` | `-m` | no value needed | +| Show details output | `--details` | `-d` | no value needed | +| Change OPA format | `--format` | `-f` | `pretty`, `json`, or `raw` | +| Set Terraform workers | `--terraform-workers` | no short-hand | `2` | +| Set OPA workers | `--opa-workers` | no short-hand | `4` | + +Example using full flags: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository --policy approved_formats --details --format pretty +``` + +Same command using short-hand flags: + +```bash +python policy_scan.py -p gcp -s artifact_registry -r google_artifact_registry_repository -po approved_formats -d -f pretty +``` + +--- + +## Summary + +`policy_scan.py` makes local policy testing easier and more consistent for students. + +It checks the branch name, runs the service linter, regenerates Terraform `plan.json` files, runs OPA scans, and provides a final summary.