fix(browser): close eval return-value SSRF bypass (sibling of #44731)

The snapshot/vision guards re-check the page URL before returning content,
but browser_console(expression=...) -> _browser_eval returns arbitrary JS
results directly, leaving two same-class bypasses open:

  1. Direct fetch: fetch('http://127.0.0.1/secret').then(r=>r.text()) reads
     a private endpoint and returns the body — the page URL stays public so
     the post-eval recheck never sees it.
  2. Navigate-then-read: location.href='http://127.0.0.1/' then a later eval
     reads document.body.innerText.

Guard _browser_eval on the same condition as navigate/snapshot/vision
(not local backend, not local sidecar, not allow_private_urls):
  - pre-scan the expression for private/always-blocked URL literals
  - re-check window.location.href after the eval at both success-return
    sites (supervisor fast-path + subprocess fallback)

Probe failures fail-open (matching the snapshot/vision guards).
This commit is contained in:
teknium1 2026-06-28 02:23:37 -07:00 committed by Teknium
parent 0ae6196087
commit 7ef04ae7a7
2 changed files with 345 additions and 0 deletions

View file

@ -0,0 +1,229 @@
"""Tests that browser_console(expression=...) cannot bypass the SSRF guard.
browser_snapshot / browser_vision re-check the page URL before returning
content, but ``_browser_eval`` returns arbitrary JS results directly. Two
sub-paths could read private content without ever touching snapshot/vision:
1. Direct fetch: ``fetch('http://127.0.0.1/secret').then(r => r.text())``
the page URL stays public, so the post-eval recheck can't see it.
Closed by a pre-scan of the expression for private-host URL literals.
2. Navigate-then-read: ``location.href = 'http://127.0.0.1/'`` then a later
eval reads ``document.body.innerText`` closed by re-checking the page
URL after the eval runs.
This is the sibling fix for the eval return-value path of issue #44731.
"""
import json
import pytest
from tools import browser_tool
PRIVATE_URL = "http://127.0.0.1:8080/secret"
PUBLIC_URL = "https://example.com/page"
METADATA_URL = "http://169.254.169.254/latest/meta-data/"
@pytest.fixture(autouse=True)
def _no_camofox(monkeypatch):
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False)
# No supervisor — force the subprocess fallback path by default.
monkeypatch.setattr(browser_tool, "_last_session_key", lambda key: key)
def _eval(expression, task_id="test"):
return json.loads(browser_tool._browser_eval(expression, task_id=task_id))
# ---------------------------------------------------------------------------
# Sub-path 1: direct private-host fetch literal in the expression (pre-scan)
# ---------------------------------------------------------------------------
class TestExpressionPreScan:
def _guard_on(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
def test_blocks_private_fetch_literal(self, monkeypatch):
self._guard_on(monkeypatch)
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
called = {"n": 0}
def _run(task_id, command, args=None, **kwargs):
called["n"] += 1
return {"success": True, "data": {"result": "leaked-content"}}
monkeypatch.setattr(browser_tool, "_run_browser_command", _run)
result = _eval(f"fetch('{PRIVATE_URL}').then(r => r.text())")
assert result["success"] is False
assert "private or internal address" in result["error"]
assert PRIVATE_URL in result["error"]
# Expression never executed — blocked before any browser command.
assert called["n"] == 0
def test_blocks_metadata_fetch_literal(self, monkeypatch):
self._guard_on(monkeypatch)
# Public-safe to is_safe_url, but the always-blocked floor catches IMDS.
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
monkeypatch.setattr(
browser_tool, "_is_always_blocked_url",
lambda url: "169.254.169.254" in url,
)
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda *a, **k: {"success": True, "data": {"result": "creds"}},
)
result = _eval(f"fetch('{METADATA_URL}')")
assert result["success"] is False
assert "private or internal address" in result["error"]
def test_allows_public_fetch_literal(self, monkeypatch):
self._guard_on(monkeypatch)
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
# After the (public) eval, the page-URL recheck must also see a public URL.
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda task_id, command, args=None, **k: (
{"success": True, "data": {"result": PUBLIC_URL}}
if args == ["window.location.href"]
else {"success": True, "data": {"result": "ok"}}
),
)
result = _eval(f"fetch('{PUBLIC_URL}').then(r => r.text())")
assert result["success"] is True
assert result["result"] == "ok"
def test_skips_prescan_for_local_backend(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: True)
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda *a, **k: {"success": True, "data": {"result": "local-ok"}},
)
result = _eval(f"fetch('{PRIVATE_URL}')")
assert result["success"] is True
assert result["result"] == "local-ok"
def test_skips_prescan_for_local_sidecar(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True)
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda *a, **k: {"success": True, "data": {"result": "sidecar-ok"}},
)
result = _eval(f"fetch('{PRIVATE_URL}')")
assert result["success"] is True
def test_skips_prescan_when_allow_private(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: True)
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda *a, **k: {"success": True, "data": {"result": "allowed"}},
)
result = _eval(f"fetch('{PRIVATE_URL}')")
assert result["success"] is True
# ---------------------------------------------------------------------------
# Sub-path 2: navigate-then-read (post-eval page-URL recheck)
# ---------------------------------------------------------------------------
class TestPostEvalPageRecheck:
def _guard_on(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
def test_blocks_when_page_navigated_private(self, monkeypatch):
self._guard_on(monkeypatch)
# Expression itself has no URL literal (reads the DOM), so the pre-scan
# passes; the danger is that the page was navigated to a private URL by
# an earlier eval. The recheck must catch it.
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda task_id, command, args=None, **k: (
{"success": True, "data": {"result": PRIVATE_URL}}
if args == ["window.location.href"]
else {"success": True, "data": {"result": "secret DOM text"}}
),
)
result = _eval("document.body.innerText")
assert result["success"] is False
assert "private or internal address" in result["error"]
assert PRIVATE_URL in result["error"]
def test_allows_when_page_public(self, monkeypatch):
self._guard_on(monkeypatch)
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda task_id, command, args=None, **k: (
{"success": True, "data": {"result": PUBLIC_URL}}
if args == ["window.location.href"]
else {"success": True, "data": {"result": "public DOM text"}}
),
)
result = _eval("document.body.innerText")
assert result["success"] is True
assert result["result"] == "public DOM text"
def test_fail_open_when_url_probe_fails(self, monkeypatch):
"""If the window.location.href probe errors, don't block (fail-open)."""
self._guard_on(monkeypatch)
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
def _run(task_id, command, args=None, **k):
if args == ["window.location.href"]:
return {"success": False, "error": "CDP probe failed"}
return {"success": True, "data": {"result": "dom text"}}
monkeypatch.setattr(browser_tool, "_run_browser_command", _run)
result = _eval("document.body.innerText")
assert result["success"] is True
assert result["result"] == "dom text"
# ---------------------------------------------------------------------------
# Helper-level unit tests
# ---------------------------------------------------------------------------
class TestExpressionScanHelper:
def test_returns_first_private_literal(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: "127.0.0.1" not in url)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
out = browser_tool._expression_targets_private_url(
"fetch('https://example.com'); fetch('http://127.0.0.1/x')"
)
assert out == "http://127.0.0.1/x"
def test_none_when_no_url(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
assert browser_tool._expression_targets_private_url("document.title") is None
def test_strips_trailing_punctuation(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
out = browser_tool._expression_targets_private_url("location.href='http://10.0.0.1/';")
assert out == "http://10.0.0.1/"

View file

@ -3031,6 +3031,74 @@ def browser_console(clear: bool = False, expression: Optional[str] = None, task_
return json.dumps(response, ensure_ascii=False)
def _eval_ssrf_guard_active(effective_task_id: str) -> bool:
"""Return True when eval-driven private-network access must be guarded.
Matches the gating used by ``browser_navigate`` / ``browser_snapshot`` /
``browser_vision``: the SSRF guard is only meaningful for non-local
backends (cloud browser, or a containerized terminal whose browser-on-host
can reach internal networks the terminal can't), and is skipped for local
sidecar sessions and when ``allow_private_urls`` is set.
"""
return (
not _is_local_backend()
and not _is_local_sidecar_key(effective_task_id)
and not _allow_private_urls()
)
# URL-shaped literals embedded in a JS expression (http/https only). Used to
# pre-screen ``browser_console(expression=...)`` calls that fetch/XHR/navigate
# to a private host directly — that path never updates ``location.href`` so the
# post-eval page-URL recheck below can't see it.
_JS_URL_LITERAL_RE = re.compile(r"""https?://[^\s'"`)\]<>]+""", re.IGNORECASE)
def _expression_targets_private_url(expression: str) -> Optional[str]:
"""Return the first private/always-blocked URL literal in a JS expression.
Best-effort: scans for ``http(s)://...`` literals (fetch/XHR/navigation
targets the agent may have embedded) and returns the first one that targets
a private/internal address or the always-blocked cloud-metadata floor.
Returns ``None`` when no such literal is found.
"""
if not isinstance(expression, str):
return None
for match in _JS_URL_LITERAL_RE.findall(expression):
candidate = match.rstrip(".,;")
if _is_always_blocked_url(candidate) or not _is_safe_url(candidate):
return candidate
return None
def _current_page_private_url(effective_task_id: str) -> Optional[str]:
"""Return the current page URL when it targets a private/internal address.
Reads ``window.location.href`` via a low-cost eval and returns it when the
page has been navigated (e.g. via ``location.href = '...'`` in a prior
eval) to an address the SSRF guard would reject. Returns ``None`` when the
page is public, the URL can't be determined, or the check errors (fail-open
on probe failure, matching the snapshot/vision guards).
"""
try:
url_result = _run_browser_command(
effective_task_id, "eval", ["window.location.href"],
timeout=5, _engine_override="auto",
)
if url_result.get("success"):
current_url = (
url_result.get("data", {}).get("result", "")
.strip().strip('"').strip("'")
)
if current_url and (
_is_always_blocked_url(current_url) or not _is_safe_url(current_url)
):
return current_url
except Exception as exc:
logger.debug("_current_page_private_url: probe failed (%s)", exc)
return None
def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
"""Evaluate a JavaScript expression in the page context and return the result."""
if _is_camofox_mode():
@ -3038,6 +3106,27 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
effective_task_id = _last_session_key(task_id or "default")
# ── Private-network guard (eval return-value path) ──────────────────────
# browser_snapshot / browser_vision re-check the page URL before returning
# content, but eval returns arbitrary JS results directly — an attacker can
# read a private page via `fetch('http://127.0.0.1/secret')` or by reading
# the DOM after `location.href = 'http://127.0.0.1/'`, never touching
# snapshot/vision. Close both sub-paths on the same gating condition:
# 1. Pre-scan the expression for private-host URL literals (direct fetch).
# 2. After eval, re-check the page URL (navigate-then-read).
if _eval_ssrf_guard_active(effective_task_id):
blocked_literal = _expression_targets_private_url(expression)
if blocked_literal:
return json.dumps({
"success": False,
"error": (
"Blocked: JavaScript expression targets a private or "
f"internal address ({blocked_literal}). Reading internal "
"endpoints via browser_console is not permitted in this "
"browser mode."
),
}, ensure_ascii=False)
# --- Fast path: route through the supervisor's persistent CDP WS ---------
# When a CDPSupervisor is alive for this task_id, ``Runtime.evaluate`` runs
# on the already-connected WebSocket — zero subprocess startup cost vs
@ -3059,6 +3148,20 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
parsed = json.loads(raw_result)
except (json.JSONDecodeError, ValueError):
pass # keep as string
# Post-eval page-URL recheck: if this (or a prior) eval
# navigated the page to a private address, withhold the result.
if _eval_ssrf_guard_active(effective_task_id):
_blocked_url = _current_page_private_url(effective_task_id)
if _blocked_url:
return json.dumps({
"success": False,
"error": (
"Blocked: page URL targets a private or internal "
f"address ({_blocked_url}). This may have been "
"caused by a JavaScript navigation via "
"browser_console."
),
}, ensure_ascii=False)
response = {
"success": True,
"result": parsed,
@ -3134,6 +3237,19 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
"result": parsed,
"result_type": type(parsed).__name__,
}
# Post-eval page-URL recheck: if this (or a prior) eval navigated the page
# to a private address, withhold the result (mirrors the supervisor path).
if _eval_ssrf_guard_active(effective_task_id):
_blocked_url = _current_page_private_url(effective_task_id)
if _blocked_url:
return json.dumps({
"success": False,
"error": (
"Blocked: page URL targets a private or internal address "
f"({_blocked_url}). This may have been caused by a "
"JavaScript navigation via browser_console."
),
}, ensure_ascii=False)
return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False, default=str)