diff --git a/.jules/sentinel.md b/.jules/sentinel.md index 038829e..f0eff30 100644 --- a/.jules/sentinel.md +++ b/.jules/sentinel.md @@ -61,3 +61,8 @@ **Learning:** Logic errors in security controls often lead to "fail-closed" states that break functionality entirely, or "fail-open" states that bypass security. Implicit returns in Python (`None`) can be dangerous when boolean validation is expected. **Prevention:** Always use explicit return statements for both success and failure paths in validation functions. Use static analysis (linting) to catch unreachable code and implicit returns. Ensure unit tests cover positive cases (valid inputs) as rigorously as negative cases (attack vectors). + +## 2025-02-18 - Broken Security Validation +**Vulnerability:** Syntax errors in `validate_folder_data` rendered nested rule validation inoperable and prevented the application from running. The errors (e.g., `enumerate (rgi"rules"1)`) resembled OCR artifacts. +**Learning:** Security controls can be silently broken if not covered by tests that are run in CI. A syntax error, even in a "dead" or rarely used code path, stops Python from compiling the entire module, so it can only persist when the file is never imported or run. In this case, the SyntaxError means the file was never imported/run in the environment where it was committed. +**Prevention:** Enforce pre-commit hooks that run linters to catch syntax errors. Ensure 100% code coverage for security-critical validation functions. 
diff --git a/main.py b/main.py index 7e75861..8cd1fb4 100644 --- a/main.py +++ b/main.py @@ -77,9 +77,33 @@ class Box: """Box drawing characters for pretty tables.""" if USE_COLORS: - H, V, TL, TR, BL, BR, T, B, L, R, X = "─", "│", "┌", "┐", "└", "┘", "┬", "┴", "├", "┤", "┼" + H, V, TL, TR, BL, BR, T, B, L, R, X = ( + "─", + "│", + "┌", + "┐", + "└", + "┘", + "┬", + "┴", + "├", + "┤", + "┼", + ) else: - H, V, TL, TR, BL, BR, T, B, L, R, X = "-", "|", "+", "+", "+", "+", "+", "+", "+", "+", "+" + H, V, TL, TR, BL, BR, T, B, L, R, X = ( + "-", + "|", + "+", + "+", + "+", + "+", + "+", + "+", + "+", + "+", + "+", + ) class ColoredFormatter(logging.Formatter): @@ -331,18 +355,32 @@ def print_summary_table(results: List[Dict[str, Any]], dry_run: bool) -> None: if USE_COLORS: # Unicode Box Drawing chars = { - "tl": "┌", "tm": "┬", "tr": "┐", - "bl": "└", "bm": "┴", "br": "┘", - "ml": "├", "mm": "┼", "mr": "┤", - "v": "│", "h": "─", + "tl": "┌", + "tm": "┬", + "tr": "┐", + "bl": "└", + "bm": "┴", + "br": "┘", + "ml": "├", + "mm": "┼", + "mr": "┤", + "v": "│", + "h": "─", } else: # ASCII Fallback chars = { - "tl": "+", "tm": "+", "tr": "+", - "bl": "+", "bm": "+", "br": "+", - "ml": "+", "mm": "+", "mr": "+", - "v": "|", "h": "-", + "tl": "+", + "tm": "+", + "tr": "+", + "bl": "+", + "bm": "+", + "br": "+", + "ml": "+", + "mm": "+", + "mr": "+", + "v": "|", + "h": "-", } def _print_separator(left, mid, right): @@ -367,22 +405,33 @@ def _print_row(profile, folders, rules, duration, status, is_header=False): d_val = f"{Colors.BOLD}{d_val}{Colors.ENDC}" s_val = f"{Colors.BOLD}{s_val}{Colors.ENDC}" - print( - f"{v} {p_val} {v} {f_val} {v} {r_val} {v} {d_val} {v} {s_val} {v}" - ) + print(f"{v} {p_val} {v} {f_val} {v} {r_val} {v} {d_val} {v} {s_val} {v}") title_text = "DRY RUN SUMMARY" if dry_run else "SYNC SUMMARY" title_color = Colors.CYAN if dry_run else Colors.HEADER total_width = ( - 1 + (col_widths["profile"] + 2) + 1 + - (col_widths["folders"] + 2) + 1 + - 
(col_widths["rules"] + 2) + 1 + - (col_widths["duration"] + 2) + 1 + - (col_widths["status"] + 2) + 1 + 1 + + (col_widths["profile"] + 2) + + 1 + + (col_widths["folders"] + 2) + + 1 + + (col_widths["rules"] + 2) + + 1 + + (col_widths["duration"] + 2) + + 1 + + (col_widths["status"] + 2) + + 1 ) - print("\n" + (f"{title_color}{title_text:^{total_width}}{Colors.ENDC}" if USE_COLORS else f"{title_text:^{total_width}}")) + print( + "\n" + + ( + f"{title_color}{title_text:^{total_width}}{Colors.ENDC}" + if USE_COLORS + else f"{title_text:^{total_width}}" + ) + ) _print_separator("tl", "tm", "tr") # Header row - pad manually then print @@ -465,7 +514,7 @@ def _print_row(profile, folders, rules, duration, status, is_header=False): def _get_progress_bar_width() -> int: """Calculate dynamic progress bar width based on terminal size. - + Returns width clamped between 15 and 50 characters, approximately 40% of terminal width. This ensures progress bars are readable on narrow terminals while utilizing space on wider displays. 
@@ -662,7 +711,9 @@ def _api_client() -> httpx.Client: # --------------------------------------------------------------------------- # # Disk cache stores validated blocklist data with HTTP cache headers (ETag, Last-Modified) # to enable fast cold-start syncs via conditional HTTP requests (304 Not Modified) -CACHE_TTL_SECONDS = 24 * 60 * 60 # 24 hours: within TTL, serve from disk without HTTP request +CACHE_TTL_SECONDS = ( + 24 * 60 * 60 +) # 24 hours: within TTL, serve from disk without HTTP request _disk_cache: Dict[str, Dict[str, Any]] = {} # Loaded from disk on startup _cache_stats = {"hits": 0, "misses": 0, "validations": 0, "errors": 0} _api_stats = {"control_d_api_calls": 0, "blocklist_fetches": 0} @@ -673,9 +724,9 @@ def _api_client() -> httpx.Client: # Track rate limit information from API responses to enable proactive throttling # and provide visibility into API quota usage _rate_limit_info = { - "limit": None, # Max requests allowed per window (from X-RateLimit-Limit) - "remaining": None, # Requests remaining in current window (from X-RateLimit-Remaining) - "reset": None, # Timestamp when limit resets (from X-RateLimit-Reset) + "limit": None, # Max requests allowed per window (from X-RateLimit-Limit) + "remaining": None, # Requests remaining in current window (from X-RateLimit-Remaining) + "reset": None, # Timestamp when limit resets (from X-RateLimit-Reset) } _rate_limit_lock = threading.Lock() # Protect _rate_limit_info updates @@ -683,12 +734,12 @@ def _api_client() -> httpx.Client: def get_cache_dir() -> Path: """ Returns platform-specific cache directory for ctrld-sync. 
- + Uses standard cache locations: - Linux/Unix: ~/.cache/ctrld-sync - macOS: ~/Library/Caches/ctrld-sync - Windows: %LOCALAPPDATA%/ctrld-sync/cache - + SECURITY: No user input in path construction - prevents path traversal attacks """ system = platform.system() @@ -708,13 +759,13 @@ def get_cache_dir() -> Path: def load_disk_cache() -> None: """ Loads persistent cache from disk on startup. - - GRACEFUL DEGRADATION: Any error (corrupted JSON, permissions, etc.) + + GRACEFUL DEGRADATION: Any error (corrupted JSON, permissions, etc.) is logged but ignored - we simply start with empty cache. This protects against crashes from corrupted cache files. """ global _disk_cache, _cache_stats - + try: cache_file = get_cache_dir() / "blocklists.json" if not cache_file.exists(): @@ -757,7 +808,9 @@ def load_disk_cache() -> None: if not sanitized_cache: # If nothing is valid, start with an empty cache instead of crashing later _disk_cache = {} - log.warning("Cache file contained no valid entries; starting with empty cache") + log.warning( + "Cache file contained no valid entries; starting with empty cache" + ) return if dropped_entries: @@ -785,36 +838,36 @@ def load_disk_cache() -> None: def save_disk_cache() -> None: """ Saves persistent cache to disk after successful sync. - + SECURITY: Creates cache directory with user-only permissions (0o700) to prevent other users from reading cached blocklist data. 
""" try: cache_dir = get_cache_dir() cache_dir.mkdir(parents=True, exist_ok=True) - + # Set directory permissions to user-only (rwx------) # This prevents other users from reading cached data if platform.system() != "Windows": cache_dir.chmod(0o700) - + cache_file = cache_dir / "blocklists.json" - + # Write atomically: write to temp file, then rename # This prevents corrupted cache if process is killed mid-write temp_file = cache_file.with_suffix(".tmp") with open(temp_file, "w", encoding="utf-8") as f: json.dump(_disk_cache, f, indent=2) - + # Set file permissions to user-only (rw-------) if platform.system() != "Windows": temp_file.chmod(0o600) - + # Atomic rename (POSIX guarantees atomicity) temp_file.replace(cache_file) - + log.debug(f"Saved {len(_disk_cache):,} entries to disk cache") - + except Exception as e: # Cache save failures are non-fatal - we just won't have cache next time log.warning(f"Failed to save cache (non-fatal): {sanitize_for_log(e)}") @@ -824,25 +877,25 @@ def save_disk_cache() -> None: def _parse_rate_limit_headers(response: httpx.Response) -> None: """ Parse rate limit headers from API response and update global tracking. - + Supports standard rate limit headers: - X-RateLimit-Limit: Maximum requests per window - X-RateLimit-Remaining: Requests remaining in current window - X-RateLimit-Reset: Unix timestamp when limit resets - Retry-After: Seconds to wait (priority on 429 responses) - + This enables: 1. Proactive throttling when approaching limits 2. Visibility into API quota usage 3. 
Smarter retry strategies based on actual limit state - + THREAD-SAFE: Uses _rate_limit_lock to protect shared state GRACEFUL: Invalid/missing headers are ignored (no crashes) """ global _rate_limit_info - + headers = response.headers - + # Parse standard rate limit headers # These may not exist on all responses, so we check individually try: @@ -853,34 +906,37 @@ def _parse_rate_limit_headers(response: httpx.Response) -> None: _rate_limit_info["limit"] = int(headers["X-RateLimit-Limit"]) except (ValueError, TypeError): pass # Invalid value, ignore - + # X-RateLimit-Remaining: Requests left in current window if "X-RateLimit-Remaining" in headers: try: - _rate_limit_info["remaining"] = int(headers["X-RateLimit-Remaining"]) + _rate_limit_info["remaining"] = int( + headers["X-RateLimit-Remaining"] + ) except (ValueError, TypeError): pass - + # X-RateLimit-Reset: Unix timestamp when window resets if "X-RateLimit-Reset" in headers: try: _rate_limit_info["reset"] = int(headers["X-RateLimit-Reset"]) except (ValueError, TypeError): pass - + # Log warnings when approaching rate limits # Only log if we have both limit and remaining values - if (_rate_limit_info["limit"] is not None and - _rate_limit_info["remaining"] is not None): + if ( + _rate_limit_info["limit"] is not None + and _rate_limit_info["remaining"] is not None + ): limit = _rate_limit_info["limit"] remaining = _rate_limit_info["remaining"] - + # Warn at 20% remaining capacity if limit > 0 and remaining / limit < 0.2: if _rate_limit_info["reset"]: reset_time = time.strftime( - "%H:%M:%S", - time.localtime(_rate_limit_info["reset"]) + "%H:%M:%S", time.localtime(_rate_limit_info["reset"]) ) log.warning( f"Approaching rate limit: {remaining}/{limit} requests remaining " @@ -988,7 +1044,7 @@ def extract_profile_id(text: str) -> str: def is_valid_profile_id_format(profile_id: str) -> bool: """ Checks if a profile ID matches the expected format. 
- + Validates against PROFILE_ID_PATTERN and enforces maximum length of 64 characters. """ if not PROFILE_ID_PATTERN.match(profile_id): @@ -1001,7 +1057,7 @@ def is_valid_profile_id_format(profile_id: str) -> bool: def validate_profile_id(profile_id: str, log_errors: bool = True) -> bool: """ Validates a Control D profile ID with optional error logging. - + Returns True if profile ID is valid, False otherwise. Logs specific validation errors when log_errors=True. """ @@ -1076,7 +1132,7 @@ def is_valid_folder_name(name: str) -> bool: def validate_folder_data(data: Dict[str, Any], url: str) -> bool: """ Validates folder JSON data structure and content. - + Checks for required fields (name, action, rules), validates folder name and action type, and ensures rules are valid. Logs specific validation errors. """ @@ -1131,19 +1187,22 @@ def validate_folder_data(data: Dict[str, Any], url: str) -> bool: ) return False if "rules" in rg: - if not isinstance (rg["rules"], list): - log. error ( - f"Invalid data from {sanitize_for_log(url)} : rule_groups[fil].rules must be a list." + if not isinstance(rg["rules"], list): + log.error( + f"Invalid data from {sanitize_for_log(url)}: rule_groups[{i}].rules must be a list." ) return False -# Ensure each rule within the group is an object (dict), -# because later code treats each rule as a mapping (e.g., rule.get(...)). -for j, rule in enumerate (rgi"rules"1): -if not isinstance (rule, dict): - log. error ( - f"Invalid data from {sanitize_for_log(u rl)}: rule_groups[fiłl.rules[kił] must be an object." - ) - return False + # Ensure each rule within the group is an object (dict), + # because later code treats each rule as a mapping (e.g., rule.get(...)). + for j, rule in enumerate(rg["rules"]): + if not isinstance(rule, dict): + log.error( + f"Invalid data from {sanitize_for_log(url)}: rule_groups[{i}].rules[{j}] must be an object." + ) + return False + + return True + # Lock to protect updates to _api_stats in multi-threaded contexts. 
# Without this, concurrent increments can lose updates because `+=` is not atomic. @@ -1184,7 +1243,9 @@ def _api_post_form(client: httpx.Client, url: str, data: Dict) -> httpx.Response ) -def retry_with_jitter(attempt: int, base_delay: float = 1.0, max_delay: float = MAX_RETRY_DELAY) -> float: +def retry_with_jitter( + attempt: int, base_delay: float = 1.0, max_delay: float = MAX_RETRY_DELAY +) -> float: """Calculate retry delay with exponential backoff and full jitter. Full jitter draws uniformly from [0, min(base_delay * 2^attempt, max_delay)) @@ -1198,23 +1259,23 @@ def retry_with_jitter(attempt: int, base_delay: float = 1.0, max_delay: float = Returns: Delay in seconds with full jitter applied """ - exponential_delay = min(base_delay * (2 ** attempt), max_delay) + exponential_delay = min(base_delay * (2**attempt), max_delay) return exponential_delay * random.random() def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY): """ Retry request with exponential backoff and full jitter. 
- + RETRY STRATEGY: - Uses retry_with_jitter() for full jitter: delay drawn from [0, min(delay*2^attempt, MAX_RETRY_DELAY)] - Full jitter prevents thundering herd when multiple clients fail simultaneously - + RATE LIMIT HANDLING: - Parses X-RateLimit-* headers from all API responses - On 429 (Too Many Requests): uses Retry-After header if present - Logs warnings when approaching rate limits (< 20% remaining) - + SECURITY: - Does NOT retry 4xx client errors (except 429) - Sanitizes error messages in logs @@ -1222,11 +1283,11 @@ def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY): for attempt in range(max_retries): try: response = request_func() - + # Parse rate limit headers from successful responses # This gives us visibility into quota usage even when requests succeed _parse_rate_limit_headers(response) - + response.raise_for_status() return response except (httpx.HTTPError, httpx.TimeoutException) as e: @@ -1234,11 +1295,11 @@ def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY): # Retrying 4xx errors is inefficient and can trigger security alerts or rate limits. if isinstance(e, httpx.HTTPStatusError): code = e.response.status_code - + # Parse rate limit headers even from error responses # This helps us understand why we hit limits _parse_rate_limit_headers(e.response) - + # Handle 429 (Too Many Requests) with Retry-After if code == 429: # Check for Retry-After header (in seconds) @@ -1260,7 +1321,7 @@ def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY): except ValueError: # Retry-After might be HTTP date format, ignore for now pass - + # Don't retry other 4xx errors (auth failures, bad requests, etc.) 
if 400 <= code < 500 and code != 429: if hasattr(e, "response") and e.response is not None: @@ -1273,11 +1334,11 @@ def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY): if hasattr(e, "response") and e.response is not None: log.debug(f"Response content: {sanitize_for_log(e.response.text)}") raise - + # Full jitter exponential backoff: delay drawn from [0, min(delay * 2^attempt, MAX_RETRY_DELAY)] # Spreads retries evenly across the full window to prevent thundering herd wait_time = retry_with_jitter(attempt, base_delay=delay) - + log.warning( f"Request failed (attempt {attempt + 1}/{max_retries}): " f"{sanitize_for_log(e)}. Retrying in {wait_time:.2f}s..." @@ -1288,27 +1349,27 @@ def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY): def _gh_get(url: str) -> Dict: """ Fetch blocklist data from URL with HTTP cache header support. - + CACHING STRATEGY: 1. Check in-memory cache first (fastest) 2. Check disk cache and send conditional request (If-None-Match/If-Modified-Since) 3. If 304 Not Modified: reuse cached data (cache validation) 4. 
If 200 OK: download new data and update cache - + SECURITY: Validates data structure regardless of cache source """ global _cache_stats, _api_stats - + # First check: Quick check without holding lock for long with _cache_lock: if url in _cache: _cache_stats["hits"] += 1 return _cache[url] - + # Track that we're about to make a blocklist fetch with _cache_lock: _api_stats["blocklist_fetches"] += 1 - + # Check disk cache for TTL-based hit or conditional request headers headers = {} cached_entry = _disk_cache.get(url) @@ -1333,7 +1394,7 @@ def _gh_get(url: str) -> Dict: last_modified = cached_entry.get("last_modified") if last_modified: headers["If-Modified-Since"] = last_modified - + # Fetch data (or validate cache) # Explicitly let HTTPError propagate (no need to catch just to re-raise) try: @@ -1343,18 +1404,20 @@ def _gh_get(url: str) -> Dict: if cached_entry and "data" in cached_entry: log.debug(f"Cache validated (304) for {sanitize_for_log(url)}") _cache_stats["validations"] += 1 - + # Update in-memory cache with validated data data = cached_entry["data"] with _cache_lock: _cache[url] = data - + # Update timestamp in disk cache to track last validation cached_entry["last_validated"] = time.time() return data else: # Shouldn't happen, but handle gracefully - log.warning(f"Got 304 but no cached data for {sanitize_for_log(url)}, re-fetching") + log.warning( + f"Got 304 but no cached data for {sanitize_for_log(url)}, re-fetching" + ) _cache_stats["errors"] += 1 # Close the original streaming response before retrying r.close() @@ -1381,7 +1444,7 @@ def _gh_get(url: str) -> Dict: f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. " "Falling back to streaming size check." ) - + # 2. 
Stream and check actual size chunks = [] current_size = 0 @@ -1393,19 +1456,19 @@ def _gh_get(url: str) -> Dict: f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)" ) chunks.append(chunk) - + try: data = json.loads(b"".join(chunks)) except json.JSONDecodeError as e: raise ValueError( f"Invalid JSON response from {sanitize_for_log(url)}" ) from e - + # Store cache headers for future conditional requests # ETag is preferred over Last-Modified (more reliable) etag = r_retry.headers.get("ETag") last_modified = r_retry.headers.get("Last-Modified") - + # Update disk cache with new data and headers _disk_cache[url] = { "data": data, @@ -1414,10 +1477,10 @@ def _gh_get(url: str) -> Dict: "fetched_at": time.time(), "last_validated": time.time(), } - + _cache_stats["misses"] += 1 return data - + r.raise_for_status() # Security: Validate Content-Type @@ -1447,7 +1510,7 @@ def _gh_get(url: str) -> Dict: f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. " "Falling back to streaming size check." ) - + # 2. 
Stream and check actual size chunks = [] current_size = 0 @@ -1460,19 +1523,19 @@ def _gh_get(url: str) -> Dict: f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)" ) chunks.append(chunk) - + try: data = json.loads(b"".join(chunks)) except json.JSONDecodeError as e: raise ValueError( f"Invalid JSON response from {sanitize_for_log(url)}" ) from e - + # Store cache headers for future conditional requests # ETag is preferred over Last-Modified (more reliable) etag = r.headers.get("ETag") last_modified = r.headers.get("Last-Modified") - + # Update disk cache with new data and headers _disk_cache[url] = { "data": data, @@ -1481,13 +1544,13 @@ def _gh_get(url: str) -> Dict: "fetched_at": time.time(), "last_validated": time.time(), } - + _cache_stats["misses"] += 1 - + except httpx.HTTPStatusError as e: # Re-raise with original exception (don't catch and re-raise) raise - + # Double-checked locking: Check again after fetch to avoid duplicate fetches # If another thread already cached it while we were fetching, use theirs # for consistency (return _cache[url] instead of data to ensure single source of truth) @@ -1540,7 +1603,7 @@ def check_api_access(client: httpx.Client, profile_id: str) -> bool: def list_existing_folders(client: httpx.Client, profile_id: str) -> Dict[str, str]: """ Retrieves all existing folders (groups) for a given profile. - + Returns a dictionary mapping folder names to their IDs. Returns empty dict on error. """ @@ -1688,7 +1751,7 @@ def get_all_existing_rules( ) -> Set[str]: """ Fetches all existing rules across root and all folders. - + Retrieves rules from the root level and all folders in parallel. Uses known_folders to avoid redundant API calls when provided. Returns set of rule IDs. @@ -1757,7 +1820,7 @@ def _fetch_folder_rules(folder_id: str) -> List[str]: def fetch_folder_data(url: str) -> Dict[str, Any]: """ Downloads and validates folder JSON data from a URL. - + Uses cached GET request and validates the folder structure. 
Raises KeyError if validation fails. """ @@ -1770,7 +1833,7 @@ def fetch_folder_data(url: str) -> Dict[str, Any]: def warm_up_cache(urls: Sequence[str]) -> None: """ Pre-fetches and caches folder data from multiple URLs in parallel. - + Validates URLs and fetches data concurrently to minimize cold-start latency. Shows progress bar when USE_COLORS is enabled. Skips invalid URLs while emitting warnings/log entries for validation and fetch failures. @@ -1830,7 +1893,7 @@ def delete_folder( ) -> bool: """ Deletes a folder (group) from a Control D profile. - + Returns True on success, False on failure. Logs detailed error information. """ try: @@ -1887,7 +1950,9 @@ def create_folder( if grp.get("group") == name: pk = str(grp["PK"]) if not validate_folder_id(pk, log_errors=False): - log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") + log.error( + f"API returned invalid folder ID: {sanitize_for_log(pk)}" + ) continue log.info( "Created folder %s (ID %s) [Direct]", @@ -1910,7 +1975,9 @@ def create_folder( if grp["group"].strip() == name.strip(): pk = str(grp["PK"]) if not validate_folder_id(pk, log_errors=False): - log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") + log.error( + f"API returned invalid folder ID: {sanitize_for_log(pk)}" + ) return None log.info( "Created folder %s (ID %s) [Polled]", @@ -1955,7 +2022,7 @@ def push_rules( ) -> bool: """ Pushes rules to a folder in batches, filtering duplicates and invalid rules. - + Deduplicates input, validates rules against RULE_PATTERN, and sends batches in parallel for optimal performance. Updates existing_rules set with newly added rules. Returns True if all batches succeed. @@ -2184,7 +2251,7 @@ def sync_profile( ) -> bool: """ Synchronizes Control D folders from remote blocklist URLs. - + Fetches folder data, optionally deletes existing folders with same names, creates new folders, and pushes rules in batches. In dry-run mode, only generates a plan without making API changes. 
Returns True if all folders @@ -2289,9 +2356,12 @@ def _fetch_if_valid(url: str): # Shared executor for rate-limited operations (DELETE, push_rules batches) # Reusing this executor prevents thread churn and enforces global rate limits. - with concurrent.futures.ThreadPoolExecutor( - max_workers=DELETE_WORKERS - ) as shared_executor, _api_client() as client: + with ( + concurrent.futures.ThreadPoolExecutor( + max_workers=DELETE_WORKERS + ) as shared_executor, + _api_client() as client, + ): # Verify access and list existing folders in one request existing_folders = verify_access_and_get_folders(client, profile_id) if existing_folders is None: @@ -2439,7 +2509,11 @@ def print_summary_table( max_p = max((len(r["profile"]) for r in sync_results), default=25) w = [max(25, max_p), 10, 12, 10, 15] - t_f, t_r, t_d = sum(r["folders"] for r in sync_results), sum(r["rules"] for r in sync_results), sum(r["duration"] for r in sync_results) + t_f, t_r, t_d = ( + sum(r["folders"] for r in sync_results), + sum(r["rules"] for r in sync_results), + sum(r["duration"] for r in sync_results), + ) all_ok = success_count == total t_status = ("✅ Ready" if dry_run else "✅ All Good") if all_ok else "❌ Errors" t_col = Colors.GREEN if all_ok else Colors.FAIL @@ -2449,27 +2523,52 @@ def print_summary_table( # Simple ASCII Fallback header = f"{'Profile ID':<{w[0]}} | {'Folders':>{w[1]}} | {'Rules':>{w[2]}} | {'Duration':>{w[3]}} | {'Status':<{w[4]}}" sep = "-" * len(header) - print(f"\n{('DRY RUN' if dry_run else 'SYNC') + ' SUMMARY':^{len(header)}}\n{sep}\n{header}\n{sep}") + print( + f"\n{('DRY RUN' if dry_run else 'SYNC') + ' SUMMARY':^{len(header)}}\n{sep}\n{header}\n{sep}" + ) for r in sync_results: - print(f"{r['profile']:<{w[0]}} | {r['folders']:>{w[1]}} | {r['rules']:>{w[2]},} | {r['duration']:>{w[3]-1}.1f}s | {r['status_label']:<{w[4]}}") - print(f"{sep}\n{'TOTAL':<{w[0]}} | {t_f:>{w[1]}} | {t_r:>{w[2]},} | {t_d:>{w[3]-1}.1f}s | {t_status:<{w[4]}}\n{sep}\n") + print( + 
f"{r['profile']:<{w[0]}} | {r['folders']:>{w[1]}} | {r['rules']:>{w[2]},} | {r['duration']:>{w[3]-1}.1f}s | {r['status_label']:<{w[4]}}" + ) + print( + f"{sep}\n{'TOTAL':<{w[0]}} | {t_f:>{w[1]}} | {t_r:>{w[2]},} | {t_d:>{w[3]-1}.1f}s | {t_status:<{w[4]}}\n{sep}\n" + ) return # Unicode Table - def line(l, m, r): return f"{Colors.BOLD}{l}{m.join('─' * (x+2) for x in w)}{r}{Colors.ENDC}" - def row(c): return f"{Colors.BOLD}│{Colors.ENDC} {c[0]:<{w[0]}} {Colors.BOLD}│{Colors.ENDC} {c[1]:>{w[1]}} {Colors.BOLD}│{Colors.ENDC} {c[2]:>{w[2]}} {Colors.BOLD}│{Colors.ENDC} {c[3]:>{w[3]}} {Colors.BOLD}│{Colors.ENDC} {c[4]:<{w[4]}} {Colors.BOLD}│{Colors.ENDC}" + def line(l, m, r): + return f"{Colors.BOLD}{l}{m.join('─' * (x+2) for x in w)}{r}{Colors.ENDC}" + + def row(c): + return f"{Colors.BOLD}│{Colors.ENDC} {c[0]:<{w[0]}} {Colors.BOLD}│{Colors.ENDC} {c[1]:>{w[1]}} {Colors.BOLD}│{Colors.ENDC} {c[2]:>{w[2]}} {Colors.BOLD}│{Colors.ENDC} {c[3]:>{w[3]}} {Colors.BOLD}│{Colors.ENDC} {c[4]:<{w[4]}} {Colors.BOLD}│{Colors.ENDC}" print(f"\n{line('┌', '─', '┐')}") title = f"{'DRY RUN' if dry_run else 'SYNC'} SUMMARY" - print(f"{Colors.BOLD}│{Colors.CYAN if dry_run else Colors.HEADER}{title:^{sum(w) + 14}}{Colors.ENDC}{Colors.BOLD}│{Colors.ENDC}") - print(f"{line('├', '┬', '┤')}\n{row([f'{Colors.HEADER}Profile ID{Colors.ENDC}', f'{Colors.HEADER}Folders{Colors.ENDC}', f'{Colors.HEADER}Rules{Colors.ENDC}', f'{Colors.HEADER}Duration{Colors.ENDC}', f'{Colors.HEADER}Status{Colors.ENDC}'])}") + print( + f"{Colors.BOLD}│{Colors.CYAN if dry_run else Colors.HEADER}{title:^{sum(w) + 14}}{Colors.ENDC}{Colors.BOLD}│{Colors.ENDC}" + ) + print( + f"{line('├', '┬', '┤')}\n{row([f'{Colors.HEADER}Profile ID{Colors.ENDC}', f'{Colors.HEADER}Folders{Colors.ENDC}', f'{Colors.HEADER}Rules{Colors.ENDC}', f'{Colors.HEADER}Duration{Colors.ENDC}', f'{Colors.HEADER}Status{Colors.ENDC}'])}" + ) print(line("├", "┼", "┤")) for r in sync_results: sc = Colors.GREEN if r["success"] else Colors.FAIL - 
print(row([r["profile"], str(r["folders"]), f"{r['rules']:,}", f"{r['duration']:.1f}s", f"{sc}{r['status_label']}{Colors.ENDC}"])) + print( + row( + [ + r["profile"], + str(r["folders"]), + f"{r['rules']:,}", + f"{r['duration']:.1f}s", + f"{sc}{r['status_label']}{Colors.ENDC}", + ] + ) + ) - print(f"{line('├', '┼', '┤')}\n{row(['TOTAL', str(t_f), f'{t_r:,}', f'{t_d:.1f}s', f'{t_col}{t_status}{Colors.ENDC}'])}") + print( + f"{line('├', '┼', '┤')}\n{row(['TOTAL', str(t_f), f'{t_r:,}', f'{t_d:.1f}s', f'{t_col}{t_status}{Colors.ENDC}'])}" + ) print(f"{line('└', '┴', '┘')}\n") @@ -2488,18 +2587,28 @@ def print_success_message(profile_ids: List[str]) -> None: print(f"\n{Colors.GREEN}{random.choice(success_msgs)}{Colors.ENDC}") # Construct dashboard URL - if profile_ids and len(profile_ids) == 1 and profile_ids[0] != "dry-run-placeholder": - dashboard_url = f"https://controld.com/dashboard/profiles/{profile_ids[0]}/filters" - print(f"{Colors.CYAN}👀 View your changes: {Colors.UNDERLINE}{dashboard_url}{Colors.ENDC}") + if ( + profile_ids + and len(profile_ids) == 1 + and profile_ids[0] != "dry-run-placeholder" + ): + dashboard_url = ( + f"https://controld.com/dashboard/profiles/{profile_ids[0]}/filters" + ) + print( + f"{Colors.CYAN}👀 View your changes: {Colors.UNDERLINE}{dashboard_url}{Colors.ENDC}" + ) elif len(profile_ids) > 1: dashboard_url = "https://controld.com/dashboard/profiles" - print(f"{Colors.CYAN}👀 View your changes: {Colors.UNDERLINE}{dashboard_url}{Colors.ENDC}") + print( + f"{Colors.CYAN}👀 View your changes: {Colors.UNDERLINE}{dashboard_url}{Colors.ENDC}" + ) def parse_args() -> argparse.Namespace: """ Parses command-line arguments for the Control D sync tool. - + Supports profile IDs, folder URLs, dry-run mode, no-delete flag, and plan JSON output file path. 
""" @@ -2519,7 +2628,9 @@ def parse_args() -> argparse.Namespace: ) parser.add_argument("--plan-json", help="Write plan to JSON file", default=None) parser.add_argument( - "--clear-cache", action="store_true", help="Clear the persistent blocklist cache and exit" + "--clear-cache", + action="store_true", + help="Clear the persistent blocklist cache and exit", ) return parser.parse_args() @@ -2527,7 +2638,7 @@ def parse_args() -> argparse.Namespace: def main(): """ Main entry point for Control D Sync. - + Loads environment configuration, validates inputs, warms up cache, and syncs profiles. Supports interactive prompts for missing credentials when running in a TTY. Prints summary statistics and exits with appropriate @@ -2556,7 +2667,9 @@ def main(): if cache_file.exists(): try: cache_file.unlink() - print(f"{Colors.GREEN}✓ Cleared blocklist cache: {cache_file}{Colors.ENDC}") + print( + f"{Colors.GREEN}✓ Cleared blocklist cache: {cache_file}{Colors.ENDC}" + ) except OSError as e: print(f"{Colors.FAIL}✗ Failed to clear cache: {e}{Colors.ENDC}") exit(1) @@ -2846,7 +2959,9 @@ def make_col_separator(left, mid, right, horiz): cmd_str = " ".join(cmd_parts) if USE_COLORS: - print(f"{Colors.BOLD}👉 Ready to sync? Run the following command:{Colors.ENDC}") + print( + f"{Colors.BOLD}👉 Ready to sync? Run the following command:{Colors.ENDC}" + ) print(f" {Colors.CYAN}{cmd_str}{Colors.ENDC}") else: print("👉 Ready to sync? Run the following command:") @@ -2862,16 +2977,18 @@ def make_col_separator(left, mid, right, horiz): ) else: print("⚠️ Dry run encountered errors. 
Please check the logs above.") - + # Display API statistics - total_api_calls = _api_stats["control_d_api_calls"] + _api_stats["blocklist_fetches"] + total_api_calls = ( + _api_stats["control_d_api_calls"] + _api_stats["blocklist_fetches"] + ) if total_api_calls > 0: print(f"{Colors.BOLD}API Statistics:{Colors.ENDC}") print(f" • Control D API calls: {_api_stats['control_d_api_calls']:>7,}") print(f" • Blocklist fetches: {_api_stats['blocklist_fetches']:>7,}") print(f" • Total API requests: {total_api_calls:>7,}") print() - + # Display cache statistics if any cache activity occurred if _cache_stats["hits"] + _cache_stats["misses"] + _cache_stats["validations"] > 0: print(f"{Colors.BOLD}Cache Statistics:{Colors.ENDC}") @@ -2880,27 +2997,33 @@ def make_col_separator(left, mid, right, horiz): print(f" • Validations (304): {_cache_stats['validations']:>7,}") if _cache_stats["errors"] > 0: print(f" • Errors (non-fatal): {_cache_stats['errors']:>7,}") - + # Calculate cache effectiveness - total_requests = _cache_stats["hits"] + _cache_stats["misses"] + _cache_stats["validations"] + total_requests = ( + _cache_stats["hits"] + _cache_stats["misses"] + _cache_stats["validations"] + ) if total_requests > 0: # Hits + validations = avoided full downloads - cache_effectiveness = (_cache_stats["hits"] + _cache_stats["validations"]) / total_requests * 100 + cache_effectiveness = ( + (_cache_stats["hits"] + _cache_stats["validations"]) + / total_requests + * 100 + ) print(f" • Cache effectiveness: {cache_effectiveness:>6.1f}%") print() - + # Display rate limit information if available with _rate_limit_lock: if any(v is not None for v in _rate_limit_info.values()): print(f"{Colors.BOLD}API Rate Limit Status:{Colors.ENDC}") - + if _rate_limit_info["limit"] is not None: print(f" • Requests limit: {_rate_limit_info['limit']:>6,}") - + if _rate_limit_info["remaining"] is not None: remaining = _rate_limit_info["remaining"] limit = _rate_limit_info["limit"] - + # Color code based on 
remaining capacity if limit and limit > 0: pct = (remaining / limit) * 100 @@ -2910,19 +3033,20 @@ def make_col_separator(left, mid, right, horiz): color = Colors.WARNING # Yellow for caution else: color = Colors.GREEN # Green for healthy - print(f" • Requests remaining: {color}{remaining:>6,} ({pct:>5.1f}%){Colors.ENDC}") + print( + f" • Requests remaining: {color}{remaining:>6,} ({pct:>5.1f}%){Colors.ENDC}" + ) else: print(f" • Requests remaining: {remaining:>6,}") - + if _rate_limit_info["reset"] is not None: reset_time = time.strftime( - "%H:%M:%S", - time.localtime(_rate_limit_info["reset"]) + "%H:%M:%S", time.localtime(_rate_limit_info["reset"]) ) print(f" • Limit resets at: {reset_time}") - + print() - + # Save cache to disk after successful sync (non-fatal if it fails) if not args.dry_run: save_disk_cache() diff --git a/tests/test_fix_broken_validation.py b/tests/test_fix_broken_validation.py new file mode 100644 index 0000000..fedfc51 --- /dev/null +++ b/tests/test_fix_broken_validation.py @@ -0,0 +1,71 @@ +import unittest +from unittest.mock import MagicMock +import sys +import os + +# Add root to path to import main +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import main + +class TestFixBrokenValidation(unittest.TestCase): + def setUp(self): + self.original_log = main.log + main.log = MagicMock() + + def tearDown(self): + main.log = self.original_log + + def test_invalid_rule_type_in_rule_groups(self): + """ + Verify that validate_folder_data correctly identifies and rejects + non-dict rules inside rule_groups. + This tests the fix for the broken syntax block. 
+ """ + # Data with invalid rule (string instead of dict) inside rule_groups + invalid_data = { + "group": {"group": "Test Group"}, + "rule_groups": [ + { + "rules": [ + {"PK": "valid.com"}, + "invalid_string_rule" # Should trigger the error + ] + } + ] + } + + result = main.validate_folder_data(invalid_data, "http://test.com") + + self.assertFalse(result, "Should return False for invalid rule type") + + # Verify the error log message + # We expect: "Invalid data from http://test.com: rule_groups[0].rules[1] must be an object." + main.log.error.assert_called() + args = main.log.error.call_args[0] + self.assertIn("rule_groups[0].rules[1] must be an object", args[0]) + + def test_invalid_rules_list_type(self): + """ + Verify that if 'rules' is not a list, it is caught. + This tests the fix for the malformed logging block above the loop. + """ + invalid_data = { + "group": {"group": "Test Group"}, + "rule_groups": [ + { + "rules": "not_a_list" # Should trigger error + } + ] + } + + result = main.validate_folder_data(invalid_data, "http://test.com") + self.assertFalse(result) + + main.log.error.assert_called() + args = main.log.error.call_args[0] + # We expect: "Invalid data from http://test.com: rule_groups[0].rules must be a list." + self.assertIn("rule_groups[0].rules must be a list", args[0]) + +if __name__ == '__main__': + unittest.main()