diff --git a/tests/tools/test_browser_ssrf_local.py b/tests/tools/test_browser_ssrf_local.py index b3b8bd2271..691f9256f2 100644 --- a/tests/tools/test_browser_ssrf_local.py +++ b/tests/tools/test_browser_ssrf_local.py @@ -106,6 +106,62 @@ class TestPreNavigationSsrf: assert result["success"] is True + # -- Always-blocked floor: hybrid routing bypass regression (#16234) ------- + + # Hybrid-routing feature flips auto_local_this_nav=True for private URLs, + # which previously short-circuited _is_safe_url() entirely. An agent + # running on EC2/GCP/Azure could navigate to 169.254.169.254 via the + # spawned local Chromium sidecar and read IAM credentials via + # browser_snapshot. The always-blocked floor must fire regardless of + # routing. + IMDS_URLS = [ + "http://169.254.169.254/latest/meta-data/", # AWS / GCP / Azure / DO / Oracle + "http://169.254.169.253/metadata/instance", # Azure IMDS wire server + "http://169.254.170.2/v2/credentials", # AWS ECS task metadata + "http://100.100.100.200/latest/meta-data/", # Alibaba Cloud + "http://metadata.google.internal/computeMetadata/v1/", # GCP hostname + ] + + @pytest.mark.parametrize("imds_url", IMDS_URLS) + def test_cloud_blocks_imds_even_when_routing_to_local_sidecar( + self, monkeypatch, _common_patches, imds_url + ): + """Hybrid routing must not let cloud metadata endpoints through.""" + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False) + # Simulate hybrid routing kicking in for this URL (what happens on + # main pre-fix — cloud provider configured, _url_is_private → True, + # so the session key routes to a local Chromium sidecar). + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True) + # _is_safe_url would catch IMDS, but pre-fix it never ran. Force + # it to return True here so the test is specifically pinning the + # always-blocked floor as an independent gate. + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True) + + result = json.loads(browser_tool.browser_navigate(imds_url)) + + assert result["success"] is False + assert "cloud metadata endpoint" in result["error"] + + def test_cloud_allows_ordinary_private_url_via_sidecar( + self, monkeypatch, _common_patches + ): + """Hybrid routing still works for ordinary private URLs — floor + must be narrow enough to not break the PR #16136 feature.""" + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False) + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False) + + for private in ( + "http://127.0.0.1:8080/dashboard", + "http://192.168.1.1/admin", + "http://10.0.0.5/", + "http://myservice.local/", + ): + result = json.loads(browser_tool.browser_navigate(private)) + assert result["success"] is True, f"Unexpected block for {private}: {result}" + # --------------------------------------------------------------------------- # _is_local_backend() unit tests @@ -236,6 +292,32 @@ class TestPostRedirectSsrf: assert result["success"] is True assert result["url"] == final + # -- Always-blocked floor: redirect to IMDS via hybrid sidecar (#16234) ---- + + def test_cloud_blocks_redirect_to_imds_even_via_sidecar( + self, monkeypatch, _common_patches + ): + """Redirect to a cloud metadata endpoint is blocked regardless of + routing — even the hybrid local sidecar path can't return IMDS + content to the agent.""" + imds_final = "http://169.254.169.254/latest/meta-data/" + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False) + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True) + # _is_safe_url would catch it on main; force True to pin the + # always-blocked floor as an independent gate. + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True) + monkeypatch.setattr( + browser_tool, + "_run_browser_command", + lambda *a, **kw: _make_browser_result(url=imds_final), + ) + + result = json.loads(browser_tool.browser_navigate(self.PUBLIC_URL)) + + assert result["success"] is False + assert "cloud metadata endpoint" in result["error"] + class TestAllowPrivateUrlsConfig: @pytest.fixture(autouse=True) diff --git a/tests/tools/test_url_safety.py b/tests/tools/test_url_safety.py index 12b5b92ac5..38d27d40af 100644 --- a/tests/tools/test_url_safety.py +++ b/tests/tools/test_url_safety.py @@ -5,6 +5,7 @@ from unittest.mock import patch from tools.url_safety import ( is_safe_url, + is_always_blocked_url, _is_blocked_ip, _global_allow_private_urls, _reset_allow_private_cache, @@ -407,3 +408,69 @@ class TestAllowPrivateUrlsIntegration: """Empty URLs are still blocked.""" monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") assert is_safe_url("") is False + + +class TestIsAlwaysBlockedUrl: + """The always-blocked floor — cloud metadata only, narrower than is_safe_url.""" + + # -- The sentinel set that must always block -------------------------------- + + @pytest.mark.parametrize("url", [ + "http://169.254.169.254/latest/meta-data/", # AWS / GCP / Azure / DO / Oracle + "http://169.254.169.253/metadata/instance", # Azure IMDS wire server + "http://169.254.170.2/v2/credentials", # AWS ECS task metadata + "http://100.100.100.200/latest/meta-data/", # Alibaba Cloud + "http://169.254.42.1/", # Any /16 link-local + ]) + def test_literal_imds_ips_always_blocked(self, url): + """Literal IMDS IPs and the /16 link-local range always block.""" + assert is_always_blocked_url(url) is True + + def test_gcp_metadata_hostname_always_blocked_even_without_dns(self): + """metadata.google.internal blocks by hostname, no DNS needed.""" + with patch("socket.getaddrinfo", side_effect=socket.gaierror("nope")): + assert is_always_blocked_url("http://metadata.google.internal/") is True + + def test_hostname_resolving_to_imds_always_blocked(self): + """Attacker-controlled hostname resolving to IMDS still blocks.""" + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("169.254.169.254", 0)), + ]): + assert is_always_blocked_url("http://attacker-controlled.example.com/") is True + + # -- Things the floor must NOT block ---------------------------------------- + + def test_public_url_not_blocked(self): + assert is_always_blocked_url("https://example.com/path") is False + + @pytest.mark.parametrize("url", [ + "http://127.0.0.1:8080/", + "http://192.168.1.1/", + "http://10.0.0.5/", + "http://172.16.0.1/", + "http://100.64.0.1/", # CGNAT — blocked by is_safe_url but not by the floor + ]) + def test_ordinary_private_urls_not_in_floor(self, url): + """Floor is narrower than is_safe_url — ordinary private URLs pass.""" + assert is_always_blocked_url(url) is False + + def test_dns_failure_not_in_floor(self): + """DNS failure on a non-sentinel hostname = not always-blocked. + + Caller's ordinary fail-closed path (is_safe_url) handles that case. + """ + with patch("socket.getaddrinfo", side_effect=socket.gaierror("fail")): + assert is_always_blocked_url("http://nonexistent.example.com/") is False + + def test_empty_url_not_in_floor(self): + """Empty URL falls through — caller decides what to do with a malformed URL.""" + assert is_always_blocked_url("") is False + + def test_malformed_url_not_in_floor(self): + """Parse errors don't claim always-blocked status.""" + assert is_always_blocked_url("not a url at all") is False + + def test_floor_ignores_allow_private_urls_toggle(self, monkeypatch): + """security.allow_private_urls can NOT unblock cloud metadata.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + assert is_always_blocked_url("http://169.254.169.254/") is True diff --git a/tools/browser_tool.py b/tools/browser_tool.py index 049565d638..c8cdedcf0b 100644 --- a/tools/browser_tool.py +++ b/tools/browser_tool.py @@ -76,9 +76,13 @@ except Exception: check_website_access = lambda url: None # noqa: E731 — fail-open if policy module unavailable try: - from tools.url_safety import is_safe_url as _is_safe_url + from tools.url_safety import ( + is_safe_url as _is_safe_url, + is_always_blocked_url as _is_always_blocked_url, + ) except Exception: _is_safe_url = lambda url: False # noqa: E731 — fail-closed: block all if safety module unavailable + _is_always_blocked_url = lambda url: True # noqa: E731 — fail-closed on the floor too from tools.browser_providers.base import CloudBrowserProvider from tools.browser_providers.browserbase import BrowserbaseProvider from tools.browser_providers.browser_use import BrowserUseProvider @@ -837,6 +841,10 @@ def _url_is_private(url: str) -> bool: ip.is_private or ip.is_loopback or ip.is_link_local + # 172.16.0.0/12: only covered by ip.is_private on Python + # ≥3.11 (bpo-40791). Explicit check keeps 3.10 runtimes + # routing these to the local sidecar correctly. + or ip in ipaddress.ip_network("172.16.0.0/12") or ip in ipaddress.ip_network("100.64.0.0/10") ) except ValueError: @@ -2081,6 +2089,18 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str: nav_session_key = _navigation_session_key(effective_task_id, url) auto_local_this_nav = _is_local_sidecar_key(nav_session_key) + # Always-blocked floor: cloud metadata / IMDS endpoints are denied + # regardless of backend, hybrid routing, or allow_private_urls. + # There's no legitimate agent use case for navigating to + # 169.254.169.254 / metadata.google.internal / ECS task metadata + # via a browser, and routing those to a local Chromium sidecar + # on an EC2/GCP/Azure host exfiltrates IAM credentials (#16234). + if not _is_local_backend() and _is_always_blocked_url(url): + return json.dumps({ + "success": False, + "error": "Blocked: URL targets a cloud metadata endpoint", + }) + if ( not _is_local_backend() and not auto_local_this_nav @@ -2143,6 +2163,21 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str: # Skipped for local backends (same rationale as the pre-nav check), # and for the hybrid local sidecar (we're already on a local browser # hitting a private URL by design). + # Always-blocked floor (cloud metadata / IMDS) is enforced even + # when auto_local_this_nav is true — see pre-nav check for + # rationale (#16234). + if ( + not _is_local_backend() + and final_url + and final_url != url + and _is_always_blocked_url(final_url) + ): + _run_browser_command(nav_session_key, "open", ["about:blank"], timeout=10) + return json.dumps({ + "success": False, + "error": "Blocked: redirect landed on a cloud metadata endpoint", + }) + if ( not _is_local_backend() and not auto_local_this_nav diff --git a/tools/url_safety.py b/tools/url_safety.py index 860d4d9dfa..723b1b0c7c 100644 --- a/tools/url_safety.py +++ b/tools/url_safety.py @@ -147,6 +147,102 @@ def _is_blocked_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool: return False +def is_always_blocked_url(url: str) -> bool: + """Return True when the URL targets an always-blocked endpoint. + + This is the security floor — cloud metadata IPs / hostnames + (169.254.169.254, metadata.google.internal, ECS task metadata, etc.) + that have no legitimate agent use regardless of backend, routing, or + the ``allow_private_urls`` toggle. Used by callers that bypass the + full ``is_safe_url`` check for their own reasons (e.g. hybrid cloud + browser routing to a local Chromium sidecar for private URLs) and + still need to enforce the non-negotiable floor before letting the + request proceed. + + Returns True (= blocked) on: + - Hostnames in ``_BLOCKED_HOSTNAMES`` + - IPs / networks in ``_ALWAYS_BLOCKED_IPS`` / ``_ALWAYS_BLOCKED_NETWORKS`` + - URLs whose hostname resolves to any of the above + + Returns False (= not in the always-blocked floor) on: + - Benign public / private / loopback URLs (whether or not they'd + be blocked by the ordinary SSRF check) + - DNS-resolution failures for non-sentinel hostnames (these are + someone else's problem — the caller's ordinary fail-closed path + will catch them if applicable) + - Parse errors (caller decides fail-open vs fail-closed) + + Intentionally narrower than ``is_safe_url``: only blocks the sentinel + set, not ordinary private addresses. Callers that want the full + SSRF check should still use ``is_safe_url``. + """ + try: + parsed = urlparse(url) + hostname = (parsed.hostname or "").strip().lower().rstrip(".") + if not hostname: + return False + + # Blocked-hostname check fires regardless of DNS resolution + if hostname in _BLOCKED_HOSTNAMES: + logger.warning( + "Blocked request to internal hostname (always-blocked floor): %s", + hostname, + ) + return True + + # Literal IP → check directly against the always-blocked set + try: + ip = ipaddress.ip_address(hostname) + except ValueError: + ip = None + + if ip is not None: + if ip in _ALWAYS_BLOCKED_IPS or any( + ip in net for net in _ALWAYS_BLOCKED_NETWORKS + ): + logger.warning( + "Blocked request to cloud metadata address " + "(always-blocked floor): %s", + hostname, + ) + return True + return False + + # Hostname → resolve and check every answer. DNS failure is NOT + # always-blocked (caller's ordinary path handles that). + try: + addr_info = socket.getaddrinfo( + hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM + ) + except socket.gaierror: + return False + + for _family, _, _, _, sockaddr in addr_info: + ip_str = sockaddr[0] + try: + resolved = ipaddress.ip_address(ip_str) + except ValueError: + continue + if resolved in _ALWAYS_BLOCKED_IPS or any( + resolved in net for net in _ALWAYS_BLOCKED_NETWORKS + ): + logger.warning( + "Blocked request to cloud metadata address " + "(always-blocked floor): %s -> %s", + hostname, + ip_str, + ) + return True + + return False + + except Exception as exc: + # Parse failures or unexpected errors — don't claim the URL is + # always-blocked. Caller decides what to do with a malformed URL. + logger.debug("is_always_blocked_url error for %s: %s", url, exc) + return False + + def _allows_private_ip_resolution(hostname: str, scheme: str) -> bool: """Return True when a trusted HTTPS hostname may bypass IP-class blocking.""" return scheme == "https" and hostname in _TRUSTED_PRIVATE_IP_HOSTS