diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 81275a7f9..ce77af7ca 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -840,6 +840,7 @@ DEFAULT_CONFIG = { # Pre-exec security scanning via tirith "security": { + "allow_private_urls": False, # Allow requests to private/internal IPs (for OpenWrt, proxies, VPNs) "redact_secrets": True, "tirith_enabled": True, "tirith_path": "tirith", diff --git a/tests/tools/test_url_safety.py b/tests/tools/test_url_safety.py index 4382d8ab3..9377fc40e 100644 --- a/tests/tools/test_url_safety.py +++ b/tests/tools/test_url_safety.py @@ -3,7 +3,12 @@ import socket from unittest.mock import patch -from tools.url_safety import is_safe_url, _is_blocked_ip +from tools.url_safety import ( + is_safe_url, + _is_blocked_ip, + _global_allow_private_urls, + _reset_allow_private_cache, +) import ipaddress import pytest @@ -202,3 +207,189 @@ class TestIsBlockedIp: def test_allowed_ips(self, ip_str): ip = ipaddress.ip_address(ip_str) assert _is_blocked_ip(ip) is False, f"{ip_str} should be allowed" + + +class TestGlobalAllowPrivateUrls: + """Tests for the security.allow_private_urls config toggle.""" + + @pytest.fixture(autouse=True) + def _reset_cache(self): + """Reset the module-level toggle cache before and after each test.""" + _reset_allow_private_cache() + yield + _reset_allow_private_cache() + + def test_default_is_false(self, monkeypatch): + """Toggle defaults to False when no env var or config is set.""" + monkeypatch.delenv("HERMES_ALLOW_PRIVATE_URLS", raising=False) + with patch("hermes_cli.config.read_raw_config", side_effect=Exception("no config")): + assert _global_allow_private_urls() is False + + def test_env_var_true(self, monkeypatch): + """HERMES_ALLOW_PRIVATE_URLS=true enables the toggle.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + assert _global_allow_private_urls() is True + + def test_env_var_1(self, monkeypatch): + """HERMES_ALLOW_PRIVATE_URLS=1 enables the toggle.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "1") + assert _global_allow_private_urls() is True + + def test_env_var_yes(self, monkeypatch): + """HERMES_ALLOW_PRIVATE_URLS=yes enables the toggle.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "yes") + assert _global_allow_private_urls() is True + + def test_env_var_false(self, monkeypatch): + """HERMES_ALLOW_PRIVATE_URLS=false keeps it disabled.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "false") + assert _global_allow_private_urls() is False + + def test_config_security_section(self, monkeypatch): + """security.allow_private_urls in config enables the toggle.""" + monkeypatch.delenv("HERMES_ALLOW_PRIVATE_URLS", raising=False) + cfg = {"security": {"allow_private_urls": True}} + with patch("hermes_cli.config.read_raw_config", return_value=cfg): + assert _global_allow_private_urls() is True + + def test_config_browser_fallback(self, monkeypatch): + """browser.allow_private_urls works as legacy fallback.""" + monkeypatch.delenv("HERMES_ALLOW_PRIVATE_URLS", raising=False) + cfg = {"browser": {"allow_private_urls": True}} + with patch("hermes_cli.config.read_raw_config", return_value=cfg): + assert _global_allow_private_urls() is True + + def test_config_security_takes_precedence_over_browser(self, monkeypatch): + """security section is checked before browser section.""" + monkeypatch.delenv("HERMES_ALLOW_PRIVATE_URLS", raising=False) + cfg = {"security": {"allow_private_urls": True}, "browser": {"allow_private_urls": False}} + with patch("hermes_cli.config.read_raw_config", return_value=cfg): + assert _global_allow_private_urls() is True + + def test_env_var_overrides_config(self, monkeypatch): + """Env var takes priority over config.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "false") + cfg = {"security": {"allow_private_urls": True}} + with patch("hermes_cli.config.read_raw_config", return_value=cfg): + assert _global_allow_private_urls() is False + + def test_result_is_cached(self, monkeypatch): + """Second call uses cached result, doesn't re-read config.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + assert _global_allow_private_urls() is True + # Change env after first call — should still be True (cached) + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "false") + assert _global_allow_private_urls() is True + + +class TestAllowPrivateUrlsIntegration: + """Integration tests: is_safe_url respects the global toggle.""" + + @pytest.fixture(autouse=True) + def _reset_cache(self): + _reset_allow_private_cache() + yield + _reset_allow_private_cache() + + def test_private_ip_allowed_when_toggle_on(self, monkeypatch): + """Private IPs pass is_safe_url when toggle is enabled.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("192.168.1.1", 0)), + ]): + assert is_safe_url("http://router.local") is True + + def test_benchmark_ip_allowed_when_toggle_on(self, monkeypatch): + """198.18.x.x (benchmark/OpenWrt proxy range) passes when toggle is on.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("198.18.23.183", 0)), + ]): + assert is_safe_url("https://nousresearch.com") is True + + def test_cgnat_allowed_when_toggle_on(self, monkeypatch): + """CGNAT range (100.64.0.0/10) passes when toggle is on.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("100.100.100.100", 0)), + ]): + assert is_safe_url("http://tailscale-peer.example/") is True + + def test_localhost_allowed_when_toggle_on(self, monkeypatch): + """Even localhost passes when toggle is on.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("127.0.0.1", 0)), + ]): + assert is_safe_url("http://localhost:8080/api") is True + + # --- Cloud metadata always blocked regardless of toggle --- + + def test_metadata_hostname_blocked_even_with_toggle(self, monkeypatch): + """metadata.google.internal is ALWAYS blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + assert is_safe_url("http://metadata.google.internal/computeMetadata/v1/") is False + + def test_metadata_goog_blocked_even_with_toggle(self, monkeypatch): + """metadata.goog is ALWAYS blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + assert is_safe_url("http://metadata.goog/computeMetadata/v1/") is False + + def test_metadata_ip_blocked_even_with_toggle(self, monkeypatch): + """169.254.169.254 (AWS/GCP metadata IP) is ALWAYS blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("169.254.169.254", 0)), + ]): + assert is_safe_url("http://169.254.169.254/latest/meta-data/") is False + + def test_metadata_ipv6_blocked_even_with_toggle(self, monkeypatch): + """fd00:ec2::254 (AWS IPv6 metadata) is ALWAYS blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (10, 1, 6, "", ("fd00:ec2::254", 0, 0, 0)), + ]): + assert is_safe_url("http://[fd00:ec2::254]/latest/") is False + + def test_ecs_metadata_blocked_even_with_toggle(self, monkeypatch): + """169.254.170.2 (AWS ECS task metadata) is ALWAYS blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("169.254.170.2", 0)), + ]): + assert is_safe_url("http://169.254.170.2/v2/credentials") is False + + def test_alibaba_metadata_blocked_even_with_toggle(self, monkeypatch): + """100.100.100.200 (Alibaba Cloud metadata) is ALWAYS blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("100.100.100.200", 0)), + ]): + assert is_safe_url("http://100.100.100.200/latest/meta-data/") is False + + def test_azure_wire_server_blocked_even_with_toggle(self, monkeypatch): + """169.254.169.253 (Azure IMDS wire server) is ALWAYS blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("169.254.169.253", 0)), + ]): + assert is_safe_url("http://169.254.169.253/") is False + + def test_entire_link_local_blocked_even_with_toggle(self, monkeypatch): + """Any 169.254.x.x address is ALWAYS blocked (entire link-local range).""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", return_value=[ + (2, 1, 6, "", ("169.254.42.99", 0)), + ]): + assert is_safe_url("http://169.254.42.99/anything") is False + + def test_dns_failure_still_blocked_with_toggle(self, monkeypatch): + """DNS failures are still blocked even with toggle on.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + with patch("socket.getaddrinfo", side_effect=socket.gaierror("fail")): + assert is_safe_url("https://nonexistent.example.com") is False + + def test_empty_url_still_blocked_with_toggle(self, monkeypatch): + """Empty URLs are still blocked.""" + monkeypatch.setenv("HERMES_ALLOW_PRIVATE_URLS", "true") + assert is_safe_url("") is False diff --git a/tools/url_safety.py b/tools/url_safety.py index c961f722c..7ff09ebb5 100644 --- a/tools/url_safety.py +++ b/tools/url_safety.py @@ -5,6 +5,13 @@ skill could trick the agent into fetching internal resources like cloud metadata endpoints (169.254.169.254), localhost services, or private network hosts. +The check can be globally disabled via ``security.allow_private_urls: true`` +in config.yaml for environments where DNS resolves external domains to +private/benchmark-range IPs (OpenWrt routers, corporate proxies, VPNs +that use 198.18.0.0/15 or 100.64.0.0/10). Even when disabled, cloud +metadata hostnames (metadata.google.internal, 169.254.169.254) are +**always** blocked — those are never legitimate agent targets. + Limitations (documented, not fixable at pre-flight level): - DNS rebinding (TOCTOU): an attacker-controlled DNS server with TTL=0 can return a public IP for the check, then a private IP for the actual @@ -18,17 +25,35 @@ Limitations (documented, not fixable at pre-flight level): import ipaddress import logging +import os import socket from urllib.parse import urlparse logger = logging.getLogger(__name__) # Hostnames that should always be blocked regardless of IP resolution +# or any config toggle. These are cloud metadata endpoints that an +# attacker could use to steal instance credentials. _BLOCKED_HOSTNAMES = frozenset({ "metadata.google.internal", "metadata.goog", }) +# IPs and networks that should always be blocked regardless of the +# allow_private_urls toggle. These are cloud metadata / credential +# endpoints — the #1 SSRF target — and the link-local range where +# they all live. +_ALWAYS_BLOCKED_IPS = frozenset({ + ipaddress.ip_address("169.254.169.254"), # AWS/GCP/Azure/DO/Oracle metadata + ipaddress.ip_address("169.254.170.2"), # AWS ECS task metadata (task IAM creds) + ipaddress.ip_address("169.254.169.253"), # Azure IMDS wire server + ipaddress.ip_address("fd00:ec2::254"), # AWS metadata (IPv6) + ipaddress.ip_address("100.100.100.200"), # Alibaba Cloud metadata +}) +_ALWAYS_BLOCKED_NETWORKS = ( + ipaddress.ip_network("169.254.0.0/16"), # Entire link-local range (no legit agent target) +) + # Exact HTTPS hostnames allowed to resolve to private/benchmark-space IPs. # This is intentionally narrow: QQ media downloads can legitimately resolve # to 198.18.0.0/15 behind local proxy/benchmark infrastructure. @@ -42,6 +67,67 @@ _TRUSTED_PRIVATE_IP_HOSTS = frozenset({ # VPNs, and some cloud internal networks. _CGNAT_NETWORK = ipaddress.ip_network("100.64.0.0/10") +# --------------------------------------------------------------------------- +# Global toggle: allow private/internal IP resolution +# --------------------------------------------------------------------------- +# Cached after first read so we don't hit the filesystem on every URL check. +_allow_private_resolved = False +_cached_allow_private: bool = False + + +def _global_allow_private_urls() -> bool: + """Return True when the user has opted out of private-IP blocking. + + Checks (in priority order): + 1. ``HERMES_ALLOW_PRIVATE_URLS`` env var (``true``/``1``/``yes``) + 2. ``security.allow_private_urls`` in config.yaml + 3. ``browser.allow_private_urls`` in config.yaml (legacy / backward compat) + + Result is cached for the process lifetime. + """ + global _allow_private_resolved, _cached_allow_private + if _allow_private_resolved: + return _cached_allow_private + + _allow_private_resolved = True + _cached_allow_private = False # safe default + + # 1. Env var override (highest priority) + env_val = os.getenv("HERMES_ALLOW_PRIVATE_URLS", "").strip().lower() + if env_val in ("true", "1", "yes"): + _cached_allow_private = True + return _cached_allow_private + if env_val in ("false", "0", "no"): + # Explicit false — don't fall through to config + return _cached_allow_private + + # 2. Config file + try: + from hermes_cli.config import read_raw_config + cfg = read_raw_config() + # security.allow_private_urls (preferred) + sec = cfg.get("security", {}) + if isinstance(sec, dict) and sec.get("allow_private_urls"): + _cached_allow_private = True + return _cached_allow_private + # browser.allow_private_urls (legacy fallback) + browser = cfg.get("browser", {}) + if isinstance(browser, dict) and browser.get("allow_private_urls"): + _cached_allow_private = True + return _cached_allow_private + except Exception: + # Config unavailable (e.g. tests, early import) — keep default + pass + + return _cached_allow_private + + +def _reset_allow_private_cache() -> None: + """Reset the cached toggle — only for tests.""" + global _allow_private_resolved, _cached_allow_private + _allow_private_resolved = False + _cached_allow_private = False + def _is_blocked_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool: """Return True if the IP should be blocked for SSRF protection.""" @@ -65,6 +151,11 @@ def is_safe_url(url: str) -> bool: Resolves the hostname to an IP and checks against private ranges. Fails closed: DNS errors and unexpected exceptions block the request. + + When ``security.allow_private_urls`` is enabled (or the env var + ``HERMES_ALLOW_PRIVATE_URLS=true``), private-IP blocking is skipped. + Cloud metadata endpoints (169.254.169.254, metadata.google.internal) + remain blocked regardless — they are never legitimate agent targets. """ try: parsed = urlparse(url) @@ -73,11 +164,14 @@ def is_safe_url(url: str) -> bool: if not hostname: return False - # Block known internal hostnames + # Block known internal hostnames — ALWAYS, even with toggle on if hostname in _BLOCKED_HOSTNAMES: logger.warning("Blocked request to internal hostname: %s", hostname) return False + # Check the global toggle AFTER blocking metadata hostnames + allow_all_private = _global_allow_private_urls() + allow_private_ip = _allows_private_ip_resolution(hostname, scheme) # Try to resolve and check IP @@ -96,14 +190,27 @@ def is_safe_url(url: str) -> bool: except ValueError: continue - if not allow_private_ip and _is_blocked_ip(ip): + # Always block cloud metadata IPs and link-local, even with toggle on + if ip in _ALWAYS_BLOCKED_IPS or any(ip in net for net in _ALWAYS_BLOCKED_NETWORKS): + logger.warning( + "Blocked request to cloud metadata address: %s -> %s", + hostname, ip_str, + ) + return False + + if not allow_all_private and not allow_private_ip and _is_blocked_ip(ip): logger.warning( "Blocked request to private/internal address: %s -> %s", hostname, ip_str, ) return False - if allow_private_ip: + if allow_all_private: + logger.debug( + "Allowing private/internal resolution (security.allow_private_urls=true): %s", + hostname, + ) + elif allow_private_ip: logger.debug( "Allowing trusted hostname despite private/internal resolution: %s", hostname,