diff --git a/.jules/sentinel.md b/.jules/sentinel.md index dd57a2a..b6ec121 100644 --- a/.jules/sentinel.md +++ b/.jules/sentinel.md @@ -45,3 +45,11 @@ **Vulnerability:** `sanitize_for_log` in `main.py` was stripping quotes from `repr()` output for aesthetic reasons, inadvertently exposing strings starting with `=`, `+`, `-`, `@` to formula injection if logs are viewed in Excel. **Learning:** Security controls (like `repr()`) can be weakened by subsequent "cleanup" steps. Always consider downstream consumption of logs (e.g., CSV export). **Prevention:** Check for dangerous prefixes before stripping quotes. If detected, retain the quotes to force string literal interpretation. + +## 2026-05-23 - Path Traversal via API Response (ID Validation) + +**Vulnerability:** The application trusted "PK" (ID) fields from the external API and used them directly in URL construction for deletion. An attacker compromising the API or MITM could return a malicious ID like `../../etc/passwd` to trigger path traversal or other injection attacks. + +**Learning:** Even trusted APIs should be treated as untrusted sources for critical identifiers used in system operations or path construction. + +**Prevention:** Whitelist valid characters for all identifiers (e.g. `^[a-zA-Z0-9_.-]+$`) and validate them immediately upon receipt, before any use. diff --git a/main.py b/main.py index 1f4f047..ea75ce3 100644 --- a/main.py +++ b/main.py @@ -183,6 +183,9 @@ def check_env_permissions(env_path: str = ".env") -> None: # Pre-compiled regex patterns for hot-path validation (>2x speedup on 10k+ items) PROFILE_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$") +# Folder IDs (PK) are typically alphanumeric but can contain other safe chars. +# We whitelist to prevent path traversal and injection. +FOLDER_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_.-]+$") RULE_PATTERN = re.compile(r"^[a-zA-Z0-9.\-_:*/@]+$") # Parallel processing configuration @@ -737,6 +740,17 @@ def validate_profile_id(profile_id: str, log_errors: bool = True) -> bool: return True +def validate_folder_id(folder_id: str, log_errors: bool = True) -> bool: + """Validates folder ID (PK) format to prevent path traversal.""" + if not folder_id: + return False + if folder_id in (".", "..") or not FOLDER_ID_PATTERN.match(folder_id): + if log_errors: + log.error(f"Invalid folder ID format: {sanitize_for_log(folder_id)}") + return False + return True + + def is_valid_rule(rule: str) -> bool: """ Validates that a rule is safe to use. @@ -1105,11 +1119,14 @@ def list_existing_folders(client: httpx.Client, profile_id: str) -> Dict[str, st try: data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json() folders = data.get("body", {}).get("groups", []) - return { - f["group"].strip(): f["PK"] - for f in folders - if f.get("group") and f.get("PK") - } + result = {} + for f in folders: + if not f.get("group") or not f.get("PK"): + continue + pk = str(f["PK"]) + if validate_folder_id(pk): + result[f["group"].strip()] = pk + return result except (httpx.HTTPError, KeyError) as e: log.error(f"Failed to list existing folders: {sanitize_for_log(e)}") return {} @@ -1173,7 +1190,12 @@ def verify_access_and_get_folders( # Skip entries with empty or None values for required fields if not name or not pk: continue - result[str(name).strip()] = str(pk) + + pk_str = str(pk) + if not validate_folder_id(pk_str): + continue + + result[str(name).strip()] = pk_str return result except (ValueError, TypeError, AttributeError) as err: @@ -1395,24 +1417,31 @@ def create_folder( # Check if it returned a single group object if isinstance(body, dict) and "group" in body and "PK" in body["group"]: - pk = body["group"]["PK"] + pk = str(body["group"]["PK"]) + if not validate_folder_id(pk, log_errors=False): + log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") + return None log.info( "Created folder %s (ID %s) [Direct]", sanitize_for_log(name), sanitize_for_log(pk), ) - return str(pk) + return pk # Check if it returned a list containing our group if isinstance(body, dict) and "groups" in body: for grp in body["groups"]: if grp.get("group") == name: + pk = str(grp["PK"]) + if not validate_folder_id(pk, log_errors=False): + log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") + continue log.info( "Created folder %s (ID %s) [Direct]", sanitize_for_log(name), - sanitize_for_log(grp["PK"]), + sanitize_for_log(pk), ) - return str(grp["PK"]) + return pk except Exception as e: log.debug( f"Could not extract ID from POST response: " f"{sanitize_for_log(e)}" @@ -1426,12 +1455,16 @@ def create_folder( for grp in groups: if grp["group"].strip() == name.strip(): + pk = str(grp["PK"]) + if not validate_folder_id(pk, log_errors=False): + log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") + return None log.info( "Created folder %s (ID %s) [Polled]", sanitize_for_log(name), - sanitize_for_log(grp["PK"]), + sanitize_for_log(pk), ) - return str(grp["PK"]) + return pk except Exception as e: log.warning( f"Error fetching groups on attempt {attempt}: {sanitize_for_log(e)}" diff --git a/tests/test_folder_id_validation.py b/tests/test_folder_id_validation.py new file mode 100644 index 0000000..26a2f7d --- /dev/null +++ b/tests/test_folder_id_validation.py @@ -0,0 +1,59 @@ +import importlib +import sys +from unittest.mock import MagicMock, patch + +def reload_main_with_env(monkeypatch): + monkeypatch.delenv("NO_COLOR", raising=False) + with patch("sys.stderr") as mock_stderr, patch("sys.stdout") as mock_stdout: + mock_stderr.isatty.return_value = True + mock_stdout.isatty.return_value = True + + module = sys.modules.get("main") + if module is None: + module = importlib.import_module("main") + + importlib.reload(module) + return module + + +def test_verify_access_and_get_folders_filters_malicious_ids(monkeypatch): + """ + Verify that verify_access_and_get_folders filters out malicious Folder IDs + containing path traversal characters (../). + """ + m = reload_main_with_env(monkeypatch) + mock_client = MagicMock() + + # Malicious Folder ID with path traversal + malicious_id = "../../etc/passwd" + # Malicious Folder ID with dangerous characters + malicious_id_2 = "foo;rm -rf /" + + mock_response = MagicMock() + mock_response.json.return_value = { + "body": { + "groups": [ + {"group": "Safe Folder", "PK": "safe_id_123"}, + {"group": "Safe Folder 2", "PK": "safe-id-456_789"}, + {"group": "Malicious Folder", "PK": malicious_id}, + {"group": "Malicious Folder 2", "PK": malicious_id_2} + ] + } + } + mock_client.get.return_value = mock_response + mock_response.raise_for_status.return_value = None + + # Function should filter out malicious IDs + result = m.verify_access_and_get_folders(mock_client, "valid_profile") + + assert result is not None + + # Check that valid IDs are preserved + assert "Safe Folder" in result + assert result["Safe Folder"] == "safe_id_123" + assert "Safe Folder 2" in result + assert result["Safe Folder 2"] == "safe-id-456_789" + + # Check that malicious IDs are removed + assert "Malicious Folder" not in result + assert "Malicious Folder 2" not in result diff --git a/uv.lock b/uv.lock index 53e9b38..4b9c404 100644 --- a/uv.lock +++ b/uv.lock @@ -59,6 +59,9 @@ requires-dist = [ ] provides-extras = ["dev"] +[package.metadata.requires-dev] +dev = [] + [[package]] name = "execnet" version = "2.1.2"