From e9b673b46bca1793e63370f6fefea339bd49b2af Mon Sep 17 00:00:00 2001 From: tamim1517 Date: Thu, 14 May 2026 14:34:16 +1000 Subject: [PATCH 1/7] initial commit --- scripts/policy_scan/README.md | 264 ++++++++++++++++++++ scripts/policy_scan/local_scan.py | 383 ++++++++++++++++++++++++++++++ 2 files changed, 647 insertions(+) create mode 100644 scripts/policy_scan/README.md create mode 100644 scripts/policy_scan/local_scan.py diff --git a/scripts/policy_scan/README.md b/scripts/policy_scan/README.md new file mode 100644 index 000000000..0e688345d --- /dev/null +++ b/scripts/policy_scan/README.md @@ -0,0 +1,264 @@ +# local_scan.py Documentation + +## Overview + +`local_scan.py` provides a simple local policy scanning command for the Policy Deployment Engine. It allows developers to test OPA/Rego policies locally without manually writing long `opa eval` commands. + +The script takes a provider/service path such as `gcp/artifact_registry`, derives the expected Git branch name, validates the current branch, discovers matching input and policy folders, validates the Terraform `plan.json`, and runs the selected OPA output query. + +By default, the script evaluates the policy `message` output. Users can also request the `details` output when needed. + +--- + +## Purpose + +Currently, policy testing can require manually running commands such as: + +```powershell +opa eval --data .\policies\ --data .\policies\_helpers --input .\inputs\\\\\plan.json "data.terraform..security....message" --format pretty +``` + +`local_scan.py` simplifies this by allowing commands like: + +```powershell +python scripts\policy_scan\local_scan.py / +``` + +This improves local testing consistency, reduces command mistakes, and makes the local scan process closer to the project’s automated policy checking workflow. + +--- + +## Key Features + +- Scans policies locally using a short provider/service command. +- Extracts provider and service from the service path. 
+- Derives the expected Git branch from provider and service. +- Validates that the user is on the correct Git branch before scanning. +- Supports service-level, resource-level, and policy-level scans. +- Uses `message` as the default output type. +- Supports `--details` for detailed policy output. +- Validates that matching `policy.rego` files exist. +- Validates that matching `plan.json` files exist. +- Checks that `plan.json` contains both compliant and non-compliant resources. +- Skips incomplete or invalid policy folders safely. +- Prints a scan summary showing scanned and skipped policies. + +--- + +## Prerequisites + +- Python 3.10+ +- OPA CLI installed and available in `PATH` +- Git installed and available in `PATH` +- Existing project folder structure: + +```text +inputs/////plan.json +policies/////policy.rego +policies/_helpers/ +``` + +Example: + +```text +inputs/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/plan.json +policies/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/policy.rego +``` + +--- + +## Usage + +### Scan all resources and policies for a service + +```powershell +python scripts\policy_scan\local_scan.py gcp/artifact_registry +``` + +This scans every resource and policy under: + +```text +inputs/gcp/artifact_registry/ +policies/gcp/artifact_registry/ +``` + +--- + +### Scan all policies for a specific resource + +```powershell +python scripts\policy_scan\local_scan.py gcp/artifact_registry --resource google_artifact_registry_repository +``` + +This scans all policies under: + +```text +inputs/gcp/artifact_registry/google_artifact_registry_repository/ +``` + +--- + +### Scan one specific policy + +```powershell +python scripts\policy_scan\local_scan.py gcp/artifact_registry --resource google_artifact_registry_repository --policy approved_formats +``` + +This scans only: + +```text +inputs/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/plan.json +``` + 
+against: + +```text +policies/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/policy.rego +``` + +--- + +### Show details output instead of message + +```powershell +python scripts\policy_scan\local_scan.py gcp/artifact_registry --resource google_artifact_registry_repository --policy approved_formats --details +``` + +By default, the script evaluates: + +```text +message +``` + +With `--details`, it evaluates: + +```text +details +``` + +--- + +## Command Line Arguments + +| Argument | Required | Default | Description | +|---|---:|---|---| +| `service_path` | Yes | N/A | Provider and service path, for example `gcp/artifact_registry`. | +| `--resource` | No | All resources | Specific Terraform resource to scan. | +| `--policy` | No | All policies | Specific policy folder to scan. Must be used with `--resource`. | +| `--message` | No | Enabled by default | Shows policy message output. | +| `--details` | No | Off | Shows policy details output. | + +--- + +## Branch Validation + +The command uses a provider/service path: + +```text +gcp/artifact_registry +``` + +The script derives the expected Git branch name as: + +```text +gcp/service/artifact_registry +``` + +Before scanning, it checks the current Git branch using: + +```powershell +git branch --show-current +``` + +If the current branch does not match the expected branch, the script stops. + +Example error: + +```text +Error: branch mismatch. +Current branch : gcp/service/api_gateway +Expected branch: gcp/service/artifact_registry +Please switch to the correct branch before running the local policy scan. 
+``` + +--- + +## Output Type Logic + +If no output flag is provided, the script defaults to: + +```text +message +``` + +Example: + +```powershell +python scripts\policy_scan\local_scan.py gcp/artifact_registry +``` + +is equivalent to: + +```powershell +python scripts\policy_scan\local_scan.py gcp/artifact_registry --message +``` + +To view detailed output, use: + +```powershell +--details +``` + +`--message` and `--details` cannot be used together. + +--- + +## Plan Resource Validation + +For example, for this resource: + +```text +google_artifact_registry_repository +``` + +the script expects: + +```text +google_artifact_registry_repository.c +google_artifact_registry_repository.nc +``` + +The first resource must be the compliant example: + +```text +.c +``` + +The second resource must be the non-compliant example: + +```text +.nc +``` + +If the names are incorrect, the script skips that policy and prints a remedy. + +--- + +## Troubleshooting + +| Symptom | Likely Cause | Fix | +|---|---|---| +| `Error: branch mismatch` | Current Git branch does not match the expected service branch. | Switch to the correct branch, for example `gcp/service/artifact_registry`. | +| `service input directory not found` | The input folder for the provider/service does not exist. | Check the service path and folder structure under `inputs/`. | +| `policy input directory not found` | The selected policy folder does not exist under inputs. | Check the `--policy` name and ensure the input folder exists. | +| `policy.rego not found` | Matching policy file is missing. | Add or move `policy.rego` to the correct policy folder. | +| `plan.json not found` | Terraform plan output has not been generated. | Generate `plan.json` in the matching input folder. | +| `could not read plan.json using utf-16 encoding` | The file encoding does not match the script’s expected encoding. | Regenerate `plan.json` using the expected UTF-16 encoding or update the script encoding. 
| +| `compliant resource name is incorrect` | First resource in `plan.json` is not named `.c`. | Rename the compliant Terraform resource block to `c` and regenerate `plan.json`. | +| `non-compliant resource name is incorrect` | Second resource in `plan.json` is not named `.nc`. | Rename the non-compliant Terraform resource block to `nc` and regenerate `plan.json`. | + +--- + +## Summary + +`local_scan.py` helps developers test OPA policies locally with a simple and consistent command. It removes the need to manually write long OPA queries, validates the current Git branch, checks required files, verifies Terraform resource naming conventions, and runs the correct `message` or `details` policy output. diff --git a/scripts/policy_scan/local_scan.py b/scripts/policy_scan/local_scan.py new file mode 100644 index 000000000..998b0bbce --- /dev/null +++ b/scripts/policy_scan/local_scan.py @@ -0,0 +1,383 @@ +import argparse +import subprocess +import sys +from pathlib import Path +import json + + +POLICIES_ROOT = Path("policies") +INPUTS_ROOT = Path("inputs") + + +def run_command(command: list[str]) -> subprocess.CompletedProcess: + return subprocess.run( + command, + capture_output=True, + text=True, + shell=False, + ) + + +def get_current_git_branch() -> str: + result = run_command(["git", "branch", "--show-current"]) + + if result.returncode != 0: + print("Error: could not detect current git branch.", file=sys.stderr) + if result.stderr: + print(result.stderr, file=sys.stderr) + sys.exit(1) + + branch = result.stdout.strip() + + if not branch: + print("Error: current git branch is empty or detached HEAD state.", file=sys.stderr) + sys.exit(1) + + return branch + + +def validate_branch_matches(expected_branch: str) -> None: + current_branch = get_current_git_branch() + + if current_branch != expected_branch: + print("Error: branch mismatch.", file=sys.stderr) + print(f"Current branch : {current_branch}", file=sys.stderr) + print(f"Expected branch : {expected_branch}", 
file=sys.stderr) + print("Please switch to the correct branch or open a branch with the correct name.", file=sys.stderr) + sys.exit(1) + + print(f"Branch validation passed: {current_branch}") + + +def parse_service_path(service_path: str) -> tuple[str, str]: + parts = service_path.strip("/").split("/") + + if len(parts) != 2: + print( + "Error: service path must follow this format: /", + file=sys.stderr, + ) + print("Example: gcp/artifact_registry", file=sys.stderr) + sys.exit(1) + + provider, service = parts + + if not provider or not service: + print("Error: provider or service name is missing from service path.", file=sys.stderr) + sys.exit(1) + + return provider, service + + +def build_expected_branch_name(provider: str, service: str) -> str: + return f"{provider}/service/{service}" + + +def build_opa_query(provider: str, service: str, resource: str, policy: str, output_type: str) -> str: + return ( + f"data.terraform.{provider}.security." + f"{service}.{resource}.{policy}.{output_type}" + ) + + +def build_plan_path(provider: str, service: str, resource: str, policy: str) -> Path: + return INPUTS_ROOT / provider / service / resource / policy / "plan.json" + + +def build_policy_file_path(provider: str, service: str, resource: str, policy: str) -> Path: + return POLICIES_ROOT / provider / service / resource / policy / "policy.rego" + + +def get_resource_dirs(provider: str, service: str, resource: str | None = None) -> list[Path]: + service_input_dir = INPUTS_ROOT / provider / service + + if not service_input_dir.exists(): + print(f"Error: service input directory not found: {service_input_dir}", file=sys.stderr) + sys.exit(1) + + if resource: + resource_dir = service_input_dir / resource + + if not resource_dir.exists(): + print(f"Error: resource input directory not found: {resource_dir}", file=sys.stderr) + sys.exit(1) + + return [resource_dir] + + return sorted(path for path in service_input_dir.iterdir() if path.is_dir()) + + +def get_policy_dirs(resource_dir: 
Path, policy: str | None = None) -> list[Path]: + if policy: + policy_dir = resource_dir / policy + + if not policy_dir.exists(): + print(f"Error: policy input directory not found: {policy_dir}", file=sys.stderr) + sys.exit(1) + + return [policy_dir] + + return sorted(path for path in resource_dir.iterdir() if path.is_dir()) + + +def run_opa_eval( + provider: str, + plan_path: Path, + query: str, + output_format: str, +) -> int: + policies_provider_root = POLICIES_ROOT / provider + policies_helpers_root = POLICIES_ROOT / "_helpers" + + cmd = [ + "opa", + "eval", + "--data", + str(policies_provider_root), + "--data", + str(policies_helpers_root), + "--input", + str(plan_path), + query, + "--format", + output_format, + ] + + # print("\nRunning OPA command:") + # print(" ".join(cmd)) + # print() + + result = run_command(cmd) + + if result.stdout: + print(result.stdout) + + if result.stderr: + print(result.stderr, file=sys.stderr) + + return result.returncode + + +def validate_plan_resource_addresses(plan_path: Path, resource: str) -> bool: + expected_compliant_address = f"{resource}.c" + expected_non_compliant_address = f"{resource}.nc" + + try: + with plan_path.open("r", encoding="utf-16") as file: + plan_data = json.load(file) + except UnicodeDecodeError as error: + print(f"Skipping: could not read plan.json using utf-16 encoding at {plan_path}") + print(f"Encoding error: {error}") + print("Remedy: regenerate plan.json using UTF-16 encoding or update the script encoding to match the file.") + return False + except json.JSONDecodeError as error: + print(f"Skipping: invalid JSON in plan.json at {plan_path}") + print(f"JSON error: {error}") + print('Remedy: regenerate plan.json using "terraform show -json plan > plan.json" and ensure the file contains valid JSON.') + return False + + resources = ( + plan_data + .get("planned_values", {}) + .get("root_module", {}) + .get("resources", []) + ) + + if len(resources) < 2: + print("Skipping: plan.json must contain at least 
two resources.") + print(f"Expected first resource : {expected_compliant_address}") + print(f"Expected second resource : {expected_non_compliant_address}") + print("Remedy: add both compliant and non-compliant Terraform resources, then regenerate plan.json.") + return False + + compliant_address = resources[0].get("address") + non_compliant_address = resources[1].get("address") + + missing_or_invalid = False + + if compliant_address != expected_compliant_address: + print("Skipping: compliant resource name is incorrect.") + print(f"Expected: {expected_compliant_address}") + print(f"Found : {compliant_address}") + print(f"Remedy : rename the compliant Terraform resource block to: {resource}.c") + missing_or_invalid = True + + if non_compliant_address != expected_non_compliant_address: + print("Skipping: non-compliant resource name is incorrect.") + print(f"Expected: {expected_non_compliant_address}") + print(f"Found : {non_compliant_address}") + print(f"Remedy : rename the non-compliant Terraform resource block to: {resource}.nc") + missing_or_invalid = True + + if missing_or_invalid: + print("After fixing the Terraform resource names, regenerate plan.json and run the scan again.") + return False + + print("Resource name validation passed.") + return True + + +def scan_policy( + provider: str, + service: str, + resource: str, + policy: str, + output_type: str, + output_format: str, +) -> tuple[int, bool]: + plan_path = build_plan_path(provider, service, resource, policy) + policy_file_path = build_policy_file_path(provider, service, resource, policy) + + print("\n" + "=" * 90) + print(f"Provider : {provider}") + print(f"Service : {service}") + print(f"Resource : {resource}") + print(f"Policy : {policy}") + print("=" * 90) + + if not policy_file_path.exists(): + print(f"Skipping: policy.rego not found at {policy_file_path}") + return 0, False + + if not plan_path.exists(): + print(f"Skipping: plan.json not found at {plan_path}") + return 0, False + + if not 
validate_plan_resource_addresses(plan_path, resource): + return 0, False + + # print(f"\n----- {output_type.upper()} -----") + + query = build_opa_query( + provider=provider, + service=service, + resource=resource, + policy=policy, + output_type=output_type, + ) + + exit_code = run_opa_eval( + provider=provider, + plan_path=plan_path, + query=query, + output_format=output_format, + ) + + + return exit_code, True + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Run local OPA policy scans from a provider/service name." + ) + + parser.add_argument( + "service_path", + help="Service path in format: /. Example: gcp/artifact_registry", + ) + + parser.add_argument( + "--resource", + "--resourse", + dest="resource", + required=False, + help="Optional Terraform resource name. Example: google_artifact_registry_repository", + ) + + parser.add_argument( + "--policy", + required=False, + help="Optional policy folder name. Example: approved_formats", + ) + + output_group = parser.add_mutually_exclusive_group(required=False) + + output_group.add_argument( + "--message", + action="store_true", + help="Show policy message output", + ) + + output_group.add_argument( + "--details", + action="store_true", + help="Show policy details output", + ) + + parser.add_argument( + "--format", + default="pretty", + choices=["pretty", "json", "raw"], + help="OPA output format. 
Default: pretty", + ) + + args = parser.parse_args() + + provider, service = parse_service_path(args.service_path) + + expected_branch = build_expected_branch_name(provider, service) + + validate_branch_matches(expected_branch) + + if args.policy and not args.resource: + print( + "Error: --policy cannot be used without --resource because policies are inside resource folders.", + file=sys.stderr, + ) + sys.exit(1) + + if args.details: + output_type = "details" + else: + output_type = "message" + + resource_dirs = get_resource_dirs( + provider=provider, + service=service, + resource=args.resource, + ) + + final_exit_code = 0 + scanned_count = 0 + skipped_count = 0 + + for resource_dir in resource_dirs: + resource_name = resource_dir.name + + policy_dirs = get_policy_dirs( + resource_dir=resource_dir, + policy=args.policy, + ) + + for policy_dir in policy_dirs: + policy_name = policy_dir.name + + exit_code, scanned = scan_policy( + provider=provider, + service=service, + resource=resource_name, + policy=policy_name, + output_type=output_type, + output_format=args.format, + ) + + if scanned: + scanned_count += 1 + else: + skipped_count += 1 + + if exit_code != 0: + final_exit_code = exit_code + + print("\n" + "=" * 90) + print("Local policy scan completed") + print(f"Scanned policies : {scanned_count}") + print(f"Skipped policies : {skipped_count}") + print("=" * 90) + + sys.exit(final_exit_code) + + +if __name__ == "__main__": + main() \ No newline at end of file From 5f348f4a475d6c0228c28cc71c181701f32df843 Mon Sep 17 00:00:00 2001 From: tamim1517 Date: Mon, 18 May 2026 15:35:23 +1000 Subject: [PATCH 2/7] added support for existing linters --- scripts/policy_scan/local_scan.py | 121 ++++++------------------------ 1 file changed, 25 insertions(+), 96 deletions(-) diff --git a/scripts/policy_scan/local_scan.py b/scripts/policy_scan/local_scan.py index 998b0bbce..0caf428a2 100644 --- a/scripts/policy_scan/local_scan.py +++ b/scripts/policy_scan/local_scan.py @@ -2,7 
+2,6 @@ import subprocess import sys from pathlib import Path -import json POLICIES_ROOT = Path("policies") @@ -18,35 +17,34 @@ def run_command(command: list[str]) -> subprocess.CompletedProcess: ) -def get_current_git_branch() -> str: - result = run_command(["git", "branch", "--show-current"]) +def run_existing_branch_linter() -> None: + result = run_command(["python", "scripts/linters/check_branch_name.py"]) - if result.returncode != 0: - print("Error: could not detect current git branch.", file=sys.stderr) - if result.stderr: - print(result.stderr, file=sys.stderr) - sys.exit(1) - - branch = result.stdout.strip() + if result.stdout: + print(result.stdout) - if not branch: - print("Error: current git branch is empty or detached HEAD state.", file=sys.stderr) - sys.exit(1) + if result.stderr: + print(result.stderr, file=sys.stderr) - return branch + if result.returncode != 0: + print("Local scan stopped because the branch name does not follow the naming convention.") + sys.exit(result.returncode) -def validate_branch_matches(expected_branch: str) -> None: - current_branch = get_current_git_branch() +def run_existing_service_linter(provider: str, service: str) -> None: + if provider != "gcp": + print(f"Skipping linter: current linter only supports GCP, but provider is '{provider}'.") + return - if current_branch != expected_branch: - print("Error: branch mismatch.", file=sys.stderr) - print(f"Current branch : {current_branch}", file=sys.stderr) - print(f"Expected branch : {expected_branch}", file=sys.stderr) - print("Please switch to the correct branch or open a branch with the correct name.", file=sys.stderr) - sys.exit(1) + result = subprocess.run( + ["python", "scripts/linters/linter.py", "--gcp", service], + text=True, + ) - print(f"Branch validation passed: {current_branch}") + if result.returncode != 0: + print("Local scan stopped because the linter found issues.") + print("Please fix the linter errors before running the policy scan.") + 
sys.exit(result.returncode) def parse_service_path(service_path: str) -> tuple[str, str]: @@ -69,10 +67,6 @@ def parse_service_path(service_path: str) -> tuple[str, str]: return provider, service -def build_expected_branch_name(provider: str, service: str) -> str: - return f"{provider}/service/{service}" - - def build_opa_query(provider: str, service: str, resource: str, policy: str, output_type: str) -> str: return ( f"data.terraform.{provider}.security." @@ -158,65 +152,6 @@ def run_opa_eval( return result.returncode -def validate_plan_resource_addresses(plan_path: Path, resource: str) -> bool: - expected_compliant_address = f"{resource}.c" - expected_non_compliant_address = f"{resource}.nc" - - try: - with plan_path.open("r", encoding="utf-16") as file: - plan_data = json.load(file) - except UnicodeDecodeError as error: - print(f"Skipping: could not read plan.json using utf-16 encoding at {plan_path}") - print(f"Encoding error: {error}") - print("Remedy: regenerate plan.json using UTF-16 encoding or update the script encoding to match the file.") - return False - except json.JSONDecodeError as error: - print(f"Skipping: invalid JSON in plan.json at {plan_path}") - print(f"JSON error: {error}") - print('Remedy: regenerate plan.json using "terraform show -json plan > plan.json" and ensure the file contains valid JSON.') - return False - - resources = ( - plan_data - .get("planned_values", {}) - .get("root_module", {}) - .get("resources", []) - ) - - if len(resources) < 2: - print("Skipping: plan.json must contain at least two resources.") - print(f"Expected first resource : {expected_compliant_address}") - print(f"Expected second resource : {expected_non_compliant_address}") - print("Remedy: add both compliant and non-compliant Terraform resources, then regenerate plan.json.") - return False - - compliant_address = resources[0].get("address") - non_compliant_address = resources[1].get("address") - - missing_or_invalid = False - - if compliant_address != 
expected_compliant_address: - print("Skipping: compliant resource name is incorrect.") - print(f"Expected: {expected_compliant_address}") - print(f"Found : {compliant_address}") - print(f"Remedy : rename the compliant Terraform resource block to: {resource}.c") - missing_or_invalid = True - - if non_compliant_address != expected_non_compliant_address: - print("Skipping: non-compliant resource name is incorrect.") - print(f"Expected: {expected_non_compliant_address}") - print(f"Found : {non_compliant_address}") - print(f"Remedy : rename the non-compliant Terraform resource block to: {resource}.nc") - missing_or_invalid = True - - if missing_or_invalid: - print("After fixing the Terraform resource names, regenerate plan.json and run the scan again.") - return False - - print("Resource name validation passed.") - return True - - def scan_policy( provider: str, service: str, @@ -241,12 +176,7 @@ def scan_policy( if not plan_path.exists(): print(f"Skipping: plan.json not found at {plan_path}") - return 0, False - - if not validate_plan_resource_addresses(plan_path, resource): - return 0, False - - # print(f"\n----- {output_type.upper()} -----") + return 0, False query = build_opa_query( provider=provider, @@ -316,10 +246,6 @@ def main() -> None: provider, service = parse_service_path(args.service_path) - expected_branch = build_expected_branch_name(provider, service) - - validate_branch_matches(expected_branch) - if args.policy and not args.resource: print( "Error: --policy cannot be used without --resource because policies are inside resource folders.", @@ -332,6 +258,9 @@ def main() -> None: else: output_type = "message" + run_existing_branch_linter() + run_existing_service_linter(provider, service) + resource_dirs = get_resource_dirs( provider=provider, service=service, From cb256d6daa67b0920e4c76025195f6d2fc9965d3 Mon Sep 17 00:00:00 2001 From: tamim1517 Date: Mon, 18 May 2026 16:15:31 +1000 Subject: [PATCH 3/7] updated documentation --- 
scripts/policy_scan/README.md | 249 +++++++++++----------------------- 1 file changed, 80 insertions(+), 169 deletions(-) diff --git a/scripts/policy_scan/README.md b/scripts/policy_scan/README.md index 0e688345d..2be7aea6b 100644 --- a/scripts/policy_scan/README.md +++ b/scripts/policy_scan/README.md @@ -1,264 +1,175 @@ -# local_scan.py Documentation +# Local Policy Scan Script ## Overview -`local_scan.py` provides a simple local policy scanning command for the Policy Deployment Engine. It allows developers to test OPA/Rego policies locally without manually writing long `opa eval` commands. +`local_scan.py` is a helper script for running OPA policy checks locally before raising a pull request. -The script takes a provider/service path such as `gcp/artifact_registry`, derives the expected Git branch name, validates the current branch, discovers matching input and policy folders, validates the Terraform `plan.json`, and runs the selected OPA output query. +Instead of manually writing long `opa eval` commands, students can use one simple command format to scan policies for a selected cloud provider and service. -By default, the script evaluates the policy `message` output. Users can also request the `details` output when needed. +The script also runs the existing branch name checker and service linter before scanning policies. This helps reduce common mistakes such as wrong branch naming, incorrect resource naming, missing files, or policy structure issues. 
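To show roughly what the long `opa eval` command looks like, the sketch below rebuilds it from a service path, resource, and policy, following the `build_opa_query`, `build_plan_path`, and `run_opa_eval` helpers in `local_scan.py`. It is a minimal illustration only: `build_opa_command` is a hypothetical name, and the sketch builds the argument list without running OPA or doing the path validation that `parse_service_path` performs.

```python
from pathlib import Path

# Same folder roots that local_scan.py uses.
POLICIES_ROOT = Path("policies")
INPUTS_ROOT = Path("inputs")


def build_opa_command(
    service_path: str,
    resource: str,
    policy: str,
    output_type: str = "message",
    output_format: str = "pretty",
) -> list[str]:
    """Hypothetical helper: return the opa eval argv for one policy folder.

    Mirrors build_opa_query, build_plan_path and run_opa_eval in local_scan.py,
    but only builds the command; it does not run OPA.
    """
    # "gcp/artifact_registry" -> ("gcp", "artifact_registry").
    # Unlike parse_service_path, this sketch does no error handling:
    # a malformed path raises ValueError when unpacking.
    provider, service = service_path.strip("/").split("/")

    plan_path = INPUTS_ROOT / provider / service / resource / policy / "plan.json"
    query = (
        f"data.terraform.{provider}.security."
        f"{service}.{resource}.{policy}.{output_type}"
    )
    return [
        "opa", "eval",
        "--data", str(POLICIES_ROOT / provider),
        "--data", str(POLICIES_ROOT / "_helpers"),
        "--input", str(plan_path),
        query,
        "--format", output_format,
    ]


if __name__ == "__main__":
    cmd = build_opa_command(
        "gcp/artifact_registry",
        "google_artifact_registry_repository",
        "approved_formats",
    )
    print(" ".join(cmd))
```

With the defaults, the query argument is `data.terraform.gcp.security.artifact_registry.google_artifact_registry_repository.approved_formats.message`. The path separators in the `--data` and `--input` arguments follow the host OS because they come from `pathlib`.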
--- -## Purpose +## Basic Command Format -Currently, policy testing can require manually running commands such as: - -```powershell -opa eval --data .\policies\ --data .\policies\_helpers --input .\inputs\\\\\plan.json "data.terraform..security....message" --format pretty -``` - -`local_scan.py` simplifies this by allowing commands like: - -```powershell +```bash python scripts\policy_scan\local_scan.py / ``` -This improves local testing consistency, reduces command mistakes, and makes the local scan process closer to the project’s automated policy checking workflow. - ---- - -## Key Features - -- Scans policies locally using a short provider/service command. -- Extracts provider and service from the service path. -- Derives the expected Git branch from provider and service. -- Validates that the user is on the correct Git branch before scanning. -- Supports service-level, resource-level, and policy-level scans. -- Uses `message` as the default output type. -- Supports `--details` for detailed policy output. -- Validates that matching `policy.rego` files exist. -- Validates that matching `plan.json` files exist. -- Checks that `plan.json` contains both compliant and non-compliant resources. -- Skips incomplete or invalid policy folders safely. -- Prints a scan summary showing scanned and skipped policies. - ---- - -## Prerequisites - -- Python 3.10+ -- OPA CLI installed and available in `PATH` -- Git installed and available in `PATH` -- Existing project folder structure: - -```text -inputs/////plan.json -policies/////policy.rego -policies/_helpers/ -``` - Example: -```text -inputs/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/plan.json -policies/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/policy.rego +```bash +python scripts\policy_scan\local_scan.py gcp/artifact_registry ``` --- -## Usage +## Example Commands -### Scan all resources and policies for a service +### 1. 
Scan a Full Service -```powershell -python scripts\policy_scan\local_scan.py gcp/artifact_registry +```bash +python scripts\policy_scan\local_scan.py / ``` -This scans every resource and policy under: - -```text -inputs/gcp/artifact_registry/ -policies/gcp/artifact_registry/ -``` +This scans all resources and policies inside the service. --- -### Scan all policies for a specific resource +### 2. Scan One Resource Only -```powershell -python scripts\policy_scan\local_scan.py gcp/artifact_registry --resource google_artifact_registry_repository +```bash +python scripts\policy_scan\local_scan.py / --resource ``` -This scans all policies under: - -```text -inputs/gcp/artifact_registry/google_artifact_registry_repository/ -``` +This scans all policies for the selected Terraform resource only. --- -### Scan one specific policy +### 3. Scan One Specific Policy -```powershell -python scripts\policy_scan\local_scan.py gcp/artifact_registry --resource google_artifact_registry_repository --policy approved_formats +```bash +python scripts\policy_scan\local_scan.py / --resource --policy ``` -This scans only: +This scans only the selected policy under the selected resource. -```text -inputs/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/plan.json -``` +--- -against: +### 4. Show Policy Message Output (Optional) -```text -policies/gcp/artifact_registry/google_artifact_registry_repository/approved_formats/policy.rego +```bash +python scripts\policy_scan\local_scan.py / --message ``` ---- +This shows the policy `message` output. -### Show details output instead of message +By default, the script shows `message`, so this flag is optional. -```powershell -python scripts\policy_scan\local_scan.py gcp/artifact_registry --resource google_artifact_registry_repository --policy approved_formats --details -``` +--- -By default, the script evaluates: +### 5.
Show Policy Details Output -```text -message +```bash +python scripts\policy_scan\local_scan.py / --details ``` -With `--details`, it evaluates: - -```text -details -``` +This shows the policy `details` output instead of the message. --- -## Command Line Arguments - -| Argument | Required | Default | Description | -|---|---:|---|---| -| `service_path` | Yes | N/A | Provider and service path, for example `gcp/artifact_registry`. | -| `--resource` | No | All resources | Specific Terraform resource to scan. | -| `--policy` | No | All policies | Specific policy folder to scan. Must be used with `--resource`. | -| `--message` | No | Enabled by default | Shows policy message output. | -| `--details` | No | Off | Shows policy details output. | - ---- +### 6. Change Output Format -## Branch Validation +```bash +python scripts\policy_scan\local_scan.py / --format json +``` -The command uses a provider/service path: +Supported formats are: ```text -gcp/artifact_registry +pretty +json +raw ``` -The script derives the expected Git branch name as: +The default format is: ```text -gcp/service/artifact_registry +pretty ``` -Before scanning, it checks the current Git branch using: +--- -```powershell -git branch --show-current -``` +## What Happens When the Script Runs -If the current branch does not match the expected branch, the script stops. +When the script runs, it follows these steps: -Example error: - -```text -Error: branch mismatch. -Current branch : gcp/service/api_gateway -Expected branch: gcp/service/artifact_registry -Please switch to the correct branch before running the local policy scan. -``` +1. Checks whether the branch name follows the project naming rule. +2. Runs the existing service linter. +3. Finds the selected service folder inside the `inputs` directory. +4. Looks for resource folders and policy folders. +5. Checks whether each policy has: + - a `policy.rego` file + - a matching `plan.json` file +6. Runs the OPA policy check. +7.
Shows a final summary of scanned and skipped policies. --- -## Output Type Logic +## Important Notes -If no output flag is provided, the script defaults to: +The service path must follow this format: ```text -message +/ ``` -Example: +Correct example: -```powershell +```bash python scripts\policy_scan\local_scan.py gcp/artifact_registry ``` -is equivalent to: +Incorrect example: -```powershell -python scripts\policy_scan\local_scan.py gcp/artifact_registry --message +```bash +python scripts\policy_scan\local_scan.py artifact_registry ``` -To view detailed output, use: +At the moment, the service linter only supports GCP. If another provider is used, the script will skip the service linter. -```powershell ---details -``` - -`--message` and `--details` cannot be used together. - ---- +Also, `--policy` cannot be used alone. A policy belongs inside a resource folder, so `--resource` must be provided first. -## Plan Resource Validation +Correct: -For example, for this resource: - -```text -google_artifact_registry_repository +```bash +python scripts\policy_scan\local_scan.py / --resource --policy ``` -the script expects: +Incorrect: -```text -google_artifact_registry_repository.c -google_artifact_registry_repository.nc +```bash +python scripts\policy_scan\local_scan.py / --policy ``` -The first resource must be the compliant example: +--- -```text -.c -``` +## Final Output -The second resource must be the non-compliant example: +At the end, the script shows a summary like this: ```text -.nc +Local policy scan completed +Scanned policies : 3 +Skipped policies : 1 ``` -If the names are incorrect, the script skips that policy and prints a remedy. - ---- - -## Troubleshooting - -| Symptom | Likely Cause | Fix | -|---|---|---| -| `Error: branch mismatch` | Current Git branch does not match the expected service branch. | Switch to the correct branch, for example `gcp/service/artifact_registry`. 
| -| `service input directory not found` | The input folder for the provider/service does not exist. | Check the service path and folder structure under `inputs/`. | -| `policy input directory not found` | The selected policy folder does not exist under inputs. | Check the `--policy` name and ensure the input folder exists. | -| `policy.rego not found` | Matching policy file is missing. | Add or move `policy.rego` to the correct policy folder. | -| `plan.json not found` | Terraform plan output has not been generated. | Generate `plan.json` in the matching input folder. | -| `could not read plan.json using utf-16 encoding` | The file encoding does not match the script’s expected encoding. | Regenerate `plan.json` using the expected UTF-16 encoding or update the script encoding. | -| `compliant resource name is incorrect` | First resource in `plan.json` is not named `.c`. | Rename the compliant Terraform resource block to `c` and regenerate `plan.json`. | -| `non-compliant resource name is incorrect` | Second resource in `plan.json` is not named `.nc`. | Rename the non-compliant Terraform resource block to `nc` and regenerate `plan.json`. | +A policy may be skipped if the script cannot find the required `policy.rego` or `plan.json` file. --- ## Summary -`local_scan.py` helps developers test OPA policies locally with a simple and consistent command. It removes the need to manually write long OPA queries, validates the current Git branch, checks required files, verifies Terraform resource naming conventions, and runs the correct `message` or `details` policy output. +`local_scan.py` makes local policy testing easier and more consistent for students. + +It allows students to scan policies, check branch naming, run the service linter, and prepare their work before raising a pull request. 
From 1a92a5f93d2b84a495251fecd0ac9e24544343b0 Mon Sep 17 00:00:00 2001 From: tamim1517 Date: Mon, 25 May 2026 14:08:58 +1000 Subject: [PATCH 4/7] added shorthand, all service support --- .../local_scan.py => policy_scan.py | 185 +++++++++++------- 1 file changed, 118 insertions(+), 67 deletions(-) rename scripts/policy_scan/local_scan.py => policy_scan.py (59%) diff --git a/scripts/policy_scan/local_scan.py b/policy_scan.py similarity index 59% rename from scripts/policy_scan/local_scan.py rename to policy_scan.py index 0caf428a2..e9fe7b4c2 100644 --- a/scripts/policy_scan/local_scan.py +++ b/policy_scan.py @@ -31,10 +31,10 @@ def run_existing_branch_linter() -> None: sys.exit(result.returncode) -def run_existing_service_linter(provider: str, service: str) -> None: +def run_existing_service_linter(provider: str, service: str) -> tuple[bool, str | None]: if provider != "gcp": print(f"Skipping linter: current linter only supports GCP, but provider is '{provider}'.") - return + return True, None result = subprocess.run( ["python", "scripts/linters/linter.py", "--gcp", service], @@ -42,29 +42,11 @@ def run_existing_service_linter(provider: str, service: str) -> None: ) if result.returncode != 0: - print("Local scan stopped because the linter found issues.") - print("Please fix the linter errors before running the policy scan.") - sys.exit(result.returncode) - - -def parse_service_path(service_path: str) -> tuple[str, str]: - parts = service_path.strip("/").split("/") - - if len(parts) != 2: - print( - "Error: service path must follow this format: /", - file=sys.stderr, - ) - print("Example: gcp/artifact_registry", file=sys.stderr) - sys.exit(1) + reason = f"Service linter failed for {provider}/{service}" + print(reason) + return False, reason - provider, service = parts - - if not provider or not service: - print("Error: provider or service name is missing from service path.", file=sys.stderr) - sys.exit(1) - - return provider, service + return True, None def 
build_opa_query(provider: str, service: str, resource: str, policy: str, output_type: str) -> str: @@ -82,6 +64,25 @@ def build_policy_file_path(provider: str, service: str, resource: str, policy: s return POLICIES_ROOT / provider / service / resource / policy / "policy.rego" +def get_service_dirs(provider: str, service: str | None = None) -> list[Path]: + provider_input_dir = INPUTS_ROOT / provider + + if not provider_input_dir.exists(): + print(f"Error: provider input directory not found: {provider_input_dir}", file=sys.stderr) + sys.exit(1) + + if service: + service_dir = provider_input_dir / service + + if not service_dir.exists(): + print(f"Error: service input directory not found: {service_dir}", file=sys.stderr) + sys.exit(1) + + return [service_dir] + + return sorted(path for path in provider_input_dir.iterdir() if path.is_dir()) + + def get_resource_dirs(provider: str, service: str, resource: str | None = None) -> list[Path]: service_input_dir = INPUTS_ROOT / provider / service @@ -159,7 +160,7 @@ def scan_policy( policy: str, output_type: str, output_format: str, -) -> tuple[int, bool]: +) -> tuple[int, bool, str | None]: plan_path = build_plan_path(provider, service, resource, policy) policy_file_path = build_policy_file_path(provider, service, resource, policy) @@ -171,12 +172,14 @@ def scan_policy( print("=" * 90) if not policy_file_path.exists(): - print(f"Skipping: policy.rego not found at {policy_file_path}") - return 0, False + reason = f"policy.rego not found at {policy_file_path}" + print(f"Skipping: {reason}") + return 0, False, reason if not plan_path.exists(): - print(f"Skipping: plan.json not found at {plan_path}") - return 0, False + reason = f"plan.json not found at {plan_path}" + print(f"Skipping: {reason}") + return 0, False, reason query = build_opa_query( provider=provider, @@ -193,29 +196,40 @@ def scan_policy( output_format=output_format, ) + if exit_code != 0: + return exit_code, True, f"OPA evaluation failed for 
{provider}/{service}/{resource}/{policy}" - return exit_code, True + return exit_code, True, None def main() -> None: parser = argparse.ArgumentParser( - description="Run local OPA policy scans from a provider/service name." + description="Run local OPA policy scans by provider, service, resource, or policy." ) parser.add_argument( - "service_path", - help="Service path in format: /. Example: gcp/artifact_registry", + "-p", + "--provider", + required=True, + help="Cloud provider to scan. Example: gcp", + ) + + parser.add_argument( + "-s", + "--service", + required=False, + help="Optional service name. Example: artifact_registry", ) parser.add_argument( + "-r", "--resource", - "--resourse", - dest="resource", required=False, help="Optional Terraform resource name. Example: google_artifact_registry_repository", ) parser.add_argument( + "-po", "--policy", required=False, help="Optional policy folder name. Example: approved_formats", @@ -224,18 +238,21 @@ def main() -> None: output_group = parser.add_mutually_exclusive_group(required=False) output_group.add_argument( + "-m", "--message", action="store_true", - help="Show policy message output", + help="Show policy message output. 
This is the default option.", ) output_group.add_argument( + "-d", "--details", action="store_true", - help="Show policy details output", + help="Show policy details output.", ) parser.add_argument( + "-f", "--format", default="pretty", choices=["pretty", "json", "raw"], @@ -244,7 +261,15 @@ def main() -> None: args = parser.parse_args() - provider, service = parse_service_path(args.service_path) + provider = args.provider + service = args.service + + if args.resource and not service: + print( + "Error: --resource cannot be used without --service because resources are inside service folders.", + file=sys.stderr, + ) + sys.exit(1) if args.policy and not args.resource: print( @@ -259,50 +284,76 @@ def main() -> None: output_type = "message" run_existing_branch_linter() - run_existing_service_linter(provider, service) - - resource_dirs = get_resource_dirs( + + service_dirs = get_service_dirs( provider=provider, service=service, - resource=args.resource, ) final_exit_code = 0 - scanned_count = 0 - skipped_count = 0 + successful_policies = [] + failed_checks = [] + + for service_dir in service_dirs: + service_name = service_dir.name - for resource_dir in resource_dirs: - resource_name = resource_dir.name + linter_ok, linter_problem = run_existing_service_linter(provider, service_name) - policy_dirs = get_policy_dirs( - resource_dir=resource_dir, - policy=args.policy, + if not linter_ok: + failed_checks.append((f"{provider}/{service_name}", linter_problem or "Service linter failed")) + final_exit_code = 1 + continue + + resource_dirs = get_resource_dirs( + provider=provider, + service=service_name, + resource=args.resource, ) - for policy_dir in policy_dirs: - policy_name = policy_dir.name + for resource_dir in resource_dirs: + resource_name = resource_dir.name - exit_code, scanned = scan_policy( - provider=provider, - service=service, - resource=resource_name, - policy=policy_name, - output_type=output_type, - output_format=args.format, + policy_dirs = get_policy_dirs( 
+ resource_dir=resource_dir, + policy=args.policy, ) - if scanned: - scanned_count += 1 - else: - skipped_count += 1 + for policy_dir in policy_dirs: + policy_name = policy_dir.name + + policy_ref = f"{provider}/{service_name}/{resource_name}/{policy_name}" - if exit_code != 0: - final_exit_code = exit_code + exit_code, scanned, problem = scan_policy( + provider=provider, + service=service_name, + resource=resource_name, + policy=policy_name, + output_type=output_type, + output_format=args.format, + ) + + if scanned and exit_code == 0: + successful_policies.append(policy_ref) + else: + reason = problem or "OPA evaluation failed" + failed_checks.append((policy_ref, reason)) + final_exit_code = 1 print("\n" + "=" * 90) - print("Local policy scan completed") - print(f"Scanned policies : {scanned_count}") - print(f"Skipped policies : {skipped_count}") + print(f"Successful policies : {len(successful_policies)}") + print(f"Failed checks : {len(failed_checks)}") + + # if successful_policies: + # print("\nSuccessful policies:") + # for policy_ref in successful_policies: + # print(f" - {policy_ref}") + + if failed_checks: + print("\nFailed check details:") + for check_ref, reason in failed_checks: + print(f" - {check_ref}") + print(f" Reason: {reason}") + print("=" * 90) sys.exit(final_exit_code) From 1a799c2d64d9e44e0d069ac285243605ea2d8ec9 Mon Sep 17 00:00:00 2001 From: tamim1517 Date: Mon, 25 May 2026 15:51:50 +1000 Subject: [PATCH 5/7] Added support for concurrent processing and terraform command automation --- policy_scan.py | 175 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 135 insertions(+), 40 deletions(-) diff --git a/policy_scan.py b/policy_scan.py index e9fe7b4c2..58dbd4a33 100644 --- a/policy_scan.py +++ b/policy_scan.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor, as_completed import argparse import subprocess import sys @@ -115,11 +116,12 @@ def get_policy_dirs(resource_dir: Path, policy: str | None = None) -> 
list[Path] return sorted(path for path in resource_dir.iterdir() if path.is_dir()) -def run_opa_eval( +def run_opa_eval_with_logs( provider: str, plan_path: Path, query: str, output_format: str, + logs: list[str], ) -> int: policies_provider_root = POLICIES_ROOT / provider policies_helpers_root = POLICIES_ROOT / "_helpers" @@ -138,21 +140,71 @@ def run_opa_eval( output_format, ] - # print("\nRunning OPA command:") - # print(" ".join(cmd)) - # print() - result = run_command(cmd) if result.stdout: - print(result.stdout) + logs.append(result.stdout) if result.stderr: - print(result.stderr, file=sys.stderr) + logs.append(result.stderr) return result.returncode +def generate_plan_json(input_dir: Path, logs: list[str]) -> tuple[bool, str | None]: + plan_file = input_dir / "plan" + plan_json_file = input_dir / "plan.json" + + logs.append(f"plan.json not found. Generating Terraform plan in: {input_dir}") + + commands = [ + ["terraform", "init"], + ["terraform", "plan", "--out=plan"], + ] + + for command in commands: + result = subprocess.run( + command, + cwd=input_dir, + capture_output=True, + text=True, + shell=False, + ) + + if result.returncode != 0: + reason = f"Terraform command failed: {' '.join(command)}" + logs.append(reason) + + if result.stdout: + logs.append(result.stdout) + + if result.stderr: + logs.append(result.stderr) + + return False, reason + + show_result = subprocess.run( + ["terraform", "show", "-json", str(plan_file.name)], + cwd=input_dir, + capture_output=True, + shell=False, + ) + + if show_result.returncode != 0: + reason = "Terraform command failed: terraform show -json plan" + logs.append(reason) + + if show_result.stderr: + logs.append(show_result.stderr.decode(errors="replace")) + + return False, reason + + plan_json_file.write_bytes(show_result.stdout) + + logs.append(f"Generated plan.json at: {plan_json_file}") + return True, None + + def scan_policy( provider: str, service: str, @@ -160,26 +212,31 @@ def scan_policy( policy: str, 
output_type: str, output_format: str, -) -> tuple[int, bool, str | None]: +) -> tuple[int, bool, str | None, str]: + logs = [] + plan_path = build_plan_path(provider, service, resource, policy) policy_file_path = build_policy_file_path(provider, service, resource, policy) - print("\n" + "=" * 90) - print(f"Provider : {provider}") - print(f"Service : {service}") - print(f"Resource : {resource}") - print(f"Policy : {policy}") - print("=" * 90) + logs.append("=" * 90) + logs.append(f"Provider : {provider}") + logs.append(f"Service : {service}") + logs.append(f"Resource : {resource}") + logs.append(f"Policy : {policy}") + logs.append("=" * 90) if not policy_file_path.exists(): reason = f"policy.rego not found at {policy_file_path}" - print(f"Skipping: {reason}") - return 0, False, reason + logs.append(f"Skipping: {reason}") + return 0, False, reason, "\n".join(logs) if not plan_path.exists(): - reason = f"plan.json not found at {plan_path}" - print(f"Skipping: {reason}") - return 0, False, reason + input_dir = plan_path.parent + + generated, reason = generate_plan_json(input_dir, logs) + + if not generated: + return 0, False, reason or f"plan.json could not be generated at {plan_path}", "\n".join(logs) query = build_opa_query( provider=provider, @@ -189,17 +246,18 @@ def scan_policy( output_type=output_type, ) - exit_code = run_opa_eval( + exit_code = run_opa_eval_with_logs( provider=provider, plan_path=plan_path, query=query, output_format=output_format, + logs=logs, ) if exit_code != 0: - return exit_code, True, f"OPA evaluation failed for {provider}/{service}/{resource}/{policy}" + return exit_code, True, f"OPA evaluation failed for {provider}/{service}/{resource}/{policy}", "\n".join(logs) - return exit_code, True, None + return exit_code, True, None, "\n".join(logs) def main() -> None: @@ -259,8 +317,20 @@ def main() -> None: help="OPA output format. 
Default: pretty", ) + parser.add_argument( + "-w", + "--workers", + type=int, + default=4, + help="Number of concurrent policy scans to run. Default: 4", + ) + args = parser.parse_args() + if args.workers < 1: + print("Error: --workers must be at least 1.", file=sys.stderr) + sys.exit(1) + provider = args.provider service = args.service @@ -294,6 +364,8 @@ def main() -> None: successful_policies = [] failed_checks = [] + scan_targets = [] + for service_dir in service_dirs: service_name = service_dir.name @@ -320,33 +392,56 @@ def main() -> None: for policy_dir in policy_dirs: policy_name = policy_dir.name - policy_ref = f"{provider}/{service_name}/{resource_name}/{policy_name}" - exit_code, scanned, problem = scan_policy( - provider=provider, - service=service_name, - resource=resource_name, - policy=policy_name, - output_type=output_type, - output_format=args.format, + scan_targets.append( + { + "provider": provider, + "service": service_name, + "resource": resource_name, + "policy": policy_name, + "policy_ref": policy_ref, + } ) - if scanned and exit_code == 0: - successful_policies.append(policy_ref) - else: - reason = problem or "OPA evaluation failed" - failed_checks.append((policy_ref, reason)) - final_exit_code = 1 + print(f"\nFound {len(scan_targets)} policies to scan. 
Starting scans with {args.workers} workers...\n") + + with ThreadPoolExecutor(max_workers=args.workers) as executor: + future_to_policy = { + executor.submit( + scan_policy, + target["provider"], + target["service"], + target["resource"], + target["policy"], + output_type, + args.format, + ): target["policy_ref"] + for target in scan_targets + } + + for future in as_completed(future_to_policy): + policy_ref = future_to_policy[future] + + try: + exit_code, scanned, problem, log_output = future.result() + print(log_output) + except Exception as error: + failed_checks.append((policy_ref, f"Unexpected error: {error}")) + final_exit_code = 1 + continue + + if scanned and exit_code == 0: + successful_policies.append(policy_ref) + else: + reason = problem or "OPA evaluation failed" + failed_checks.append((policy_ref, reason)) + final_exit_code = 1 print("\n" + "=" * 90) print(f"Successful policies : {len(successful_policies)}") print(f"Failed checks : {len(failed_checks)}") - # if successful_policies: - # print("\nSuccessful policies:") - # for policy_ref in successful_policies: - # print(f" - {policy_ref}") if failed_checks: print("\nFailed check details:") From 977d70eaf20a7a93dd10edce48bc9d1a7cb1ec0b Mon Sep 17 00:00:00 2001 From: tamim1517 Date: Tue, 26 May 2026 12:05:06 +1000 Subject: [PATCH 6/7] separated terraform and opa workers --- policy_scan.py | 117 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 101 insertions(+), 16 deletions(-) diff --git a/policy_scan.py b/policy_scan.py index 58dbd4a33..16e443852 100644 --- a/policy_scan.py +++ b/policy_scan.py @@ -205,6 +205,27 @@ def generate_plan_json(input_dir: Path, logs: list[str]) -> tuple[bool, str | No return True, None +def prepare_plan_json( + provider: str, + service: str, + resource: str, + policy: str, +) -> tuple[bool, str | None, str]: + logs = [] + plan_path = build_plan_path(provider, service, resource, policy) + + if plan_path.exists(): + logs.append(f"plan.json already exists: 
{plan_path}") + return True, None, "\n".join(logs) + + generated, reason = generate_plan_json(plan_path.parent, logs) + + if not generated: + return False, reason or f"plan.json could not be generated at {plan_path}", "\n".join(logs) + + return True, None, "\n".join(logs) + + def scan_policy( provider: str, service: str, @@ -231,12 +252,9 @@ def scan_policy( return 0, False, reason, "\n".join(logs) if not plan_path.exists(): - input_dir = plan_path.parent - - generated, reason = generate_plan_json(input_dir, logs) - - if not generated: - return 0, False, reason or f"plan.json could not be generated at {plan_path}", "\n".join(logs) + reason = f"plan.json not found at {plan_path}. Terraform generation stage did not create it." + logs.append(f"Skipping: {reason}") + return 0, False, reason, "\n".join(logs) query = build_opa_query( provider=provider, @@ -318,17 +336,27 @@ def main() -> None: ) parser.add_argument( - "-w", - "--workers", + "--terraform-workers", + type=int, + default=2, + help="Number of concurrent Terraform plan generations to run. Default: 2", + ) + + parser.add_argument( + "--opa-workers", type=int, default=4, - help="Number of concurrent policy scans to run. Default: 4", + help="Number of concurrent OPA policy scans to run. Default: 4", ) args = parser.parse_args() - if args.workers < 1: - print("Error: --workers must be at least 1.", file=sys.stderr) + if args.terraform_workers < 1: + print("Error: --terraform-workers must be at least 1.", file=sys.stderr) + sys.exit(1) + + if args.opa_workers < 1: + print("Error: --opa-workers must be at least 1.", file=sys.stderr) sys.exit(1) provider = args.provider @@ -404,9 +432,66 @@ def main() -> None: } ) - print(f"\nFound {len(scan_targets)} policies to scan. 
Starting scans with {args.workers} workers...\n") - - with ThreadPoolExecutor(max_workers=args.workers) as executor: + print(f"\n\nFound {len(scan_targets)} policies to scan.\n") + + targets_needing_plan = [ + target + for target in scan_targets + if not build_plan_path( + target["provider"], + target["service"], + target["resource"], + target["policy"], + ).exists() + ] + + if len(targets_needing_plan) > 0: + print( + f"Found {len(targets_needing_plan)} missing plan.json files. " + f"Starting Terraform generation with {args.terraform_workers} workers...\n" + ) + else: + print("All plan.json files already exist. Skipping Terraform generation.\n") + + with ThreadPoolExecutor(max_workers=args.terraform_workers) as executor: + future_to_policy = { + executor.submit( + prepare_plan_json, + target["provider"], + target["service"], + target["resource"], + target["policy"], + ): target["policy_ref"] + for target in targets_needing_plan + } + + for future in as_completed(future_to_policy): + policy_ref = future_to_policy[future] + + try: + plan_ready, problem, log_output = future.result() + print(log_output) + except Exception as error: + failed_checks.append((policy_ref, f"Unexpected Terraform generation error: {error}")) + final_exit_code = 1 + continue + + if not plan_ready: + failed_checks.append((policy_ref, problem or "Terraform plan generation failed")) + final_exit_code = 1 + + plan_failed_refs = {check_ref for check_ref, _ in failed_checks} + scan_ready_targets = [ + target + for target in scan_targets + if target["policy_ref"] not in plan_failed_refs + ] + + print( + f"\nStarting OPA scans for {len(scan_ready_targets)} policies.\n" + ) + + with ThreadPoolExecutor(max_workers=args.opa_workers) as executor: future_to_policy = { executor.submit( scan_policy, @@ -417,7 +502,7 @@ def main() -> None: output_type, args.format, ): target["policy_ref"] - for target in scan_targets + for target in scan_ready_targets } for future in as_completed(future_to_policy): @@ -427,7 
+512,7 @@ def main() -> None: exit_code, scanned, problem, log_output = future.result() print(log_output) except Exception as error: - failed_checks.append((policy_ref, f"Unexpected error: {error}")) + failed_checks.append((policy_ref, f"Unexpected OPA scan error: {error}")) final_exit_code = 1 continue From eda2137693945524e203f51e22d66e3b515aee3e Mon Sep 17 00:00:00 2001 From: tamim1517 Date: Fri, 29 May 2026 17:28:27 +1000 Subject: [PATCH 7/7] updated documentation and modified plan.json creation logic --- policy_scan.py | 29 ++---- scripts/policy_scan/README.md | 179 ++++++++++++++++++++++++---------- 2 files changed, 136 insertions(+), 72 deletions(-) diff --git a/policy_scan.py b/policy_scan.py index 16e443852..faa00b29c 100644 --- a/policy_scan.py +++ b/policy_scan.py @@ -155,7 +155,7 @@ def generate_plan_json(input_dir: Path, logs: list[str]) -> tuple[bool, str | No plan_file = input_dir / "plan" plan_json_file = input_dir / "plan.json" - logs.append(f"plan.json not found. Generating Terraform plan in: {input_dir}") + logs.append(f"Generating Terraform plan in: {input_dir}") commands = [ ["terraform", "init"], @@ -214,10 +214,6 @@ def prepare_plan_json( logs = [] plan_path = build_plan_path(provider, service, resource, policy) - if plan_path.exists(): - logs.append(f"plan.json already exists: {plan_path}") - return True, None, "\n".join(logs) - generated, reason = generate_plan_json(plan_path.parent, logs) if not generated: @@ -434,24 +430,11 @@ def main() -> None: print(f"\n\nFound {len(scan_targets)} policies to scan.\n") - targets_needing_plan = [ - target - for target in scan_targets - if not build_plan_path( - target["provider"], - target["service"], - target["resource"], - target["policy"], - ).exists() - ] - - if len(targets_needing_plan) > 0: - print( - f"Found {len(targets_needing_plan)} missing plan.json files. " - f"Starting Terraform generation with {args.terraform_workers} workers...\n" - ) - else: - print("All plan.json files already exist. 
Skipping Terraform generation.\n") + targets_needing_plan = scan_targets + print( + f"Starting Terraform generation for {len(targets_needing_plan)} policies " + f"with {args.terraform_workers} workers...\n" + ) with ThreadPoolExecutor(max_workers=args.terraform_workers) as executor: future_to_policy = { diff --git a/scripts/policy_scan/README.md b/scripts/policy_scan/README.md index 2be7aea6b..b9720fe6b 100644 --- a/scripts/policy_scan/README.md +++ b/scripts/policy_scan/README.md @@ -2,86 +2,116 @@ ## Overview -`local_scan.py` is a helper script for running OPA policy checks locally before raising a pull request. +`policy_scan.py` is a helper script for running local Terraform plan generation and OPA/Rego policy checks before raising a pull request. -Instead of manually writing long `opa eval` commands, students can use one simple command format to scan policies for a selected cloud provider and service. +Instead of manually running long Terraform and `opa eval` commands, students can use one script to: -The script also runs the existing branch name checker and service linter before scanning policies. This helps reduce common mistakes such as wrong branch naming, incorrect resource naming, missing files, or policy structure issues. +- check the branch name +- run the service linter +- generate or refresh `plan.json` +- scan policies with OPA +- show a clear success and failure summary + +The script now runs Terraform plan generation for every selected policy, even when a `plan.json` file already exists. This helps make sure the scan uses the latest Terraform configuration. --- ## Basic Command Format ```bash -python scripts\policy_scan\local_scan.py / +python policy_scan.py --provider ``` Example: ```bash -python scripts\policy_scan\local_scan.py gcp/artifact_registry +python policy_scan.py --provider gcp ``` +This scans all services under the selected provider. + --- ## Example Commands -### 1. Scan a Full Service +### 1. 
Scan All Services for a Provider + +```bash +python policy_scan.py --provider +``` + +Example: ```bash -python scripts\policy_scan\local_scan.py / +python policy_scan.py --provider gcp ``` -This scans all resources and policies inside the service. +This scans all services, resources, and policies under the selected provider. --- -### 2. Scan One Resource Only +### 2. Scan One Service ```bash -python scripts\policy_scan\local_scan.py / --resource +python policy_scan.py --provider --service ``` -This scans only the selected Terraform resource. +Example: + +```bash +python policy_scan.py --provider gcp --service artifact_registry +``` + +This scans all resources and policies inside the selected service. --- -### 3. Scan One Specific Policy +### 3. Scan One Resource Only + +```bash +python policy_scan.py --provider --service --resource +``` + +Example: ```bash -python scripts\policy_scan\local_scan.py / --resource --policy +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository ``` -This scans only the policy for the selected resource. +This scans only the selected Terraform resource. --- -### 4. Show Policy Message Output (Optional) +### 4. Scan One Specific Policy ```bash -python scripts\policy_scan\local_scan.py / --message +python policy_scan.py --provider --service --resource --policy ``` -This shows the policy `message` output. +Example: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository --policy approved_formats +``` -By default, the script shows `message`, so this flag is optional. +This scans only one policy for the selected resource. --- ### 5. Show Policy Details Output ```bash -python scripts\policy_scan\local_scan.py / --details +python policy_scan.py --provider gcp --service artifact_registry --details ``` This shows the policy `details` output instead of the message. --- -### 6. Change Output Format +### 6. 
Change OPA Output Format ```bash -python scripts\policy_scan\local_scan.py / --format json +python policy_scan.py --provider gcp --service artifact_registry --format json ``` Supported formats are: @@ -100,76 +130,127 @@ pretty --- -## What Happens When the script is run +### 7. Change Worker Counts + +The script uses concurrent workers to speed up Terraform plan generation and OPA scans. + +```bash +python policy_scan.py --provider gcp --service artifact_registry --terraform-workers 2 --opa-workers 4 +``` + +Default values: + +```text +Terraform workers : 2 +OPA workers : 4 +``` + +Use lower worker counts if your computer becomes slow or if Terraform commands fail because too many tasks are running at the same time. + +--- + +## What Happens When the Script Runs When the script runs, it follows these steps: 1. Checks whether the branch name follows the project naming rule. 2. Runs the existing service linter. -3. Finds the selected service folder inside the `inputs` directory. -4. Looks for resource folders and policy folders. -5. Checks whether each policy has: - - a `policy.rego` file - - a matching `plan.json` file -6. Runs the OPA policy check. -7. Shows a final summary of scanned and skipped policies. +3. Finds the selected provider, service, resource, and policy folders. +4. Runs Terraform commands for each selected policy folder: + + ```bash + terraform init + terraform plan --out=plan + terraform show -json plan > plan.json + ``` + +5. Regenerates `plan.json` even if it already exists. +6. Checks whether each policy has a matching `policy.rego` file. +7. Runs the OPA policy check using the generated `plan.json`. +8. Shows a final summary of successful policies and failed checks. --- ## Important Notes -The service path must follow this format: +`--provider` is required. 
-```text -/ +Correct: + +```bash +python policy_scan.py --provider gcp ``` -Correct example: +Incorrect: ```bash -python scripts\policy_scan\local_scan.py gcp/artifact_registry +python policy_scan.py ``` -Incorrect example: +`--resource` cannot be used without `--service`, because resources are inside service folders. + +Correct: ```bash -python scripts\policy_scan\local_scan.py artifact_registry +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository ``` -At the moment, the service linter only supports GCP. If another provider is used, the script will skip the service linter. +Incorrect: -Also, `--policy` cannot be used alone. A policy belongs inside a resource folder, so `--resource` must be provided first. +```bash +python policy_scan.py --provider gcp --resource google_artifact_registry_repository +``` + +`--policy` cannot be used without `--resource`, because policies are inside resource folders. Correct: ```bash -python scripts\policy_scan\local_scan.py / --resource --policy +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository --policy approved_formats ``` Incorrect: ```bash -python scripts\policy_scan\local_scan.py / --policy +python policy_scan.py --provider gcp --service artifact_registry --policy approved_formats ``` +At the moment, the service linter only supports GCP. If another provider is used, the script skips the service linter. + --- -## Final Output +## Available Flags -At the end, the script shows a summary like this: +The script supports both full flag names and short-hand flags. Students can use either format. 
-```text -Local policy scan completed -Scanned policies : 3 -Skipped policies : 1 +| Purpose | Full flag | Short-hand flag | Example value | +|---|---|---|---| +| Select provider | `--provider` | `-p` | `gcp` | +| Select service | `--service` | `-s` | `artifact_registry` | +| Select resource | `--resource` | `-r` | `google_artifact_registry_repository` | +| Select policy | `--policy` | `-po` | `approved_formats` | +| Show message output | `--message` | `-m` | no value needed | +| Show details output | `--details` | `-d` | no value needed | +| Change OPA format | `--format` | `-f` | `pretty`, `json`, or `raw` | +| Set Terraform workers | `--terraform-workers` | no short-hand | `2` | +| Set OPA workers | `--opa-workers` | no short-hand | `4` | + +Example using full flags: + +```bash +python policy_scan.py --provider gcp --service artifact_registry --resource google_artifact_registry_repository --policy approved_formats --details --format pretty ``` -A policy may be skipped if the script cannot find the required `policy.rego` or `plan.json` file. +Same command using short-hand flags: + +```bash +python policy_scan.py -p gcp -s artifact_registry -r google_artifact_registry_repository -po approved_formats -d -f pretty +``` --- ## Summary -`local_scan.py` makes local policy testing easier and more consistent for students. +`policy_scan.py` makes local policy testing easier and more consistent for students. -It allows students to scan policies, check branch naming, run the service linter, and prepare their work before raising a pull request. +It checks the branch name, runs the service linter, regenerates Terraform `plan.json` files, runs OPA scans, and provides a final summary.
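The two worker settings described above split a run into two thread pools. Terraform plan generation runs first. OPA evaluation then runs only for policies whose `plan.json` was produced, as the patch's `main()` does with `scan_ready_targets`. The sketch below shows that shape with `concurrent.futures`. The names `run_two_stage_scan`, `prepare_plan` and `run_opa` are hypothetical. The two callables stand in for the real Terraform and OPA steps and are assumed to return `True` on success.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable


def run_two_stage_scan(
    targets: list[str],
    prepare_plan: Callable[[str], bool],
    run_opa: Callable[[str], bool],
    terraform_workers: int = 2,
    opa_workers: int = 4,
) -> tuple[list[str], list[str]]:
    """Hypothetical sketch: a Terraform stage followed by an OPA stage.

    Returns (succeeded, failed) policy references.
    """
    failed: list[str] = []

    # Stage 1: (re)generate plan.json for every target, at most terraform_workers at a time.
    with ThreadPoolExecutor(max_workers=terraform_workers) as executor:
        futures = {executor.submit(prepare_plan, t): t for t in targets}
        for future in as_completed(futures):
            target = futures[future]
            try:
                ok = future.result()
            except Exception:
                ok = False  # an unexpected error counts as a failed plan
            if not ok:
                failed.append(target)

    # Stage 2: only targets whose plan was generated go on to OPA evaluation.
    plan_failed = set(failed)
    ready = [t for t in targets if t not in plan_failed]

    succeeded: list[str] = []
    with ThreadPoolExecutor(max_workers=opa_workers) as executor:
        futures = {executor.submit(run_opa, t): t for t in ready}
        for future in as_completed(futures):
            target = futures[future]
            try:
                ok = future.result()
            except Exception:
                ok = False  # an unexpected error counts as a failed scan
            (succeeded if ok else failed).append(target)

    return succeeded, failed


# Pretend p2's Terraform plan fails and p3's OPA evaluation fails.
ok, bad = run_two_stage_scan(
    ["p1", "p2", "p3"],
    prepare_plan=lambda target: target != "p2",
    run_opa=lambda target: target != "p3",
)
print(sorted(ok), sorted(bad))  # ['p1'] ['p2', 'p3']
```

In `policy_scan.py` these roles are filled by `prepare_plan_json` and `scan_policy`, and `--terraform-workers` and `--opa-workers` set the two pool sizes. Results are sorted before printing because `as_completed` yields futures in completion order. Separate pools let the heavier Terraform stage be throttled on its own. This matches the README's advice to lower worker counts when the machine slows down.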