Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,11 @@ def extract_profile_id(text: str) -> str:


def is_valid_profile_id_format(profile_id: str) -> bool:
"""
Checks if a profile ID matches the expected format.

Validates against PROFILE_ID_PATTERN and enforces maximum length of 64 characters.
"""
Comment on lines +722 to +726
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several of the newly added docstrings include “blank” separator lines that contain trailing whitespace (e.g., the empty line after the summary here). This tends to trigger whitespace/formatting linters and makes diffs noisier over time; please strip trailing whitespace from the blank lines in these new docstrings.

Copilot uses AI. Check for mistakes.
if not PROFILE_ID_PATTERN.match(profile_id):
return False
if len(profile_id) > 64:
Expand All @@ -727,6 +732,12 @@ def is_valid_profile_id_format(profile_id: str) -> bool:


def validate_profile_id(profile_id: str, log_errors: bool = True) -> bool:
"""
Validates a Control D profile ID with optional error logging.

Returns True if profile ID is valid, False otherwise.
Logs specific validation errors when log_errors=True.
"""
if not is_valid_profile_id_format(profile_id):
if log_errors:
if not PROFILE_ID_PATTERN.match(profile_id):
Expand Down Expand Up @@ -785,6 +796,12 @@ def is_valid_folder_name(name: str) -> bool:


def validate_folder_data(data: Dict[str, Any], url: str) -> bool:
"""
Validates folder JSON data structure and content.

Checks for required fields (name, action, rules), validates folder name
and action type, and ensures rules are valid. Logs specific validation errors.
"""
Comment on lines +799 to +804
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new docstring doesn’t match what validate_folder_data() actually validates. The implementation currently only checks the top-level shape (dict), presence/type of data['group'], presence/type of data['group']['group'], and validates the folder name; it does not validate action, rules, or rule contents as described. Please either update the docstring to reflect the real validations, or extend the function to validate the fields mentioned in the docstring.

Copilot uses AI. Check for mistakes.
if not isinstance(data, dict):
log.error(
f"Invalid data from {sanitize_for_log(url)}: Root must be a JSON object."
Expand Down Expand Up @@ -1102,6 +1119,12 @@ def check_api_access(client: httpx.Client, profile_id: str) -> bool:


def list_existing_folders(client: httpx.Client, profile_id: str) -> Dict[str, str]:
"""
Retrieves all existing folders (groups) for a given profile.

Returns a dictionary mapping folder names to their IDs.
Returns empty dict on error.
"""
try:
data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json()
folders = data.get("body", {}).get("groups", [])
Expand Down Expand Up @@ -1236,6 +1259,13 @@ def get_all_existing_rules(
profile_id: str,
known_folders: Optional[Dict[str, str]] = None,
) -> Set[str]:
"""
Fetches all existing rules across root and all folders.

Retrieves rules from the root level and all folders in parallel.
Uses known_folders to avoid redundant API calls when provided.
Returns set of rule IDs.
"""
all_rules = set()

def _fetch_folder_rules(folder_id: str) -> List[str]:
Expand Down Expand Up @@ -1298,13 +1328,26 @@ def _fetch_folder_rules(folder_id: str) -> List[str]:


def fetch_folder_data(url: str) -> Dict[str, Any]:
    """
    Fetch folder JSON from *url* and ensure it is structurally valid.

    The download goes through the cached GET helper, so repeated calls
    for the same URL avoid redundant network requests. Raises KeyError
    when the payload fails structural validation.
    """
    payload = _gh_get(url)
    if validate_folder_data(payload, url):
        return payload
    raise KeyError(f"Invalid folder data from {sanitize_for_log(url)}")


def warm_up_cache(urls: Sequence[str]) -> None:
"""
Pre-fetches and caches folder data from multiple URLs in parallel.

Validates URLs and fetches data concurrently to minimize cold-start latency.
Shows progress bar when USE_COLORS is enabled. Skips invalid URLs while
emitting warnings/log entries for validation and fetch failures.
"""
urls = list(set(urls))
with _cache_lock:
urls_to_process = [u for u in urls if u not in _cache]
Expand Down Expand Up @@ -1358,6 +1401,11 @@ def _validate_and_fetch(url: str):
def delete_folder(
client: httpx.Client, profile_id: str, name: str, folder_id: str
) -> bool:
"""
Deletes a folder (group) from a Control D profile.

Returns True on success, False on failure. Logs detailed error information.
"""
try:
_api_delete(client, f"{API_BASE}/{profile_id}/groups/{folder_id}")
log.info(
Expand Down Expand Up @@ -1466,6 +1514,13 @@ def push_rules(
existing_rules: Set[str],
client: httpx.Client,
) -> bool:
"""
Pushes rules to a folder in batches, filtering duplicates and invalid rules.

Deduplicates input, validates rules against RULE_PATTERN, and sends batches
in parallel for optimal performance. Updates existing_rules set with newly
added rules. Returns True if all batches succeed.
"""
if not hostnames:
log.info("Folder %s - no rules to push", sanitize_for_log(folder_name))
return True
Expand Down Expand Up @@ -1533,6 +1588,7 @@ def push_rules(
progress_label = f"Folder {sanitized_folder_name}"

def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]:
"""Processes a single batch of rules by sending API request."""
data = {
"do": str_do,
"status": str_status,
Expand Down Expand Up @@ -1678,6 +1734,14 @@ def sync_profile(
no_delete: bool = False,
plan_accumulator: Optional[List[Dict[str, Any]]] = None,
) -> bool:
"""
Synchronizes Control D folders from remote blocklist URLs.

Fetches folder data, optionally deletes existing folders with same names,
creates new folders, and pushes rules in batches. In dry-run mode, only
generates a plan without making API changes. Returns True if all folders
sync successfully.
"""
# SECURITY: Clear cached DNS validations at the start of each sync run.
# This prevents TOCTOU issues where a domain's IP could change between runs.
validate_folder_url.cache_clear()
Expand Down Expand Up @@ -1874,6 +1938,12 @@ def _fetch_if_valid(url: str):
# 5. Entry-point
# --------------------------------------------------------------------------- #
def parse_args() -> argparse.Namespace:
"""
Parses command-line arguments for the Control D sync tool.

Supports profile IDs, folder URLs, dry-run mode, no-delete flag,
and plan JSON output file path.
"""
parser = argparse.ArgumentParser(description="Control D folder sync")
parser.add_argument(
"--profiles", help="Comma-separated list of profile IDs", default=None
Expand All @@ -1890,6 +1960,14 @@ def parse_args() -> argparse.Namespace:


def main():
"""
Main entry point for Control D Sync.

Loads environment configuration, validates inputs, warms up cache,
and syncs profiles. Supports interactive prompts for missing credentials
when running in a TTY. Prints summary statistics and exits with appropriate
status code.
"""
# SECURITY: Check .env permissions (after Colors is defined for NO_COLOR support)
# This must happen BEFORE load_dotenv() to prevent reading secrets from world-readable files
check_env_permissions()
Expand Down Expand Up @@ -1920,6 +1998,7 @@ def main():
)

def validate_profile_input(value: str) -> bool:
"""Validates one or more profile IDs from comma-separated input."""
ids = [extract_profile_id(p) for p in value.split(",") if p.strip()]
return bool(ids) and all(
validate_profile_id(pid, log_errors=False) for pid in ids
Expand Down
Loading