diff --git a/.jules/sentinel.md b/.jules/sentinel.md new file mode 100644 index 0000000..8b7d6c5 --- /dev/null +++ b/.jules/sentinel.md @@ -0,0 +1,4 @@ +## 2025-10-26 - Trusting API IDs +**Vulnerability:** IDOR / Path Traversal risk if API returns unsafe IDs. +**Learning:** Even "trusted" upstream APIs can be compromised or buggy. Trust nothing. +**Prevention:** Validate all resource IDs (PKs) from APIs against a strict whitelist before using them in local operations or downstream requests. diff --git a/.python-version b/.python-version deleted file mode 100644 index 3a4f41e..0000000 --- a/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.13 \ No newline at end of file diff --git a/main.py b/main.py index 86792da..a6c3d7a 100644 --- a/main.py +++ b/main.py @@ -397,25 +397,34 @@ def extract_profile_id(text: str) -> str: return text -def is_valid_profile_id_format(profile_id: str) -> bool: - if not re.match(r"^[a-zA-Z0-9_-]+$", profile_id): - return False - if len(profile_id) > 64: +def validate_resource_id( + resource_id: str, type_name: str = "ID", log_errors: bool = True +) -> bool: + """ + Validates a resource ID (Profile ID, Folder ID/PK). + Allowed: Alphanumeric, hyphen, underscore. + Max Length: 64 + """ + if not resource_id: return False - return True + if len(resource_id) > 64: + if log_errors: + log.error(f"Invalid {type_name} length (max 64 chars)") + return False -def validate_profile_id(profile_id: str, log_errors: bool = True) -> bool: - if not is_valid_profile_id_format(profile_id): + if not re.match(r"^[a-zA-Z0-9_-]+$", resource_id): if log_errors: - if not re.match(r"^[a-zA-Z0-9_-]+$", profile_id): - log.error("Invalid profile ID format (contains unsafe characters)") - elif len(profile_id) > 64: - log.error("Invalid profile ID length (max 64 chars)") + log.error(f"Invalid {type_name} format (contains unsafe characters)") return False + return True +def validate_profile_id(profile_id: str, log_errors: bool = True) -> bool: + return validate_resource_id(profile_id, "profile ID", log_errors) + + def is_valid_rule(rule: str) -> bool: """ Validates that a rule is safe to use. @@ -436,15 +445,17 @@ def is_valid_rule(rule: str) -> bool: def is_valid_folder_name(name: str) -> bool: """ Validates folder name to prevent XSS and ensure printability. - Allowed: Anything printable except < > " ' ` + Allowed: Alphanumeric, space, hyphen, dot, underscore, parens, brackets, braces. + Max Length: 64 """ if not name or not name.strip() or not name.isprintable(): return False - # Block XSS and HTML injection characters - # Allow: ( ) [ ] { } for folder names (e.g. "Work (Private)") - dangerous_chars = set("<>\"'`") - if any(c in dangerous_chars for c in name): + if len(name) > 64: + return False + + # Whitelist: Alphanumeric, space, hyphen, dot, underscore, parens, brackets, braces + if not re.match(r"^[a-zA-Z0-9 \-_.()\[\]{}]+$", name): return False return True @@ -627,11 +638,18 @@ def list_existing_folders(client: httpx.Client, profile_id: str) -> Dict[str, st try: data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json() folders = data.get("body", {}).get("groups", []) - return { - f["group"].strip(): f["PK"] - for f in folders - if f.get("group") and f.get("PK") - } + result = {} + for f in folders: + name = f.get("group") + pk = f.get("PK") + if name and pk: + if validate_resource_id(pk, "Folder PK", log_errors=False): + result[name.strip()] = pk + else: + log.warning( + f"Skipping folder '{sanitize_for_log(name)}' with unsafe PK: {sanitize_for_log(pk)}" + ) + return result except (httpx.HTTPError, KeyError) as e: log.error(f"Failed to list existing folders: {sanitize_for_log(e)}") return {} @@ -759,6 +777,10 @@ def _validate_and_fetch(url: str): def delete_folder( client: httpx.Client, profile_id: str, name: str, folder_id: str ) -> bool: + if not validate_resource_id(folder_id, "Folder ID to delete"): + log.error(f"Aborting deletion of {sanitize_for_log(name)}: Unsafe Folder ID") + return False + try: _api_delete(client, f"{API_BASE}/{profile_id}/groups/{folder_id}") log.info("Deleted folder %s (ID %s)", sanitize_for_log(name), folder_id) @@ -793,21 +815,34 @@ def create_folder( # Check if it returned a single group object if isinstance(body, dict) and "group" in body and "PK" in body["group"]: pk = body["group"]["PK"] - log.info( - "Created folder %s (ID %s) [Direct]", sanitize_for_log(name), pk - ) - return str(pk) + if validate_resource_id(str(pk), "New Folder PK"): + log.info( + "Created folder %s (ID %s) [Direct]", sanitize_for_log(name), pk + ) + return str(pk) + else: + log.error( + f"API returned unsafe PK for folder {sanitize_for_log(name)}: {sanitize_for_log(pk)}" + ) + return None # Check if it returned a list containing our group if isinstance(body, dict) and "groups" in body: for grp in body["groups"]: if grp.get("group") == name: - log.info( - "Created folder %s (ID %s) [Direct]", - sanitize_for_log(name), - grp["PK"], - ) - return str(grp["PK"]) + pk = grp["PK"] + if validate_resource_id(str(pk), "New Folder PK"): + log.info( + "Created folder %s (ID %s) [Direct]", + sanitize_for_log(name), + pk, + ) + return str(pk) + else: + log.error( + f"API returned unsafe PK for folder {sanitize_for_log(name)}: {sanitize_for_log(pk)}" + ) + return None except Exception as e: log.debug(f"Could not extract ID from POST response: {e}") @@ -819,12 +854,19 @@ def create_folder( for grp in groups: if grp["group"].strip() == name.strip(): - log.info( - "Created folder %s (ID %s) [Polled]", - sanitize_for_log(name), - grp["PK"], - ) - return str(grp["PK"]) + pk = grp["PK"] + if validate_resource_id(str(pk), "Polled Folder PK"): + log.info( + "Created folder %s (ID %s) [Polled]", + sanitize_for_log(name), + pk, + ) + return str(pk) + else: + log.error( + f"API returned unsafe PK for folder {sanitize_for_log(name)}: {sanitize_for_log(pk)}" + ) + return None except Exception as e: log.warning(f"Error fetching groups on attempt {attempt}: {e}") diff --git a/tests/test_id_validation.py b/tests/test_id_validation.py new file mode 100644 index 0000000..b73becf --- /dev/null +++ b/tests/test_id_validation.py @@ -0,0 +1,91 @@ +import pytest +from unittest.mock import MagicMock +import main + +def test_validate_resource_id_valid(): + """Test valid resource IDs.""" + valid_ids = [ + "123", + "abc", + "ABC", + "Folder_1", + "Profile-2", + "valid_id_123-ABC", + "a" * 64, # Max length + ] + for rid in valid_ids: + assert main.validate_resource_id(rid) is True, f"Should accept {rid}" + +def test_validate_resource_id_invalid_format(): + """Test invalid formats (unsafe chars).""" + invalid_ids = [ + "id with space", + "id/slash", + "id\\backslash", + "id.dot", + "id:colon", + "id