From fd4016ae443ccd960e7c08828ab7502b7fda2e0a Mon Sep 17 00:00:00 2001 From: bruzzechesse Date: Fri, 20 Mar 2026 10:53:05 +0100 Subject: [PATCH 1/2] add pull-rules method for dac --- pipelines/detection-as-code/scripts/main.py | 76 +++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/pipelines/detection-as-code/scripts/main.py b/pipelines/detection-as-code/scripts/main.py index b0bce25..3e12f79 100644 --- a/pipelines/detection-as-code/scripts/main.py +++ b/pipelines/detection-as-code/scripts/main.py @@ -101,6 +101,82 @@ def update_data_tables(ctx: AppContext): ", ".join(names)) +@cli.command() +@pass_context +def pull_rules(ctx: AppContext): + """Pull all rules from SecOps and populate local files.""" + from config import SECOPS_RULES_CONFIG_PATH, BASE_RULES_DIR, PROJECT_ID, REGION, CUSTOMER_ID + import ruamel.yaml + + _LOGGER.info("Fetching rules from SecOps...") + try: + rules_data = ctx.chronicle_client.list_rules().get("rules", []) + _LOGGER.info("Retrieved %d rules.", len(rules_data)) + + deployments_data = ctx.chronicle_client.list_rule_deployments().get("ruleDeployments", []) + _LOGGER.info("Retrieved %d rule deployments.", len(deployments_data)) + + # Build a mapping of ruleId -> deployment details + deployments_map = {d.get("name"): d for d in deployments_data if d.get("name")} + + yaml = ruamel.yaml.YAML() + yaml.preserve_quotes = True + + rules_config = {} + if SECOPS_RULES_CONFIG_PATH.is_file(): + with open(SECOPS_RULES_CONFIG_PATH, "r", encoding="utf-8") as f: + rules_config = yaml.load(f) or {} + + BASE_RULES_DIR.mkdir(parents=True, exist_ok=True) + + import_commands = [] + + for rule in rules_data: + rule_id = rule.get("name").split("/")[-1] + rule_text = rule.get("text", "") + + # Extract rule name from text + match = re.search(r"rule\s+([a-zA-Z0-9_]+)\s*\{", rule_text) + if not match: + _LOGGER.warning("Could not extract rule name for rule ID %s. Skipping.", rule_id) + continue + + rule_name = match.group(1) + + # Save rule to file + rule_file_path = BASE_RULES_DIR / f"{rule_name}.yaral" + rule_file_path.write_text(rule_text, encoding="utf-8") + + # Update secops_rules_config + dep = deployments_map.get(f"{rule.get("name")}/deployment", {}) + if rule_name not in rules_config: + rules_config[rule_name] = {} + + rules_config[rule_name]["enabled"] = dep.get("enabled", False) + rules_config[rule_name]["alerting"] = dep.get("alerting", False) + rules_config[rule_name]["archived"] = dep.get("archived", False) + rules_config[rule_name]["run_frequency"] = dep.get("runFrequency", "LIVE") + + # Generate terraform import commands + tf_parent = f"projects/{PROJECT_ID}/locations/{REGION}/instances/{CUSTOMER_ID}/rules/{rule_id}" + import_commands.append(f"terraform import 'google_chronicle_rule.rule[\"{rule_name}\"]' {tf_parent}") + import_commands.append(f"terraform import 'google_chronicle_rule_deployment.rule_deployment[\"{rule_name}\"]' {tf_parent}/deployment") + + with open(SECOPS_RULES_CONFIG_PATH, "w", encoding="utf-8") as f: + yaml.dump(rules_config, f) + + _LOGGER.info("Successfully pulled rules and updated %s.", SECOPS_RULES_CONFIG_PATH.name) + + if import_commands: + click.echo("\n--- Terraform Import Commands ---") + for cmd in import_commands: + click.echo(cmd) + + except Exception as e: + _LOGGER.error("Failed to pull rules: %s", e, exc_info=True) + sys.exit(1) + + @cli.command() @pass_context def verify_rules(ctx: AppContext): From d65991dffdfaa2bad6701265e02aaf20cc85d5f8 Mon Sep 17 00:00:00 2001 From: bruzzechesse Date: Fri, 20 Mar 2026 10:57:18 +0100 Subject: [PATCH 2/2] fix lint --- pipelines/detection-as-code/scripts/main.py | 30 ++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pipelines/detection-as-code/scripts/main.py b/pipelines/detection-as-code/scripts/main.py index 3e12f79..41dd189 100644 --- a/pipelines/detection-as-code/scripts/main.py +++ b/pipelines/detection-as-code/scripts/main.py @@ -113,11 +113,15 @@ def pull_rules(ctx: AppContext): rules_data = ctx.chronicle_client.list_rules().get("rules", []) _LOGGER.info("Retrieved %d rules.", len(rules_data)) - deployments_data = ctx.chronicle_client.list_rule_deployments().get("ruleDeployments", []) + deployments_data = ctx.chronicle_client.list_rule_deployments().get( + "ruleDeployments", []) _LOGGER.info("Retrieved %d rule deployments.", len(deployments_data)) # Build a mapping of ruleId -> deployment details - deployments_map = {d.get("name"): d for d in deployments_data if d.get("name")} + deployments_map = { + d.get("name"): d + for d in deployments_data if d.get("name") + } yaml = ruamel.yaml.YAML() yaml.preserve_quotes = True @@ -138,7 +142,9 @@ def pull_rules(ctx: AppContext): # Extract rule name from text match = re.search(r"rule\s+([a-zA-Z0-9_]+)\s*\{", rule_text) if not match: - _LOGGER.warning("Could not extract rule name for rule ID %s. Skipping.", rule_id) + _LOGGER.warning( + "Could not extract rule name for rule ID %s. Skipping.", + rule_id) continue rule_name = match.group(1) @@ -148,24 +154,30 @@ def pull_rules(ctx: AppContext): rule_file_path.write_text(rule_text, encoding="utf-8") # Update secops_rules_config - dep = deployments_map.get(f"{rule.get("name")}/deployment", {}) + dep = deployments_map.get(rule_id, {}) if rule_name not in rules_config: rules_config[rule_name] = {} - + rules_config[rule_name]["enabled"] = dep.get("enabled", False) rules_config[rule_name]["alerting"] = dep.get("alerting", False) rules_config[rule_name]["archived"] = dep.get("archived", False) - rules_config[rule_name]["run_frequency"] = dep.get("runFrequency", "LIVE") + rules_config[rule_name]["run_frequency"] = dep.get( + "runFrequency", "LIVE") # Generate terraform import commands tf_parent = f"projects/{PROJECT_ID}/locations/{REGION}/instances/{CUSTOMER_ID}/rules/{rule_id}" - import_commands.append(f"terraform import 'google_chronicle_rule.rule[\"{rule_name}\"]' {tf_parent}") - import_commands.append(f"terraform import 'google_chronicle_rule_deployment.rule_deployment[\"{rule_name}\"]' {tf_parent}/deployment") + import_commands.append( + f"terraform import 'google_chronicle_rule.rule[\"{rule_name}\"]' {tf_parent}" + ) + import_commands.append( + f"terraform import 'google_chronicle_rule_deployment.rule_deployment[\"{rule_name}\"]' {tf_parent}/deployment" + ) with open(SECOPS_RULES_CONFIG_PATH, "w", encoding="utf-8") as f: yaml.dump(rules_config, f) - _LOGGER.info("Successfully pulled rules and updated %s.", SECOPS_RULES_CONFIG_PATH.name) + _LOGGER.info("Successfully pulled rules and updated %s.", + SECOPS_RULES_CONFIG_PATH.name) if import_commands: click.echo("\n--- Terraform Import Commands ---")