Skip to content
Merged
6 changes: 6 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## 2024-05-23 - [Input Validation and Syntax Fix]
**Vulnerability:** The `create_folder` function contained a syntax error (positional arg after keyword arg) preventing execution. Additionally, `folder_url` and `profile_id` lacked validation, potentially allowing SSRF (via non-HTTPS URLs) or path traversal/injection (via crafted profile IDs).
**Learning:** Even simple scripts need robust input validation, especially when inputs are used to construct URLs or file paths. A syntax error can mask security issues by preventing the code from running in the first place.
**Prevention:**
1. Always validate external inputs against a strict allowlist (e.g., regex for IDs, protocol check for URLs).
2. Use linters/static analysis to catch syntax errors before runtime.
72 changes: 69 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import time
import concurrent.futures
import re
from typing import Dict, List, Optional, Any, Set, Sequence

import httpx
Expand Down Expand Up @@ -98,6 +99,22 @@ def _api_client() -> httpx.Client:
_cache: Dict[str, Dict] = {}


def validate_folder_url(url: str) -> bool:
    """Validate that a folder URL is safe to fetch (well-formed HTTPS only).

    A bare ``url.startswith("https://")`` prefix check still accepts
    malformed URLs such as ``"https://"`` with no host, so the URL is
    parsed and accepted only when the scheme is exactly ``https`` and a
    non-empty network location (host) is present.

    Args:
        url: Candidate folder URL from configuration or user input.

    Returns:
        True if the URL is a well-formed HTTPS URL; False otherwise
        (rejected URLs are logged at warning level).
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse raises ValueError on pathological inputs (e.g. bad ports).
        log.warning(f"Skipping unsafe or invalid URL: {url}")
        return False
    if parsed.scheme != "https" or not parsed.netloc:
        log.warning(f"Skipping unsafe or invalid URL: {url}")
        return False
    return True
Comment on lines +102 to +107
Copy link

Copilot AI Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validate_folder_url function only checks if the URL starts with "https://" but doesn't validate the overall URL structure. This could still allow malformed URLs to pass validation. Consider using urllib.parse.urlparse to validate the complete URL structure, checking that the scheme is 'https' and that the netloc (domain) is present and valid.

Copilot uses AI. Check for mistakes.


def validate_profile_id(profile_id: str) -> bool:
    """Validate that a profile ID contains only safe characters.

    Accepts only ASCII letters, digits, underscore, and hyphen — the ID
    is later used to construct API URLs, so anything else is rejected.
    Uses ``re.fullmatch`` instead of ``re.match(r"^...$", ...)`` because
    ``$`` also matches just before a trailing newline, which would let
    an ID like ``"abc\n"`` slip through validation.

    Args:
        profile_id: Candidate Control-D profile identifier.

    Returns:
        True if the ID is safe; False otherwise (rejected IDs are
        logged at error level).
    """
    if not re.fullmatch(r"[a-zA-Z0-9_-]+", profile_id):
        log.error(f"Invalid profile ID format: {profile_id}")
        return False
    return True


def _api_get(client: httpx.Client, url: str) -> httpx.Response:
    """Issue a GET request against the Control-D API, retrying via _retry_request."""
    def _do_get() -> httpx.Response:
        return client.get(url)

    return _retry_request(_do_get)
Expand Down Expand Up @@ -335,6 +352,17 @@ def sync_profile(
# Fetch all folder data first
folder_data_list = []

# Validate URLs first
valid_urls = [url for url in folder_urls if validate_folder_url(url)]

invalid_count = len(folder_urls) - len(valid_urls)
if invalid_count > 0:
log.warning(f"Filtered out {invalid_count} invalid URL(s)")

if not valid_urls:
log.error("No valid folder URLs to fetch")
return False

def safe_fetch(url):
try:
return fetch_folder_data(url)
Expand All @@ -343,9 +371,9 @@ def safe_fetch(url):
return None

# Fetch folder data in parallel to speed up startup
max_workers = min(10, len(folder_urls)) if folder_urls else 1
max_workers = min(10, len(valid_urls))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
results = executor.map(safe_fetch, folder_urls)
results = executor.map(safe_fetch, valid_urls)

folder_data_list = [r for r in results if r is not None]

Expand Down Expand Up @@ -467,16 +495,54 @@ def main():

plan: List[Dict[str, Any]] = []
success_count = 0
sync_results = []

for profile_id in (profile_ids or ["dry-run-placeholder"]):
# Skip validation for dry-run placeholder
if profile_id != "dry-run-placeholder" and not validate_profile_id(profile_id):
continue

log.info("Starting sync for profile %s", profile_id)
if sync_profile(profile_id, folder_urls, dry_run=args.dry_run, no_delete=args.no_delete, plan_accumulator=plan):
status = sync_profile(
profile_id,
folder_urls,
dry_run=args.dry_run,
no_delete=args.no_delete,
plan_accumulator=plan,
)

if status:
success_count += 1

# Calculate stats for this profile from the plan
entry = next((p for p in plan if p["profile"] == profile_id), None)
folder_count = len(entry["folders"]) if entry else 0
rule_count = sum(f["rules"] for f in entry["folders"]) if entry else 0

sync_results.append({
"profile": profile_id,
"folders": folder_count,
"rules": rule_count,
"status": "✅ Success" if status else "❌ Failed",
})

if args.plan_json:
with open(args.plan_json, "w", encoding="utf-8") as f:
json.dump(plan, f, indent=2)
log.info("Plan written to %s", args.plan_json)

# Print Summary Table
print("\n" + "=" * 80)
print(f"{'SYNC SUMMARY':^80}")
print("=" * 80)
print(f"{'Profile ID':<25} | {'Folders':>10} | {'Rules':>10} | {'Status':<15}")
print("-" * 80)
for res in sync_results:
print(
f"{res['profile']:<25} | {res['folders']:>10} | {res['rules']:>10,} | {res['status']:<15}"
)
print("=" * 80 + "\n")

total = len(profile_ids or ["dry-run-placeholder"])
log.info(f"All profiles processed: {success_count}/{total} successful")
exit(0 if success_count == total else 1)
Expand Down
Loading