From c85ebf817cb90fb975444a7134b38d8c2dc07d3c Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:49:12 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=9B=A1=EF=B8=8F=20Sentinel:=20[HIGH]=20Fi?= =?UTF-8?q?x=20SSRF=20by=20validating=20resolved=20IPs=20(including=20link?= =?UTF-8?q?-local)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Description: - **Vulnerability:** The application previously only validated the hostname string of folder URLs. This allowed attackers to bypass SSRF protections by using domains that resolve to internal or private IP addresses (e.g., `localtest.me` -> `127.0.0.1`). Additionally, link-local addresses (used for cloud metadata services) were not explicitly blocked. - **Fix:** Enhanced `validate_folder_url` to: 1. Resolve domains to IP addresses using `socket.getaddrinfo`. 2. Check both IP literals and resolved IPs against `is_private`, `is_loopback`, and `is_link_local` checks. 3. Fail closed on DNS resolution errors. - **Verification:** Added `tests/test_security.py` with test cases for local, private, link-local, and public IPs, as well as DNS failures. --- .jules/sentinel.md | 8 ++++ main.py | 24 ++++++++++-- tests/test_security.py | 86 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 tests/test_security.py diff --git a/.jules/sentinel.md b/.jules/sentinel.md index 675973d..0ad8576 100644 --- a/.jules/sentinel.md +++ b/.jules/sentinel.md @@ -39,3 +39,11 @@ **Prevention:** 1. Parse URLs and check hostnames against `localhost` and private IP ranges using `ipaddress` module. 2. Enforce strict length limits on user inputs (e.g., profile IDs) to prevent resource exhaustion or buffer abuse. + +## 2024-05-22 - SSRF Protection: Link-Local Addresses and DNS Resolution +**Vulnerability:** The application's SSRF protection checked private IPs but missed link-local addresses (e.g., `169.254.169.254`), exposing cloud metadata. Also, it only validated the hostname string, allowing bypass via DNS rebinding or domains pointing to private IPs. +**Learning:** SSRF protection must verify the *resolved* IP address, not just the hostname string. Cloud environments require blocking link-local addresses (`169.254.0.0/16`) in addition to standard private ranges. +**Prevention:** +1. Use `socket.getaddrinfo` to resolve domains during validation. +2. Check resolved IPs against `is_private`, `is_loopback`, AND `is_link_local`. +3. Apply checks to both IP literals and resolved domain IPs. diff --git a/main.py b/main.py index e6aabc5..e9eb77f 100644 --- a/main.py +++ b/main.py @@ -23,6 +23,7 @@ import concurrent.futures import threading import ipaddress +import socket from urllib.parse import urlparse from typing import Dict, List, Optional, Any, Set, Sequence @@ -205,12 +206,27 @@ def validate_folder_url(url: str) -> bool: try: ip = ipaddress.ip_address(hostname) - if ip.is_private or ip.is_loopback: - log.warning(f"Skipping unsafe URL (private IP): {sanitize_for_log(url)}") + if ip.is_private or ip.is_loopback or ip.is_link_local: + log.warning(f"Skipping unsafe URL (private/unsafe IP): {sanitize_for_log(url)}") return False except ValueError: - # Not an IP literal, it's a domain. - pass + # Not an IP literal, it's a domain. Resolve it. + try: + # Resolve hostname to check if it points to a private IP + # We use getaddrinfo to support both IPv4 and IPv6 + for _, _, _, _, sockaddr in socket.getaddrinfo(hostname, None): + ip_str = sockaddr[0] + ip = ipaddress.ip_address(ip_str) + # Block private, loopback, AND link-local (e.g. 169.254.x.x for cloud metadata) + if ip.is_private or ip.is_loopback or ip.is_link_local: + log.warning(f"Skipping unsafe URL (domain {hostname} resolves to unsafe IP {ip_str}): {sanitize_for_log(url)}") + return False + except socket.gaierror: + log.warning(f"Skipping URL (DNS resolution failed): {sanitize_for_log(url)}") + return False + except Exception as e: + log.warning(f"Error resolving domain {sanitize_for_log(hostname)}: {e}") + return False except Exception as e: log.warning(f"Failed to validate URL {sanitize_for_log(url)}: {e}") diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 0000000..e8521e8 --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,86 @@ +import unittest +from unittest.mock import patch +import socket +import ipaddress +import logging + +# We import the module to test its logic. +# We need to be careful about side effects from top-level code in main.py. +# However, standard unittest discovery or running this file directly is the standard way. +# We will mock environmental variables if needed, but the main.py logic handles defaults. + +# Import validate_folder_url from main +# Note: This will execute the top-level code in main.py. +import main + +class TestSSRFProtection(unittest.TestCase): + + @patch('socket.getaddrinfo') + def test_domain_resolving_to_localhost(self, mock_getaddrinfo): + # Simulate localtest.me resolving to 127.0.0.1 + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.0.0.1', 443)) + ] + + url = "https://localtest.me/config.json" + is_valid = main.validate_folder_url(url) + + self.assertFalse(is_valid, "Should reject domain resolving to localhost") + + @patch('socket.getaddrinfo') + def test_domain_resolving_to_private_ip(self, mock_getaddrinfo): + # Simulate internal.corp resolving to 192.168.1.5 + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.5', 443)) + ] + + url = "https://internal.corp/secret.json" + is_valid = main.validate_folder_url(url) + + self.assertFalse(is_valid, "Should reject domain resolving to private IP") + + def test_ip_literal_link_local(self): + # Test explicit link-local IP (169.254.x.x) + # This hits the first check (IP literal), not the DNS resolution path. + url = "https://169.254.169.254/latest/meta-data/" + is_valid = main.validate_folder_url(url) + self.assertFalse(is_valid, "Should reject link-local IP literal") + + @patch('socket.getaddrinfo') + def test_domain_resolving_to_link_local(self, mock_getaddrinfo): + # Simulate aws-metadata resolving to 169.254.169.254 + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('169.254.169.254', 80)) + ] + + url = "https://169.254.169.254.nip.io/latest/meta-data/" + is_valid = main.validate_folder_url(url) + + self.assertFalse(is_valid, "Should reject domain resolving to link-local IP") + + @patch('socket.getaddrinfo') + def test_domain_resolving_to_public_ip(self, mock_getaddrinfo): + # Simulate google.com resolving to 8.8.8.8 (public) + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 443)) + ] + + url = "https://google.com/config.json" + is_valid = main.validate_folder_url(url) + + self.assertTrue(is_valid, "Should accept domain resolving to public IP") + + @patch('socket.getaddrinfo') + def test_dns_resolution_failure(self, mock_getaddrinfo): + # Simulate DNS failure + mock_getaddrinfo.side_effect = socket.gaierror("Name or service not known") + + url = "https://nonexistent.domain/config.json" + is_valid = main.validate_folder_url(url) + + self.assertFalse(is_valid, "Should reject URL if DNS resolution fails") + +if __name__ == '__main__': + # Suppress logging during tests + logging.getLogger('control-d-sync').setLevel(logging.CRITICAL) + unittest.main()