Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions pipelines/detection-as-code/scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,94 @@ def update_data_tables(ctx: AppContext):
", ".join(names))


@cli.command()
@pass_context
def pull_rules(ctx: AppContext):
    """Pull all rules from SecOps and populate local files.

    Fetches every rule and its deployment state from Chronicle, writes each
    rule's YARA-L text to ``BASE_RULES_DIR/<rule_name>.yaral``, merges the
    deployment flags (enabled/alerting/archived/run_frequency) into the YAML
    config at ``SECOPS_RULES_CONFIG_PATH``, and echoes the ``terraform
    import`` commands needed to adopt the existing resources into state.
    Exits with status 1 on any failure.
    """
    from config import SECOPS_RULES_CONFIG_PATH, BASE_RULES_DIR, PROJECT_ID, REGION, CUSTOMER_ID
    import ruamel.yaml

    _LOGGER.info("Fetching rules from SecOps...")
    try:
        rules_data = ctx.chronicle_client.list_rules().get("rules", [])
        _LOGGER.info("Retrieved %d rules.", len(rules_data))

        deployments_data = ctx.chronicle_client.list_rule_deployments().get(
            "ruleDeployments", [])
        _LOGGER.info("Retrieved %d rule deployments.", len(deployments_data))

        # Build a mapping of ruleId -> deployment details.
        # Deployment resource names look like ".../rules/{ruleId}/deployment",
        # so the rule ID is the second-to-last path segment. Keying by the
        # full resource name (as before) would never match the bare rule IDs
        # looked up below, leaving every rule on default deployment values.
        deployments_map = {}
        for dep_entry in deployments_data:
            name_parts = dep_entry.get("name", "").split("/")
            if len(name_parts) >= 2:
                deployments_map[name_parts[-2]] = dep_entry

        yaml = ruamel.yaml.YAML()
        yaml.preserve_quotes = True  # keep quoting intact on round-trip

        # Start from the existing config (if any) so unrelated entries and
        # formatting survive the rewrite.
        rules_config = {}
        if SECOPS_RULES_CONFIG_PATH.is_file():
            with open(SECOPS_RULES_CONFIG_PATH, "r", encoding="utf-8") as f:
                rules_config = yaml.load(f) or {}

        BASE_RULES_DIR.mkdir(parents=True, exist_ok=True)

        import_commands = []

        for rule in rules_data:
            # Guard against a missing "name" key so one malformed entry
            # cannot abort the whole pull with an AttributeError.
            rule_id = rule.get("name", "").split("/")[-1]
            rule_text = rule.get("text", "")

            # Extract the rule name from the YARA-L text (e.g. "rule foo {").
            match = re.search(r"rule\s+([a-zA-Z0-9_]+)\s*\{", rule_text)
            if not match:
                _LOGGER.warning(
                    "Could not extract rule name for rule ID %s. Skipping.",
                    rule_id)
                continue

            rule_name = match.group(1)

            # Save rule to file.
            rule_file_path = BASE_RULES_DIR / f"{rule_name}.yaral"
            rule_file_path.write_text(rule_text, encoding="utf-8")

            # Merge deployment flags into this rule's config entry.
            dep = deployments_map.get(rule_id, {})
            rules_config.setdefault(rule_name, {})
            rules_config[rule_name]["enabled"] = dep.get("enabled", False)
            rules_config[rule_name]["alerting"] = dep.get("alerting", False)
            rules_config[rule_name]["archived"] = dep.get("archived", False)
            rules_config[rule_name]["run_frequency"] = dep.get(
                "runFrequency", "LIVE")

            # Generate terraform import commands for the rule and its
            # deployment sub-resource.
            tf_parent = f"projects/{PROJECT_ID}/locations/{REGION}/instances/{CUSTOMER_ID}/rules/{rule_id}"
            import_commands.append(
                f"terraform import 'google_chronicle_rule.rule[\"{rule_name}\"]' {tf_parent}"
            )
            import_commands.append(
                f"terraform import 'google_chronicle_rule_deployment.rule_deployment[\"{rule_name}\"]' {tf_parent}/deployment"
            )

        with open(SECOPS_RULES_CONFIG_PATH, "w", encoding="utf-8") as f:
            yaml.dump(rules_config, f)

        _LOGGER.info("Successfully pulled rules and updated %s.",
                     SECOPS_RULES_CONFIG_PATH.name)

        if import_commands:
            click.echo("\n--- Terraform Import Commands ---")
            for cmd in import_commands:
                click.echo(cmd)

    except Exception as e:
        # CLI boundary: log the full traceback and exit non-zero so CI fails.
        _LOGGER.error("Failed to pull rules: %s", e, exc_info=True)
        sys.exit(1)


@cli.command()
@pass_context
def verify_rules(ctx: AppContext):
Expand Down
Loading