mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-08 03:01:47 +00:00
Cloud metadata endpoints (169.254.169.254 etc.) are now always blocked
by browser_navigate regardless of hybrid routing, allow_private_urls,
or backend.
Bug: commit 42c076d3 (#16136) added hybrid routing that flips
auto_local_this_nav=True for private URLs and short-circuits
_is_safe_url(). IMDS endpoints are technically private (169.254/16
link-local), so the sidecar happily routed them to a local Chromium,
and the agent could read IAM credentials via browser_snapshot. On
EC2/GCP/Azure this is a full SSRF-to-credential-theft.
Fix: new is_always_blocked_url() in url_safety.py — a narrow floor
that checks _BLOCKED_HOSTNAMES, _ALWAYS_BLOCKED_IPS,
_ALWAYS_BLOCKED_NETWORKS only. Applied as an independent gate in
browser_navigate's pre-nav and post-redirect checks, BEFORE
auto_local_this_nav gets a chance to short-circuit. Ordinary private
URLs (localhost, 192.168.x, 10.x, .local, CGNAT) still route to the
local sidecar as the #16136 feature intends.
Secondary fix (reporter's finding): _url_is_private() now explicitly
checks 172.16.0.0/12. ipaddress.is_private only covers that range on
Python ≥3.11 (bpo-40791), so on 3.10 runtimes those URLs were routed
to cloud instead of the local sidecar. No security impact — just a
correctness fix for the hybrid-routing feature.
Closes #16234.
This commit is contained in:
parent
12289c2630
commit
0214858ef5
4 changed files with 281 additions and 1 deletions
|
|
@ -106,6 +106,62 @@ class TestPreNavigationSsrf:
|
|||
|
||||
assert result["success"] is True
|
||||
|
||||
# -- Always-blocked floor: hybrid routing bypass regression (#16234) -------
|
||||
|
||||
# Hybrid-routing feature flips auto_local_this_nav=True for private URLs,
|
||||
# which previously short-circuited _is_safe_url() entirely. An agent
|
||||
# running on EC2/GCP/Azure could navigate to 169.254.169.254 via the
|
||||
# spawned local Chromium sidecar and read IAM credentials via
|
||||
# browser_snapshot. The always-blocked floor must fire regardless of
|
||||
# routing.
|
||||
IMDS_URLS = [
|
||||
"http://169.254.169.254/latest/meta-data/", # AWS / GCP / Azure / DO / Oracle
|
||||
"http://169.254.169.253/metadata/instance", # Azure IMDS wire server
|
||||
"http://169.254.170.2/v2/credentials", # AWS ECS task metadata
|
||||
"http://100.100.100.200/latest/meta-data/", # Alibaba Cloud
|
||||
"http://metadata.google.internal/computeMetadata/v1/", # GCP hostname
|
||||
]
|
||||
|
||||
@pytest.mark.parametrize("imds_url", IMDS_URLS)
|
||||
def test_cloud_blocks_imds_even_when_routing_to_local_sidecar(
|
||||
self, monkeypatch, _common_patches, imds_url
|
||||
):
|
||||
"""Hybrid routing must not let cloud metadata endpoints through."""
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||
# Simulate hybrid routing kicking in for this URL (what happens on
|
||||
# main pre-fix — cloud provider configured, _url_is_private → True,
|
||||
# so the session key routes to a local Chromium sidecar).
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True)
|
||||
# _is_safe_url would catch IMDS, but pre-fix it never ran. Force
|
||||
# it to return True here so the test is specifically pinning the
|
||||
# always-blocked floor as an independent gate.
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||
|
||||
result = json.loads(browser_tool.browser_navigate(imds_url))
|
||||
|
||||
assert result["success"] is False
|
||||
assert "cloud metadata endpoint" in result["error"]
|
||||
|
||||
def test_cloud_allows_ordinary_private_url_via_sidecar(
|
||||
self, monkeypatch, _common_patches
|
||||
):
|
||||
"""Hybrid routing still works for ordinary private URLs — floor
|
||||
must be narrow enough to not break the PR #16136 feature."""
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||
|
||||
for private in (
|
||||
"http://127.0.0.1:8080/dashboard",
|
||||
"http://192.168.1.1/admin",
|
||||
"http://10.0.0.5/",
|
||||
"http://myservice.local/",
|
||||
):
|
||||
result = json.loads(browser_tool.browser_navigate(private))
|
||||
assert result["success"] is True, f"Unexpected block for {private}: {result}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _is_local_backend() unit tests
|
||||
|
|
@ -236,6 +292,32 @@ class TestPostRedirectSsrf:
|
|||
assert result["success"] is True
|
||||
assert result["url"] == final
|
||||
|
||||
# -- Always-blocked floor: redirect to IMDS via hybrid sidecar (#16234) ----
|
||||
|
||||
def test_cloud_blocks_redirect_to_imds_even_via_sidecar(
|
||||
self, monkeypatch, _common_patches
|
||||
):
|
||||
"""Redirect to a cloud metadata endpoint is blocked regardless of
|
||||
routing — even the hybrid local sidecar path can't return IMDS
|
||||
content to the agent."""
|
||||
imds_final = "http://169.254.169.254/latest/meta-data/"
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True)
|
||||
# _is_safe_url would catch it on main; force True to pin the
|
||||
# always-blocked floor as an independent gate.
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||
monkeypatch.setattr(
|
||||
browser_tool,
|
||||
"_run_browser_command",
|
||||
lambda *a, **kw: _make_browser_result(url=imds_final),
|
||||
)
|
||||
|
||||
result = json.loads(browser_tool.browser_navigate(self.PUBLIC_URL))
|
||||
|
||||
assert result["success"] is False
|
||||
assert "cloud metadata endpoint" in result["error"]
|
||||
|
||||
|
||||
class TestAllowPrivateUrlsConfig:
|
||||
@pytest.fixture(autouse=True)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ from unittest.mock import patch
|
|||
|
||||
from tools.url_safety import (
|
||||
is_safe_url,
|
||||
is_always_blocked_url,
|
||||
_is_blocked_ip,
|
||||
_global_allow_private_urls,
|
||||
_reset_allow_private_cache,
|
||||
|
|
@ -407,3 +408,69 @@ class TestAllowPrivateUrlsIntegration:
|
|||
"""Empty URLs are still blocked."""
|
||||
monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true")
|
||||
assert is_safe_url("") is False
|
||||
|
||||
|
||||
class TestIsAlwaysBlockedUrl:
|
||||
"""The always-blocked floor — cloud metadata only, narrower than is_safe_url."""
|
||||
|
||||
# -- The sentinel set that must always block --------------------------------
|
||||
|
||||
@pytest.mark.parametrize("url", [
|
||||
"http://169.254.169.254/latest/meta-data/", # AWS / GCP / Azure / DO / Oracle
|
||||
"http://169.254.169.253/metadata/instance", # Azure IMDS wire server
|
||||
"http://169.254.170.2/v2/credentials", # AWS ECS task metadata
|
||||
"http://100.100.100.200/latest/meta-data/", # Alibaba Cloud
|
||||
"http://169.254.42.1/", # Any /16 link-local
|
||||
])
|
||||
def test_literal_imds_ips_always_blocked(self, url):
|
||||
"""Literal IMDS IPs and the /16 link-local range always block."""
|
||||
assert is_always_blocked_url(url) is True
|
||||
|
||||
def test_gcp_metadata_hostname_always_blocked_even_without_dns(self):
|
||||
"""metadata.google.internal blocks by hostname, no DNS needed."""
|
||||
with patch("socket.getaddrinfo", side_effect=socket.gaierror("nope")):
|
||||
assert is_always_blocked_url("http://metadata.google.internal/") is True
|
||||
|
||||
def test_hostname_resolving_to_imds_always_blocked(self):
|
||||
"""Attacker-controlled hostname resolving to IMDS still blocks."""
|
||||
with patch("socket.getaddrinfo", return_value=[
|
||||
(2, 1, 6, "", ("169.254.169.254", 0)),
|
||||
]):
|
||||
assert is_always_blocked_url("http://attacker-controlled.example.com/") is True
|
||||
|
||||
# -- Things the floor must NOT block ----------------------------------------
|
||||
|
||||
def test_public_url_not_blocked(self):
|
||||
assert is_always_blocked_url("https://example.com/path") is False
|
||||
|
||||
@pytest.mark.parametrize("url", [
|
||||
"http://127.0.0.1:8080/",
|
||||
"http://192.168.1.1/",
|
||||
"http://10.0.0.5/",
|
||||
"http://172.16.0.1/",
|
||||
"http://100.64.0.1/", # CGNAT — blocked by is_safe_url but not by the floor
|
||||
])
|
||||
def test_ordinary_private_urls_not_in_floor(self, url):
|
||||
"""Floor is narrower than is_safe_url — ordinary private URLs pass."""
|
||||
assert is_always_blocked_url(url) is False
|
||||
|
||||
def test_dns_failure_not_in_floor(self):
|
||||
"""DNS failure on a non-sentinel hostname = not always-blocked.
|
||||
|
||||
Caller's ordinary fail-closed path (is_safe_url) handles that case.
|
||||
"""
|
||||
with patch("socket.getaddrinfo", side_effect=socket.gaierror("fail")):
|
||||
assert is_always_blocked_url("http://nonexistent.example.com/") is False
|
||||
|
||||
def test_empty_url_not_in_floor(self):
|
||||
"""Empty URL falls through — caller decides what to do with a malformed URL."""
|
||||
assert is_always_blocked_url("") is False
|
||||
|
||||
def test_malformed_url_not_in_floor(self):
|
||||
"""Parse errors don't claim always-blocked status."""
|
||||
assert is_always_blocked_url("not a url at all") is False
|
||||
|
||||
def test_floor_ignores_allow_private_urls_toggle(self, monkeypatch):
|
||||
"""security.allow_private_urls can NOT unblock cloud metadata."""
|
||||
monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true")
|
||||
assert is_always_blocked_url("http://169.254.169.254/") is True
|
||||
|
|
|
|||
|
|
@ -76,9 +76,13 @@ except Exception:
|
|||
check_website_access = lambda url: None # noqa: E731 — fail-open if policy module unavailable
|
||||
|
||||
try:
|
||||
from tools.url_safety import is_safe_url as _is_safe_url
|
||||
from tools.url_safety import (
|
||||
is_safe_url as _is_safe_url,
|
||||
is_always_blocked_url as _is_always_blocked_url,
|
||||
)
|
||||
except Exception:
|
||||
_is_safe_url = lambda url: False # noqa: E731 — fail-closed: block all if safety module unavailable
|
||||
_is_always_blocked_url = lambda url: True # noqa: E731 — fail-closed on the floor too
|
||||
from tools.browser_providers.base import CloudBrowserProvider
|
||||
from tools.browser_providers.browserbase import BrowserbaseProvider
|
||||
from tools.browser_providers.browser_use import BrowserUseProvider
|
||||
|
|
@ -837,6 +841,10 @@ def _url_is_private(url: str) -> bool:
|
|||
ip.is_private
|
||||
or ip.is_loopback
|
||||
or ip.is_link_local
|
||||
# 172.16.0.0/12: only covered by ip.is_private on Python
|
||||
# ≥3.11 (bpo-40791). Explicit check keeps 3.10 runtimes
|
||||
# routing these to the local sidecar correctly.
|
||||
or ip in ipaddress.ip_network("172.16.0.0/12")
|
||||
or ip in ipaddress.ip_network("100.64.0.0/10")
|
||||
)
|
||||
except ValueError:
|
||||
|
|
@ -2081,6 +2089,18 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
|
|||
nav_session_key = _navigation_session_key(effective_task_id, url)
|
||||
auto_local_this_nav = _is_local_sidecar_key(nav_session_key)
|
||||
|
||||
# Always-blocked floor: cloud metadata / IMDS endpoints are denied
|
||||
# regardless of backend, hybrid routing, or allow_private_urls.
|
||||
# There's no legitimate agent use case for navigating to
|
||||
# 169.254.169.254 / metadata.google.internal / ECS task metadata
|
||||
# via a browser, and routing those to a local Chromium sidecar
|
||||
# on an EC2/GCP/Azure host exfiltrates IAM credentials (#16234).
|
||||
if not _is_local_backend() and _is_always_blocked_url(url):
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": "Blocked: URL targets a cloud metadata endpoint",
|
||||
})
|
||||
|
||||
if (
|
||||
not _is_local_backend()
|
||||
and not auto_local_this_nav
|
||||
|
|
@ -2143,6 +2163,21 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
|
|||
# Skipped for local backends (same rationale as the pre-nav check),
|
||||
# and for the hybrid local sidecar (we're already on a local browser
|
||||
# hitting a private URL by design).
|
||||
# Always-blocked floor (cloud metadata / IMDS) is enforced even
|
||||
# when auto_local_this_nav is true — see pre-nav check for
|
||||
# rationale (#16234).
|
||||
if (
|
||||
not _is_local_backend()
|
||||
and final_url
|
||||
and final_url != url
|
||||
and _is_always_blocked_url(final_url)
|
||||
):
|
||||
_run_browser_command(nav_session_key, "open", ["about:blank"], timeout=10)
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": "Blocked: redirect landed on a cloud metadata endpoint",
|
||||
})
|
||||
|
||||
if (
|
||||
not _is_local_backend()
|
||||
and not auto_local_this_nav
|
||||
|
|
|
|||
|
|
@ -147,6 +147,102 @@ def _is_blocked_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def is_always_blocked_url(url: str) -> bool:
|
||||
"""Return True when the URL targets an always-blocked endpoint.
|
||||
|
||||
This is the security floor — cloud metadata IPs / hostnames
|
||||
(169.254.169.254, metadata.google.internal, ECS task metadata, etc.)
|
||||
that have no legitimate agent use regardless of backend, routing, or
|
||||
the ``allow_private_urls`` toggle. Used by callers that bypass the
|
||||
full ``is_safe_url`` check for their own reasons (e.g. hybrid cloud
|
||||
browser routing to a local Chromium sidecar for private URLs) and
|
||||
still need to enforce the non-negotiable floor before letting the
|
||||
request proceed.
|
||||
|
||||
Returns True (= blocked) on:
|
||||
- Hostnames in ``_BLOCKED_HOSTNAMES``
|
||||
- IPs / networks in ``_ALWAYS_BLOCKED_IPS`` / ``_ALWAYS_BLOCKED_NETWORKS``
|
||||
- URLs whose hostname resolves to any of the above
|
||||
|
||||
Returns False (= not in the always-blocked floor) on:
|
||||
- Benign public / private / loopback URLs (whether or not they'd
|
||||
be blocked by the ordinary SSRF check)
|
||||
- DNS-resolution failures for non-sentinel hostnames (these are
|
||||
someone else's problem — the caller's ordinary fail-closed path
|
||||
will catch them if applicable)
|
||||
- Parse errors (caller decides fail-open vs fail-closed)
|
||||
|
||||
Intentionally narrower than ``is_safe_url``: only blocks the sentinel
|
||||
set, not ordinary private addresses. Callers that want the full
|
||||
SSRF check should still use ``is_safe_url``.
|
||||
"""
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
hostname = (parsed.hostname or "").strip().lower().rstrip(".")
|
||||
if not hostname:
|
||||
return False
|
||||
|
||||
# Blocked-hostname check fires regardless of DNS resolution
|
||||
if hostname in _BLOCKED_HOSTNAMES:
|
||||
logger.warning(
|
||||
"Blocked request to internal hostname (always-blocked floor): %s",
|
||||
hostname,
|
||||
)
|
||||
return True
|
||||
|
||||
# Literal IP → check directly against the always-blocked set
|
||||
try:
|
||||
ip = ipaddress.ip_address(hostname)
|
||||
except ValueError:
|
||||
ip = None
|
||||
|
||||
if ip is not None:
|
||||
if ip in _ALWAYS_BLOCKED_IPS or any(
|
||||
ip in net for net in _ALWAYS_BLOCKED_NETWORKS
|
||||
):
|
||||
logger.warning(
|
||||
"Blocked request to cloud metadata address "
|
||||
"(always-blocked floor): %s",
|
||||
hostname,
|
||||
)
|
||||
return True
|
||||
return False
|
||||
|
||||
# Hostname → resolve and check every answer. DNS failure is NOT
|
||||
# always-blocked (caller's ordinary path handles that).
|
||||
try:
|
||||
addr_info = socket.getaddrinfo(
|
||||
hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
|
||||
)
|
||||
except socket.gaierror:
|
||||
return False
|
||||
|
||||
for _family, _, _, _, sockaddr in addr_info:
|
||||
ip_str = sockaddr[0]
|
||||
try:
|
||||
resolved = ipaddress.ip_address(ip_str)
|
||||
except ValueError:
|
||||
continue
|
||||
if resolved in _ALWAYS_BLOCKED_IPS or any(
|
||||
resolved in net for net in _ALWAYS_BLOCKED_NETWORKS
|
||||
):
|
||||
logger.warning(
|
||||
"Blocked request to cloud metadata address "
|
||||
"(always-blocked floor): %s -> %s",
|
||||
hostname,
|
||||
ip_str,
|
||||
)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
except Exception as exc:
|
||||
# Parse failures or unexpected errors — don't claim the URL is
|
||||
# always-blocked. Caller decides what to do with a malformed URL.
|
||||
logger.debug("is_always_blocked_url error for %s: %s", url, exc)
|
||||
return False
|
||||
|
||||
|
||||
def _allows_private_ip_resolution(hostname: str, scheme: str) -> bool:
|
||||
"""Return True when a trusted HTTPS hostname may bypass IP-class blocking."""
|
||||
return scheme == "https" and hostname in _TRUSTED_PRIVATE_IP_HOSTS
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue