Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
**Learning:** When downloading large files (e.g., blocklists), the default chunk size of HTTP libraries might be small, leading to excessive loop iterations and list operations. Increasing the buffer size (e.g., to 16KB) reduces CPU overhead during I/O-bound operations.
**Action:** When using `iter_bytes()` or similar streaming methods for large resources, explicitly set a larger `chunk_size` (e.g., 16384) to improve throughput and reduce CPU usage.

## 2026-02-17 - [Cache DNS Lookups by Hostname]
**Learning:** When validating multiple URLs from the same host (e.g., githubusercontent), caching based on the full URL still triggers redundant DNS lookups for each unique path. Extracting hostname validation into a separate `@lru_cache` function avoids repeated blocking `getaddrinfo` calls for the same domain.
**Action:** Identify expensive validation steps (like DNS) that depend on a subset of the input (hostname vs full URL) and cache them independently.

## 2024-03-24 - [Avoid Regex on Simple Strings]
**Learning:** Running complex regex substitutions on every log message (for sanitization) introduces measurable CPU overhead, especially when most strings don't contain sensitive patterns. Simple string checks (`in`) are orders of magnitude faster than regex execution.
**Action:** Add early return checks (e.g., `if "://" in s:`) before invoking expensive regex operations in hot paths like logging or string sanitization.
Expand Down
85 changes: 48 additions & 37 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,12 +894,57 @@
log.debug(f"Failed to parse rate limit headers: {e}")


@lru_cache(maxsize=128)
def validate_hostname(hostname: str) -> bool:
    """
    Validates a hostname (DNS resolution and IP checks).

    Cached to prevent redundant DNS lookups for the same host across
    different URLs. Note that negative results are cached as well.

    Args:
        hostname: Host part of a URL; either an IP literal or a domain name.

    Returns:
        True if the hostname is a global, non-multicast IP literal, or a
        domain whose every resolved address is global and non-multicast.
        False for localhost aliases, non-global or multicast addresses,
        domains that fail to resolve, and domains that resolve to nothing.
    """
    # Reject the well-known loopback names before any parsing or DNS work.
    if hostname.lower() in ("localhost", "127.0.0.1", "::1"):
        log.warning(
            f"Skipping unsafe hostname (localhost detected): {sanitize_for_log(hostname)}"
        )
        return False

    try:
        ip = ipaddress.ip_address(hostname)
    except ValueError:
        # Not an IP literal, it's a domain. Resolve and check IPs below.
        pass
    else:
        if not ip.is_global or ip.is_multicast:
            log.warning(f"Skipping unsafe IP: {sanitize_for_log(hostname)}")
            return False
        return True

    try:
        # Resolve hostname to IPs (IPv4 and IPv6). Restricting the protocol
        # to TCP avoids getting the same address once per socket type.
        addr_info = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)

        # A domain with no addresses cannot be vetted, so treat it as unsafe.
        if not addr_info:
            return False

        for res in addr_info:
            # res is (family, type, proto, canonname, sockaddr)
            # sockaddr is (address, port, ...) for AF_INET/AF_INET6
            ip = ipaddress.ip_address(res[4][0])
            # A single unsafe address is enough to reject the whole hostname.
            if not ip.is_global or ip.is_multicast:
                log.warning(
                    f"Skipping unsafe hostname {sanitize_for_log(hostname)} (resolves to non-global/multicast IP {ip})"
                )
                return False
    except (socket.gaierror, ValueError, OSError) as e:
        log.warning(
            f"Failed to resolve/validate domain {sanitize_for_log(hostname)}: {sanitize_for_log(e)}"
        )
        return False

    # Every resolved address passed the checks.
    return True


@lru_cache(maxsize=128)
def validate_folder_url(url: str) -> bool:
"""
Validates a folder URL.
Cached to avoid repeated DNS lookups (socket.getaddrinfo) for the same URL
during warm-up and sync phases.
Cached to avoid repeated URL parsing for the same URL.
"""
if not url.startswith("https://"):
log.warning(
Expand All @@ -913,41 +958,7 @@
if not hostname:
return False

# Check for potentially malicious hostnames
if hostname.lower() in ("localhost", "127.0.0.1", "::1"):
log.warning(
f"Skipping unsafe URL (localhost detected): {sanitize_for_log(url)}"
)
return False

try:
ip = ipaddress.ip_address(hostname)
if not ip.is_global or ip.is_multicast:
log.warning(
f"Skipping unsafe URL (non-global/multicast IP): {sanitize_for_log(url)}"
)
return False
except ValueError:
# Not an IP literal, it's a domain. Resolve and check IPs.
try:
# Resolve hostname to IPs (IPv4 and IPv6)
# We filter for AF_INET/AF_INET6 to ensure we get IP addresses
addr_info = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
for res in addr_info:
# res is (family, type, proto, canonname, sockaddr)
# sockaddr is (address, port) for AF_INET/AF_INET6
ip_str = res[4][0]
ip = ipaddress.ip_address(ip_str)
if not ip.is_global or ip.is_multicast:
log.warning(
f"Skipping unsafe URL (domain {hostname} resolves to non-global/multicast IP {ip}): {sanitize_for_log(url)}"
)
return False
except (socket.gaierror, ValueError, OSError) as e:
log.warning(
f"Failed to resolve/validate domain {hostname}: {sanitize_for_log(e)}"
)
return False
return validate_hostname(hostname)

except Exception as e:
log.warning(
Expand Down Expand Up @@ -2341,7 +2352,7 @@
return

# Unicode Table
def line(l, m, r):
    """Return one bold horizontal border: l, the column rules joined by m, then r."""
    # Each column rule is two characters wider than its width entry in w,
    # matching the single space of padding on either side of a cell.
    rules = ('─' * (x+2) for x in w)
    return f"{Colors.BOLD}{l}{m.join(rules)}{r}{Colors.ENDC}"

Check notice on line 2355 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L2355

Ambiguous variable name 'l'. (E741)
def row(c):
    """Return one table row for the five-item sequence c.

    Columns 0 and 4 are left-aligned and columns 1-3 right-aligned, each
    padded to the matching width in w and separated by a bold vertical bar.
    """
    # The separator is U+2502 (box drawings light vertical); the previous
    # literal was the mis-decoded UTF-8 byte sequence of that character.
    sep = f"{Colors.BOLD}│{Colors.ENDC}"
    return (
        f"{sep} {c[0]:<{w[0]}} {sep} {c[1]:>{w[1]}} {sep} {c[2]:>{w[2]}} "
        f"{sep} {c[3]:>{w[3]}} {sep} {c[4]:<{w[4]}} {sep}"
    )

# Top border of the table: corner, column junctions, corner.
print(f"\n{line('┌', '┬', '┐')}")
Expand Down
73 changes: 73 additions & 0 deletions tests/test_hostname_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@

import socket
from unittest.mock import MagicMock, patch

import pytest
import main

def test_validate_hostname_caching():
    """
    Check that validate_hostname performs one DNS lookup per distinct hostname.
    """
    public_answer = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 443))]

    with patch("socket.getaddrinfo") as mock_dns:
        # Every resolution yields a single public address.
        mock_dns.return_value = public_answer

        # Start from an empty cache so the lookup counts below are exact.
        main.validate_hostname.cache_clear()

        # (hostname, cumulative DNS lookups expected after the call):
        # a repeated hostname is served from cache, a new one is resolved.
        steps = (
            ("example.com", 1),
            ("example.com", 1),
            ("google.com", 2),
        )
        for host, expected_lookups in steps:
            assert main.validate_hostname(host) is True
            assert mock_dns.call_count == expected_lookups

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

def test_validate_hostname_security():
    """
    Check that validate_hostname rejects loopback and private targets.
    """
    # Localhost aliases and a private IP literal must all be refused.
    for unsafe in ("localhost", "127.0.0.1", "::1", "192.168.1.1"):
        assert main.validate_hostname(unsafe) is False

    # A domain name whose DNS answer is a private address is refused too.
    private_answer = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 443))]
    with patch("socket.getaddrinfo") as mock_dns:
        mock_dns.return_value = private_answer
        main.validate_hostname.cache_clear()

        assert main.validate_hostname("private.local") is False

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.

def test_validate_folder_url_uses_validate_hostname():
    """
    Check that validate_folder_url delegates the host check to validate_hostname.
    """
    target = "https://example.com/data.json"

    with patch("main.validate_hostname") as hostname_check:
        # Host accepted: the URL is valid and the parsed hostname was passed on.
        hostname_check.return_value = True
        main.validate_folder_url.cache_clear()
        assert main.validate_folder_url(target) is True
        hostname_check.assert_called_with("example.com")

        # Host rejected: the URL cache must be cleared first because the
        # URL is unchanged and would otherwise return the cached True.
        hostname_check.return_value = False
        main.validate_folder_url.cache_clear()
        assert main.validate_folder_url(target) is False

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.
Loading