diff --git a/tests/tools/test_url_safety.py b/tests/tools/test_url_safety.py index 6a2de78f6..4382d8ab3 100644 --- a/tests/tools/test_url_safety.py +++ b/tests/tools/test_url_safety.py @@ -152,6 +152,34 @@ class TestIsSafeUrl: # 100.0.0.1 is a global IP, not in CGNAT range assert is_safe_url("http://legit-host.example/") is True + def test_benchmark_ip_blocked_for_non_allowlisted_host(self): + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("198.18.0.23", 0)), + ]): + assert is_safe_url("https://example.com/file.jpg") is False + + def test_qq_multimedia_hostname_allowed_with_benchmark_ip(self): + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("198.18.0.23", 0)), + ]): + assert is_safe_url("https://multimedia.nt.qq.com.cn/download?id=123") is True + + def test_qq_multimedia_hostname_exception_is_exact_match(self): + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("198.18.0.23", 0)), + ]): + assert is_safe_url("https://sub.multimedia.nt.qq.com.cn/download?id=123") is False + + def test_qq_multimedia_hostname_exception_requires_https(self): + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("198.18.0.23", 0)), + ]): + assert is_safe_url("http://multimedia.nt.qq.com.cn/download?id=123") is False + + def test_qq_multimedia_hostname_dns_failure_still_blocked(self): + with patch("socket.getaddrinfo", side_effect=socket.gaierror("Name resolution failed")): + assert is_safe_url("https://multimedia.nt.qq.com.cn/download?id=123") is False + class TestIsBlockedIp: """Direct tests for the _is_blocked_ip helper.""" @@ -159,7 +187,7 @@ class TestIsBlockedIp: @pytest.mark.parametrize("ip_str", [ "127.0.0.1", "10.0.0.1", "172.16.0.1", "192.168.1.1", "169.254.169.254", "0.0.0.0", "224.0.0.1", "255.255.255.255", - "100.64.0.1", "100.100.100.100", "100.127.255.254", + "100.64.0.1", "100.100.100.100", "100.127.255.254", "198.18.0.23", "::1", "fe80::1", "fc00::1", "fd12::1", "ff02::1", "::ffff:127.0.0.1", "::ffff:169.254.169.254", ]) diff --git a/tools/url_safety.py b/tools/url_safety.py index 3dc57ca45..c961f722c 100644 --- a/tools/url_safety.py +++ b/tools/url_safety.py @@ -29,6 +29,13 @@ _BLOCKED_HOSTNAMES = frozenset({ "metadata.goog", }) +# Exact HTTPS hostnames allowed to resolve to private/benchmark-space IPs. +# This is intentionally narrow: QQ media downloads can legitimately resolve +# to 198.18.0.0/15 behind local proxy/benchmark infrastructure. +_TRUSTED_PRIVATE_IP_HOSTS = frozenset({ + "multimedia.nt.qq.com.cn", +}) + # 100.64.0.0/10 (CGNAT / Shared Address Space, RFC 6598) is NOT covered by # ipaddress.is_private — it returns False for both is_private and is_global. # Must be blocked explicitly. Used by carrier-grade NAT, Tailscale/WireGuard @@ -48,6 +55,11 @@ def _is_blocked_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool: return False +def _allows_private_ip_resolution(hostname: str, scheme: str) -> bool: + """Return True when a trusted HTTPS hostname may bypass IP-class blocking.""" + return scheme == "https" and hostname in _TRUSTED_PRIVATE_IP_HOSTS + + def is_safe_url(url: str) -> bool: """Return True if the URL target is not a private/internal address. @@ -56,7 +68,8 @@ def is_safe_url(url: str) -> bool: """ try: parsed = urlparse(url) - hostname = (parsed.hostname or "").strip().lower() + hostname = (parsed.hostname or "").strip().lower().rstrip(".") + scheme = (parsed.scheme or "").strip().lower() if not hostname: return False @@ -65,6 +78,8 @@ def is_safe_url(url: str) -> bool: logger.warning("Blocked request to internal hostname: %s", hostname) return False + allow_private_ip = _allows_private_ip_resolution(hostname, scheme) + # Try to resolve and check IP try: addr_info = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM) @@ -81,13 +96,19 @@ def is_safe_url(url: str) -> bool: except ValueError: continue - if _is_blocked_ip(ip): + if not allow_private_ip and _is_blocked_ip(ip): logger.warning( "Blocked request to private/internal address: %s -> %s", hostname, ip_str, ) return False + if allow_private_ip: + logger.debug( + "Allowing trusted hostname despite private/internal resolution: %s", + hostname, + ) + return True except Exception as exc: