diff --git a/tests/tools/test_browser_eval_ssrf.py b/tests/tools/test_browser_eval_ssrf.py new file mode 100644 index 00000000000..654e61b98aa --- /dev/null +++ b/tests/tools/test_browser_eval_ssrf.py @@ -0,0 +1,229 @@ +"""Tests that browser_console(expression=...) cannot bypass the SSRF guard. + +browser_snapshot / browser_vision re-check the page URL before returning +content, but ``_browser_eval`` returns arbitrary JS results directly. Two +sub-paths could read private content without ever touching snapshot/vision: + + 1. Direct fetch: ``fetch('http://127.0.0.1/secret').then(r => r.text())`` + — the page URL stays public, so the post-eval recheck can't see it. + Closed by a pre-scan of the expression for private-host URL literals. + 2. Navigate-then-read: ``location.href = 'http://127.0.0.1/'`` then a later + eval reads ``document.body.innerText`` — closed by re-checking the page + URL after the eval runs. + +This is the sibling fix for the eval return-value path of issue #44731. +""" + +import json + +import pytest + +from tools import browser_tool + + +PRIVATE_URL = "http://127.0.0.1:8080/secret" +PUBLIC_URL = "https://example.com/page" +METADATA_URL = "http://169.254.169.254/latest/meta-data/" + + +@pytest.fixture(autouse=True) +def _no_camofox(monkeypatch): + monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False) + # No supervisor — force the subprocess fallback path by default. + monkeypatch.setattr(browser_tool, "_last_session_key", lambda key: key) + + +def _eval(expression, task_id="test"): + return json.loads(browser_tool._browser_eval(expression, task_id=task_id)) + + +# --------------------------------------------------------------------------- +# Sub-path 1: direct private-host fetch literal in the expression (pre-scan) +# --------------------------------------------------------------------------- + + +class TestExpressionPreScan: + def _guard_on(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False) + + def test_blocks_private_fetch_literal(self, monkeypatch): + self._guard_on(monkeypatch) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + + called = {"n": 0} + + def _run(task_id, command, args=None, **kwargs): + called["n"] += 1 + return {"success": True, "data": {"result": "leaked-content"}} + + monkeypatch.setattr(browser_tool, "_run_browser_command", _run) + + result = _eval(f"fetch('{PRIVATE_URL}').then(r => r.text())") + assert result["success"] is False + assert "private or internal address" in result["error"] + assert PRIVATE_URL in result["error"] + # Expression never executed — blocked before any browser command. + assert called["n"] == 0 + + def test_blocks_metadata_fetch_literal(self, monkeypatch): + self._guard_on(monkeypatch) + # Public-safe to is_safe_url, but the always-blocked floor catches IMDS. + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True) + monkeypatch.setattr( + browser_tool, "_is_always_blocked_url", + lambda url: "169.254.169.254" in url, + ) + monkeypatch.setattr( + browser_tool, "_run_browser_command", + lambda *a, **k: {"success": True, "data": {"result": "creds"}}, + ) + + result = _eval(f"fetch('{METADATA_URL}')") + assert result["success"] is False + assert "private or internal address" in result["error"] + + def test_allows_public_fetch_literal(self, monkeypatch): + self._guard_on(monkeypatch) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + # After the (public) eval, the page-URL recheck must also see a public URL. + monkeypatch.setattr( + browser_tool, "_run_browser_command", + lambda task_id, command, args=None, **k: ( + {"success": True, "data": {"result": PUBLIC_URL}} + if args == ["window.location.href"] + else {"success": True, "data": {"result": "ok"}} + ), + ) + + result = _eval(f"fetch('{PUBLIC_URL}').then(r => r.text())") + assert result["success"] is True + assert result["result"] == "ok" + + def test_skips_prescan_for_local_backend(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: True) + monkeypatch.setattr( + browser_tool, "_run_browser_command", + lambda *a, **k: {"success": True, "data": {"result": "local-ok"}}, + ) + result = _eval(f"fetch('{PRIVATE_URL}')") + assert result["success"] is True + assert result["result"] == "local-ok" + + def test_skips_prescan_for_local_sidecar(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False) + monkeypatch.setattr( + browser_tool, "_run_browser_command", + lambda *a, **k: {"success": True, "data": {"result": "sidecar-ok"}}, + ) + result = _eval(f"fetch('{PRIVATE_URL}')") + assert result["success"] is True + + def test_skips_prescan_when_allow_private(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: True) + monkeypatch.setattr( + browser_tool, "_run_browser_command", + lambda *a, **k: {"success": True, "data": {"result": "allowed"}}, + ) + result = _eval(f"fetch('{PRIVATE_URL}')") + assert result["success"] is True + + +# --------------------------------------------------------------------------- +# Sub-path 2: navigate-then-read (post-eval page-URL recheck) +# --------------------------------------------------------------------------- + + +class TestPostEvalPageRecheck: + def _guard_on(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False) + monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False) + monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False) + + def test_blocks_when_page_navigated_private(self, monkeypatch): + self._guard_on(monkeypatch) + # Expression itself has no URL literal (reads the DOM), so the pre-scan + # passes; the danger is that the page was navigated to a private URL by + # an earlier eval. The recheck must catch it. + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + monkeypatch.setattr( + browser_tool, "_run_browser_command", + lambda task_id, command, args=None, **k: ( + {"success": True, "data": {"result": PRIVATE_URL}} + if args == ["window.location.href"] + else {"success": True, "data": {"result": "secret DOM text"}} + ), + ) + + result = _eval("document.body.innerText") + assert result["success"] is False + assert "private or internal address" in result["error"] + assert PRIVATE_URL in result["error"] + + def test_allows_when_page_public(self, monkeypatch): + self._guard_on(monkeypatch) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + monkeypatch.setattr( + browser_tool, "_run_browser_command", + lambda task_id, command, args=None, **k: ( + {"success": True, "data": {"result": PUBLIC_URL}} + if args == ["window.location.href"] + else {"success": True, "data": {"result": "public DOM text"}} + ), + ) + + result = _eval("document.body.innerText") + assert result["success"] is True + assert result["result"] == "public DOM text" + + def test_fail_open_when_url_probe_fails(self, monkeypatch): + """If the window.location.href probe errors, don't block (fail-open).""" + self._guard_on(monkeypatch) + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + + def _run(task_id, command, args=None, **k): + if args == ["window.location.href"]: + return {"success": False, "error": "CDP probe failed"} + return {"success": True, "data": {"result": "dom text"}} + + monkeypatch.setattr(browser_tool, "_run_browser_command", _run) + + result = _eval("document.body.innerText") + assert result["success"] is True + assert result["result"] == "dom text" + + +# --------------------------------------------------------------------------- +# Helper-level unit tests +# --------------------------------------------------------------------------- + + +class TestExpressionScanHelper: + def test_returns_first_private_literal(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: "127.0.0.1" not in url) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + out = browser_tool._expression_targets_private_url( + "fetch('https://example.com'); fetch('http://127.0.0.1/x')" + ) + assert out == "http://127.0.0.1/x" + + def test_none_when_no_url(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + assert browser_tool._expression_targets_private_url("document.title") is None + + def test_strips_trailing_punctuation(self, monkeypatch): + monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False) + monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False) + out = browser_tool._expression_targets_private_url("location.href='http://10.0.0.1/';") + assert out == "http://10.0.0.1/" diff --git a/tools/browser_tool.py b/tools/browser_tool.py index ec38a0a1eaa..018fc008c8d 100644 --- a/tools/browser_tool.py +++ b/tools/browser_tool.py @@ -3031,6 +3031,74 @@ def browser_console(clear: bool = False, expression: Optional[str] = None, task_ return json.dumps(response, ensure_ascii=False) +def _eval_ssrf_guard_active(effective_task_id: str) -> bool: + """Return True when eval-driven private-network access must be guarded. + + Matches the gating used by ``browser_navigate`` / ``browser_snapshot`` / + ``browser_vision``: the SSRF guard is only meaningful for non-local + backends (cloud browser, or a containerized terminal whose browser-on-host + can reach internal networks the terminal can't), and is skipped for local + sidecar sessions and when ``allow_private_urls`` is set. + """ + return ( + not _is_local_backend() + and not _is_local_sidecar_key(effective_task_id) + and not _allow_private_urls() + ) + + +# URL-shaped literals embedded in a JS expression (http/https only). Used to +# pre-screen ``browser_console(expression=...)`` calls that fetch/XHR/navigate +# to a private host directly — that path never updates ``location.href`` so the +# post-eval page-URL recheck below can't see it. +_JS_URL_LITERAL_RE = re.compile(r"""https?://[^\s'"`)\]<>]+""", re.IGNORECASE) + + +def _expression_targets_private_url(expression: str) -> Optional[str]: + """Return the first private/always-blocked URL literal in a JS expression. + + Best-effort: scans for ``http(s)://...`` literals (fetch/XHR/navigation + targets the agent may have embedded) and returns the first one that targets + a private/internal address or the always-blocked cloud-metadata floor. + Returns ``None`` when no such literal is found. + """ + if not isinstance(expression, str): + return None + for match in _JS_URL_LITERAL_RE.findall(expression): + candidate = match.rstrip(".,;") + if _is_always_blocked_url(candidate) or not _is_safe_url(candidate): + return candidate + return None + + +def _current_page_private_url(effective_task_id: str) -> Optional[str]: + """Return the current page URL when it targets a private/internal address. + + Reads ``window.location.href`` via a low-cost eval and returns it when the + page has been navigated (e.g. via ``location.href = '...'`` in a prior + eval) to an address the SSRF guard would reject. Returns ``None`` when the + page is public, the URL can't be determined, or the check errors (fail-open + on probe failure, matching the snapshot/vision guards). + """ + try: + url_result = _run_browser_command( + effective_task_id, "eval", ["window.location.href"], + timeout=5, _engine_override="auto", + ) + if url_result.get("success"): + current_url = ( + url_result.get("data", {}).get("result", "") + .strip().strip('"').strip("'") + ) + if current_url and ( + _is_always_blocked_url(current_url) or not _is_safe_url(current_url) + ): + return current_url + except Exception as exc: + logger.debug("_current_page_private_url: probe failed (%s)", exc) + return None + + def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: """Evaluate a JavaScript expression in the page context and return the result.""" if _is_camofox_mode(): @@ -3038,6 +3106,27 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: effective_task_id = _last_session_key(task_id or "default") + # ── Private-network guard (eval return-value path) ────────────────────── + # browser_snapshot / browser_vision re-check the page URL before returning + # content, but eval returns arbitrary JS results directly — an attacker can + # read a private page via `fetch('http://127.0.0.1/secret')` or by reading + # the DOM after `location.href = 'http://127.0.0.1/'`, never touching + # snapshot/vision. Close both sub-paths on the same gating condition: + # 1. Pre-scan the expression for private-host URL literals (direct fetch). + # 2. After eval, re-check the page URL (navigate-then-read). + if _eval_ssrf_guard_active(effective_task_id): + blocked_literal = _expression_targets_private_url(expression) + if blocked_literal: + return json.dumps({ + "success": False, + "error": ( + "Blocked: JavaScript expression targets a private or " + f"internal address ({blocked_literal}). Reading internal " + "endpoints via browser_console is not permitted in this " + "browser mode." + ), + }, ensure_ascii=False) + # --- Fast path: route through the supervisor's persistent CDP WS --------- # When a CDPSupervisor is alive for this task_id, ``Runtime.evaluate`` runs # on the already-connected WebSocket — zero subprocess startup cost vs @@ -3059,6 +3148,20 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: parsed = json.loads(raw_result) except (json.JSONDecodeError, ValueError): pass # keep as string + # Post-eval page-URL recheck: if this (or a prior) eval + # navigated the page to a private address, withhold the result. + if _eval_ssrf_guard_active(effective_task_id): + _blocked_url = _current_page_private_url(effective_task_id) + if _blocked_url: + return json.dumps({ + "success": False, + "error": ( + "Blocked: page URL targets a private or internal " + f"address ({_blocked_url}). This may have been " + "caused by a JavaScript navigation via " + "browser_console." + ), + }, ensure_ascii=False) response = { "success": True, "result": parsed, @@ -3134,6 +3237,19 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str: "result": parsed, "result_type": type(parsed).__name__, } + # Post-eval page-URL recheck: if this (or a prior) eval navigated the page + # to a private address, withhold the result (mirrors the supervisor path). + if _eval_ssrf_guard_active(effective_task_id): + _blocked_url = _current_page_private_url(effective_task_id) + if _blocked_url: + return json.dumps({ + "success": False, + "error": ( + "Blocked: page URL targets a private or internal address " + f"({_blocked_url}). This may have been caused by a " + "JavaScript navigation via browser_console." + ), + }, ensure_ascii=False) return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False, default=str)