Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@
**Vulnerability:** `sanitize_for_log` in `main.py` was stripping quotes from `repr()` output for aesthetic reasons, inadvertently exposing strings starting with `=`, `+`, `-`, `@` to formula injection if logs are viewed in Excel.
**Learning:** Security controls (like `repr()`) can be weakened by subsequent "cleanup" steps. Always consider downstream consumption of logs (e.g., CSV export).
**Prevention:** Check for dangerous prefixes before stripping quotes. If detected, retain the quotes to force string literal interpretation.

## 2026-05-23 - Path Traversal via API Response (ID Validation)

**Vulnerability:** The application trusted "PK" (ID) fields from the external API and used them directly in URL construction for deletion. An attacker compromising the API or MITM could return a malicious ID like `../../etc/passwd` to trigger path traversal or other injection attacks.

**Learning:** Even trusted APIs should be treated as untrusted sources for critical identifiers used in system operations or path construction.

**Prevention:** Whitelist valid characters for all identifiers (e.g. `^[a-zA-Z0-9_.-]+$`) and validate them immediately upon receipt, before any use.
57 changes: 45 additions & 12 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@

# Pre-compiled regex patterns for hot-path validation (>2x speedup on 10k+ items)
PROFILE_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$")
# Folder IDs (PK) are typically alphanumeric but can contain other safe chars.
# We whitelist to prevent path traversal and injection.
FOLDER_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_.-]+$")
RULE_PATTERN = re.compile(r"^[a-zA-Z0-9.\-_:*/@]+$")

# Parallel processing configuration
Expand Down Expand Up @@ -737,6 +740,17 @@
return True


def validate_folder_id(folder_id: str, log_errors: bool = True) -> bool:
"""Validates folder ID (PK) format to prevent path traversal."""
if not folder_id:
return False
if folder_id in (".", "..") or not FOLDER_ID_PATTERN.match(folder_id):
if log_errors:
log.error(f"Invalid folder ID format: {sanitize_for_log(folder_id)}")
return False
return True


def is_valid_rule(rule: str) -> bool:
"""
Validates that a rule is safe to use.
Expand Down Expand Up @@ -1105,11 +1119,14 @@
try:
data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json()
folders = data.get("body", {}).get("groups", [])
return {
f["group"].strip(): f["PK"]
for f in folders
if f.get("group") and f.get("PK")
}
result = {}
for f in folders:
if not f.get("group") or not f.get("PK"):
continue
pk = str(f["PK"])
if validate_folder_id(pk):
result[f["group"].strip()] = pk
return result
except (httpx.HTTPError, KeyError) as e:
log.error(f"Failed to list existing folders: {sanitize_for_log(e)}")
return {}
Expand Down Expand Up @@ -1173,7 +1190,12 @@
# Skip entries with empty or None values for required fields
if not name or not pk:
continue
result[str(name).strip()] = str(pk)

pk_str = str(pk)
if not validate_folder_id(pk_str):
continue

result[str(name).strip()] = pk_str

return result
except (ValueError, TypeError, AttributeError) as err:
Expand Down Expand Up @@ -1373,87 +1395,98 @@
return False


def create_folder(
client: httpx.Client, profile_id: str, name: str, do: int, status: int
) -> Optional[str]:
"""
Create a new folder and return its ID.
Attempts to read ID from response first, then falls back to polling.
"""
try:
# 1. Send the Create Request
response = _api_post(
client,
f"{API_BASE}/{profile_id}/groups",
data={"name": name, "do": do, "status": status},
)

# OPTIMIZATION: Try to grab ID directly from response to avoid the wait loop
try:
resp_data = response.json()
body = resp_data.get("body", {})

# Check if it returned a single group object
if isinstance(body, dict) and "group" in body and "PK" in body["group"]:
pk = body["group"]["PK"]
pk = str(body["group"]["PK"])
if not validate_folder_id(pk, log_errors=False):
log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}")
return None
log.info(
"Created folder %s (ID %s) [Direct]",
sanitize_for_log(name),
sanitize_for_log(pk),
)
return str(pk)
return pk

# Check if it returned a list containing our group
if isinstance(body, dict) and "groups" in body:
for grp in body["groups"]:
if grp.get("group") == name:
pk = str(grp["PK"])
if not validate_folder_id(pk, log_errors=False):
log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}")
continue
log.info(
"Created folder %s (ID %s) [Direct]",
sanitize_for_log(name),
sanitize_for_log(grp["PK"]),
sanitize_for_log(pk),
)
return str(grp["PK"])
return pk
except Exception as e:
log.debug(
f"Could not extract ID from POST response: " f"{sanitize_for_log(e)}"
)

# 2. Fallback: Poll for the new folder (The Robust Retry Logic)
for attempt in range(MAX_RETRIES + 1):
try:
data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json()
groups = data.get("body", {}).get("groups", [])

for grp in groups:
if grp["group"].strip() == name.strip():
pk = str(grp["PK"])
if not validate_folder_id(pk, log_errors=False):
log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}")
return None
log.info(
"Created folder %s (ID %s) [Polled]",
sanitize_for_log(name),
sanitize_for_log(grp["PK"]),
sanitize_for_log(pk),
)
return str(grp["PK"])
return pk
except Exception as e:
log.warning(
f"Error fetching groups on attempt {attempt}: {sanitize_for_log(e)}"
)

if attempt < MAX_RETRIES:
wait_time = FOLDER_CREATION_DELAY * (attempt + 1)
log.info(
f"Folder '{sanitize_for_log(name)}' not found yet. Retrying in {wait_time}s..."
)
time.sleep(wait_time)

log.error(
f"Folder {sanitize_for_log(name)} was not found after creation and retries."
)
return None

except (httpx.HTTPError, KeyError) as e:
log.error(
f"Failed to create folder {sanitize_for_log(name)}: {sanitize_for_log(e)}"
)
return None

Check notice on line 1489 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L1398-L1489

Complex Method


def push_rules(
Expand Down
59 changes: 59 additions & 0 deletions tests/test_folder_id_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import importlib
import sys
from unittest.mock import MagicMock, patch

def reload_main_with_env(monkeypatch):
monkeypatch.delenv("NO_COLOR", raising=False)
with patch("sys.stderr") as mock_stderr, patch("sys.stdout") as mock_stdout:
mock_stderr.isatty.return_value = True
mock_stdout.isatty.return_value = True

module = sys.modules.get("main")
if module is None:
module = importlib.import_module("main")

importlib.reload(module)
return module


def test_verify_access_and_get_folders_filters_malicious_ids(monkeypatch):
"""
Verify that verify_access_and_get_folders filters out malicious Folder IDs
containing path traversal characters (../).
"""
m = reload_main_with_env(monkeypatch)
mock_client = MagicMock()

# Malicious Folder ID with path traversal
malicious_id = "../../etc/passwd"
# Malicious Folder ID with dangerous characters
malicious_id_2 = "foo;rm -rf /"

mock_response = MagicMock()
mock_response.json.return_value = {
"body": {
"groups": [
{"group": "Safe Folder", "PK": "safe_id_123"},
{"group": "Safe Folder 2", "PK": "safe-id-456_789"},
{"group": "Malicious Folder", "PK": malicious_id},
{"group": "Malicious Folder 2", "PK": malicious_id_2}
]
}
}
mock_client.get.return_value = mock_response
mock_response.raise_for_status.return_value = None

# Function should filter out malicious IDs
result = m.verify_access_and_get_folders(mock_client, "valid_profile")

assert result is not None

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

# Check that valid IDs are preserved
assert "Safe Folder" in result

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
assert result["Safe Folder"] == "safe_id_123"

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
assert "Safe Folder 2" in result

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
assert result["Safe Folder 2"] == "safe-id-456_789"

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

# Check that malicious IDs are removed
assert "Malicious Folder" not in result

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
assert "Malicious Folder 2" not in result

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
3 changes: 3 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading