Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .jules/sentinel.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@
**Learning:** Path-based checks (`os.path.islink`, `os.stat`) followed by path-based operations (`os.chmod`) are inherently racy. File descriptors are the only way to pin the resource.

**Prevention:** Use `os.open` with `O_NOFOLLOW` to open the file securely (failing if it's a symlink). Then use file-descriptor-based operations: `os.fstat(fd)` to check modes and `os.fchmod(fd, mode)` to modify permissions. This ensures operations apply to the exact inode opened.

## 2026-02-14 - CSV Injection via Log Sanitization
**Vulnerability:** `sanitize_for_log` in `main.py` was stripping quotes from `repr()` output for aesthetic reasons, inadvertently exposing strings starting with `=`, `+`, `-`, `@` to formula injection if logs are viewed in Excel.
**Learning:** Security controls (like `repr()`) can be weakened by subsequent "cleanup" steps. Always consider downstream consumption of logs (e.g., CSV export).
**Prevention:** Check for dangerous prefixes before stripping quotes. If detected, retain the quotes to force string literal interpretation.
7 changes: 7 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@
# repr() safely escapes control characters (e.g., \n -> \\n, \x1b -> \\x1b)
# This prevents log injection and terminal hijacking.
safe = repr(s)

# Security: Prevent CSV Injection (Formula Injection)
# If the string starts with =, +, -, or @, we keep the quotes from repr()
# to force spreadsheet software to treat it as a string literal.
if s and s.startswith(("=", "+", "-", "@")):
return safe

if len(safe) >= 2 and safe[0] == safe[-1] and safe[0] in ("'", '"'):
return safe[1:-1]
return safe
Expand Down Expand Up @@ -807,192 +814,192 @@
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
)


def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY):
    """
    Execute ``request_func`` with retries and exponential backoff.

    Args:
        request_func: Zero-argument callable performing the HTTP request and
            returning an httpx response object.
        max_retries: Maximum number of attempts before giving up.
        delay: Base delay in seconds; attempt ``k`` waits ``delay * 2**k``.

    Returns:
        The first successful response. Returns ``None`` without attempting
        anything when ``max_retries`` <= 0 (preserves original behavior).

    Raises:
        httpx.HTTPError: Re-raised immediately for non-retryable client
            errors (4xx other than 429), or after the final failed attempt.
    """

    def _log_response_body(exc):
        # Dump the (sanitized) response body at debug level for diagnostics.
        # Not every httpx error carries a response, hence the guard.
        if getattr(exc, "response", None) is not None:
            log.debug(f"Response content: {sanitize_for_log(exc.response.text)}")

    for attempt in range(max_retries):
        try:
            response = request_func()
            response.raise_for_status()
            return response
        # NOTE: httpx.TimeoutException subclasses httpx.HTTPError, so
        # catching HTTPError alone covers both (the old tuple was redundant).
        except httpx.HTTPError as e:
            # Security Enhancement: Do not retry client errors (4xx) except
            # 429 (Too Many Requests). Retrying 4xx errors is inefficient
            # and can trigger security alerts or rate limits.
            if isinstance(e, httpx.HTTPStatusError):
                code = e.response.status_code
                if 400 <= code < 500 and code != 429:
                    _log_response_body(e)
                    raise

            if attempt == max_retries - 1:
                _log_response_body(e)
                raise
            wait_time = delay * (2**attempt)
            log.warning(
                f"Request failed (attempt {attempt + 1}/{max_retries}): "
                f"{sanitize_for_log(e)}. Retrying in {wait_time}s..."
            )
            time.sleep(wait_time)


def _gh_get(url: str) -> Dict:
"""
Fetch blocklist data from URL with HTTP cache header support.

CACHING STRATEGY:
1. Check in-memory cache first (fastest)
2. Check disk cache and send conditional request (If-None-Match/If-Modified-Since)
3. If 304 Not Modified: reuse cached data (cache validation)
4. If 200 OK: download new data and update cache

SECURITY: Validates data structure regardless of cache source
"""
global _cache_stats

# First check: Quick check without holding lock for long
with _cache_lock:
if url in _cache:
_cache_stats["hits"] += 1
return _cache[url]

# Check disk cache for conditional request headers
headers = {}
cached_entry = _disk_cache.get(url)
if cached_entry:
# Send conditional request using cached ETag/Last-Modified
# Server returns 304 if content hasn't changed
# NOTE: Cached values may be None if the server didn't send these headers.
# httpx requires header values to be str/bytes, so we only add headers
# when the cached value is truthy.
etag = cached_entry.get("etag")
if etag:
headers["If-None-Match"] = etag
last_modified = cached_entry.get("last_modified")
if last_modified:
headers["If-Modified-Since"] = last_modified

# Fetch data (or validate cache)
# Explicitly let HTTPError propagate (no need to catch just to re-raise)
try:
with _gh.stream("GET", url, headers=headers) as r:
# Handle 304 Not Modified - cached data is still valid
if r.status_code == 304:
if cached_entry and "data" in cached_entry:
log.debug(f"Cache validated (304) for {sanitize_for_log(url)}")
_cache_stats["validations"] += 1

# Update in-memory cache with validated data
data = cached_entry["data"]
with _cache_lock:
_cache[url] = data

# Update timestamp in disk cache to track last validation
cached_entry["last_validated"] = time.time()
return data
else:
# Shouldn't happen, but handle gracefully
log.warning(f"Got 304 but no cached data for {sanitize_for_log(url)}, re-fetching")
_cache_stats["errors"] += 1
# Close the original streaming response before retrying
r.close()
# Retry without conditional headers using streaming again so that
# MAX_RESPONSE_SIZE and related protections still apply.
headers = {}
with _gh.stream("GET", url, headers=headers) as r_retry:
r_retry.raise_for_status()

# 1. Check Content-Length header if present
cl = r_retry.headers.get("Content-Length")
if cl:
try:
if int(cl) > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"({int(cl) / (1024 * 1024):.2f} MB)"
)
except ValueError as e:
# Only catch the conversion error, let the size error propagate
if "Response too large" in str(e):
raise e
log.warning(
f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. "
"Falling back to streaming size check."
)

# 2. Stream and check actual size
chunks = []
current_size = 0
for chunk in r_retry.iter_bytes():
current_size += len(chunk)
if current_size > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)"
)
chunks.append(chunk)

try:
data = json.loads(b"".join(chunks))
except json.JSONDecodeError as e:
raise ValueError(
f"Invalid JSON response from {sanitize_for_log(url)}"
) from e

# Store cache headers for future conditional requests
# ETag is preferred over Last-Modified (more reliable)
etag = r_retry.headers.get("ETag")
last_modified = r_retry.headers.get("Last-Modified")

# Update disk cache with new data and headers
_disk_cache[url] = {
"data": data,
"etag": etag,
"last_modified": last_modified,
"fetched_at": time.time(),
"last_validated": time.time(),
}

_cache_stats["misses"] += 1
return data

r.raise_for_status()

# 1. Check Content-Length header if present
cl = r.headers.get("Content-Length")
if cl:
try:
if int(cl) > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"({int(cl) / (1024 * 1024):.2f} MB)"
)
except ValueError as e:
# Only catch the conversion error, let the size error propagate
if "Response too large" in str(e):
raise e
log.warning(
f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. "
"Falling back to streaming size check."
)

# 2. Stream and check actual size
chunks = []
current_size = 0
for chunk in r.iter_bytes():
current_size += len(chunk)
if current_size > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)"
)
chunks.append(chunk)

try:

Check notice on line 1002 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L817-L1002

Complex Method
data = json.loads(b"".join(chunks))
except json.JSONDecodeError as e:
raise ValueError(
Expand Down
66 changes: 66 additions & 0 deletions tests/test_csv_injection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@

import unittest
import sys
import os

# Add root to path to import main
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import main

class TestCSVInjection(unittest.TestCase):
    """Regression tests for the CSV/formula-injection hardening in sanitize_for_log."""

    def test_csv_injection_prevention(self):
        """
        Verify that sanitize_for_log correctly keeps quotes around strings
        that start with characters known to trigger formula execution in spreadsheets
        (CSV Injection).
        """
        # Classic formula-injection payloads: each dangerous prefix once.
        dangerous_inputs = [
            "=cmd|' /C calc'!A0",
            "+cmd|' /C calc'!A0",
            "-cmd|' /C calc'!A0",
            "@cmd|' /C calc'!A0",
        ]

        for inp in dangerous_inputs:
            # subTest reports every failing payload instead of stopping
            # at the first failure.
            with self.subTest(inp=inp):
                sanitized = main.sanitize_for_log(inp)
                # repr() wraps the value in quotes; the sanitizer must keep
                # them so spreadsheets treat the cell as a string literal.
                self.assertTrue(
                    sanitized.startswith("'") or sanitized.startswith('"'),
                    f"Input '{inp}' should be quoted to prevent CSV injection. Got: {sanitized}",
                )
                # The payload itself must still be visible in the log output.
                self.assertIn(inp, sanitized)

    def test_normal_string_behavior(self):
        """
        Verify that normal strings (not starting with =, +, -, @) still have
        their outer quotes stripped, preserving existing behavior.
        """
        safe_inputs = [
            "NormalString",
            "Folder Name",
            "12345",
            "<script>alert(1)</script>",  # XSS attempt (handled by repr escaping but checked here for quote stripping)
        ]

        for inp in safe_inputs:
            with self.subTest(inp=inp):
                sanitized = main.sanitize_for_log(inp)
                # repr("NormalString") is 'NormalString'; after stripping the
                # outer quotes no quote character should remain at the front.
                self.assertFalse(
                    sanitized.startswith(("'", '"')),
                    f"Input '{inp}' should have outer quotes stripped. Got: {sanitized}",
                )
                # Strict check: output equals repr() with outer quotes removed.
                self.assertEqual(sanitized, repr(inp)[1:-1])

    def test_empty_input(self):
        """Verify empty and None input handling."""
        # Empty string: repr("") == "''", outer quotes stripped -> "".
        self.assertEqual(main.sanitize_for_log(""), "")
        # None: repr(None) == "None", no quotes to strip.
        self.assertEqual(main.sanitize_for_log(None), "None")

# Allow running this test module directly: python tests/test_csv_injection.py
if __name__ == '__main__':
    unittest.main()
44 changes: 22 additions & 22 deletions tests/test_push_rules_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,31 +98,31 @@ def test_push_rules_multi_batch(self, mock_executor, mock_as_completed):
# This should ALWAYS be True
self.assertTrue(mock_executor.called, "ThreadPoolExecutor should be called for multi-batch")

@patch("main.is_valid_rule")
def test_push_rules_skips_validation_for_existing(self, mock_is_valid):
def test_push_rules_skips_validation_for_existing(self):
"""
Test that is_valid_rule is NOT called for rules that are already in existing_rules.
"""
mock_is_valid.return_value = True
hostnames = ["h1", "h2"]
# h1 is already known, h2 is new
existing_rules = {"h1"}

main.push_rules(
self.profile_id,
self.folder_name,
self.folder_id,
self.do,
self.status,
hostnames,
existing_rules,
self.client
)

# h1 is in existing_rules, so we should skip validation for it.
# h2 is NOT in existing_rules, so we should validate it.
# So is_valid_rule should be called EXACTLY once, with "h2".
mock_is_valid.assert_called_once_with("h2")
with patch.object(main, "is_valid_rule") as mock_is_valid:
mock_is_valid.return_value = True
hostnames = ["h1", "h2"]
# h1 is already known, h2 is new
existing_rules = {"h1"}

main.push_rules(
self.profile_id,
self.folder_name,
self.folder_id,
self.do,
self.status,
hostnames,
existing_rules,
self.client
)

# h1 is in existing_rules, so we should skip validation for it.
# h2 is NOT in existing_rules, so we should validate it.
# So is_valid_rule should be called EXACTLY once, with "h2".
mock_is_valid.assert_called_once_with("h2")
Comment on lines +101 to +125
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test refactoring appears unrelated to the CSV injection fix described in the PR title and description. The change from decorator-based @patch("main.is_valid_rule") to context manager with patch.object(main, "is_valid_rule") is a valid refactoring, but it's unclear why it's included in this PR focused on security fixes. Consider moving unrelated refactorings to a separate PR to maintain clear separation of concerns and make reviews easier.

Copilot uses AI. Check for mistakes.

# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()
Loading