mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-07-01 12:02:05 +00:00
fix(browser): close eval return-value SSRF bypass (sibling of #44731)
The snapshot/vision guards re-check the page URL before returning content,
but browser_console(expression=...) -> _browser_eval returns arbitrary JS
results directly, leaving two same-class bypasses open:
1. Direct fetch: fetch('http://127.0.0.1/secret').then(r=>r.text()) reads
a private endpoint and returns the body — the page URL stays public so
the post-eval recheck never sees it.
2. Navigate-then-read: location.href='http://127.0.0.1/' then a later eval
reads document.body.innerText.
Guard _browser_eval on the same condition as navigate/snapshot/vision
(not local backend, not local sidecar, not allow_private_urls):
- pre-scan the expression for private/always-blocked URL literals
- re-check window.location.href after the eval at both success-return
sites (supervisor fast-path + subprocess fallback)
Probe failures fail-open (matching the snapshot/vision guards).
This commit is contained in:
parent
0ae6196087
commit
7ef04ae7a7
2 changed files with 345 additions and 0 deletions
229
tests/tools/test_browser_eval_ssrf.py
Normal file
229
tests/tools/test_browser_eval_ssrf.py
Normal file
|
|
@ -0,0 +1,229 @@
|
|||
"""Tests that browser_console(expression=...) cannot bypass the SSRF guard.
|
||||
|
||||
browser_snapshot / browser_vision re-check the page URL before returning
|
||||
content, but ``_browser_eval`` returns arbitrary JS results directly. Two
|
||||
sub-paths could read private content without ever touching snapshot/vision:
|
||||
|
||||
1. Direct fetch: ``fetch('http://127.0.0.1/secret').then(r => r.text())``
|
||||
— the page URL stays public, so the post-eval recheck can't see it.
|
||||
Closed by a pre-scan of the expression for private-host URL literals.
|
||||
2. Navigate-then-read: ``location.href = 'http://127.0.0.1/'`` then a later
|
||||
eval reads ``document.body.innerText`` — closed by re-checking the page
|
||||
URL after the eval runs.
|
||||
|
||||
This is the sibling fix for the eval return-value path of issue #44731.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from tools import browser_tool
|
||||
|
||||
|
||||
PRIVATE_URL = "http://127.0.0.1:8080/secret"
|
||||
PUBLIC_URL = "https://example.com/page"
|
||||
METADATA_URL = "http://169.254.169.254/latest/meta-data/"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _no_camofox(monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False)
|
||||
# No supervisor — force the subprocess fallback path by default.
|
||||
monkeypatch.setattr(browser_tool, "_last_session_key", lambda key: key)
|
||||
|
||||
|
||||
def _eval(expression, task_id="test"):
|
||||
return json.loads(browser_tool._browser_eval(expression, task_id=task_id))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sub-path 1: direct private-host fetch literal in the expression (pre-scan)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExpressionPreScan:
|
||||
def _guard_on(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||
|
||||
def test_blocks_private_fetch_literal(self, monkeypatch):
|
||||
self._guard_on(monkeypatch)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
|
||||
called = {"n": 0}
|
||||
|
||||
def _run(task_id, command, args=None, **kwargs):
|
||||
called["n"] += 1
|
||||
return {"success": True, "data": {"result": "leaked-content"}}
|
||||
|
||||
monkeypatch.setattr(browser_tool, "_run_browser_command", _run)
|
||||
|
||||
result = _eval(f"fetch('{PRIVATE_URL}').then(r => r.text())")
|
||||
assert result["success"] is False
|
||||
assert "private or internal address" in result["error"]
|
||||
assert PRIVATE_URL in result["error"]
|
||||
# Expression never executed — blocked before any browser command.
|
||||
assert called["n"] == 0
|
||||
|
||||
def test_blocks_metadata_fetch_literal(self, monkeypatch):
|
||||
self._guard_on(monkeypatch)
|
||||
# Public-safe to is_safe_url, but the always-blocked floor catches IMDS.
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_is_always_blocked_url",
|
||||
lambda url: "169.254.169.254" in url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_run_browser_command",
|
||||
lambda *a, **k: {"success": True, "data": {"result": "creds"}},
|
||||
)
|
||||
|
||||
result = _eval(f"fetch('{METADATA_URL}')")
|
||||
assert result["success"] is False
|
||||
assert "private or internal address" in result["error"]
|
||||
|
||||
def test_allows_public_fetch_literal(self, monkeypatch):
|
||||
self._guard_on(monkeypatch)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
# After the (public) eval, the page-URL recheck must also see a public URL.
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_run_browser_command",
|
||||
lambda task_id, command, args=None, **k: (
|
||||
{"success": True, "data": {"result": PUBLIC_URL}}
|
||||
if args == ["window.location.href"]
|
||||
else {"success": True, "data": {"result": "ok"}}
|
||||
),
|
||||
)
|
||||
|
||||
result = _eval(f"fetch('{PUBLIC_URL}').then(r => r.text())")
|
||||
assert result["success"] is True
|
||||
assert result["result"] == "ok"
|
||||
|
||||
def test_skips_prescan_for_local_backend(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: True)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_run_browser_command",
|
||||
lambda *a, **k: {"success": True, "data": {"result": "local-ok"}},
|
||||
)
|
||||
result = _eval(f"fetch('{PRIVATE_URL}')")
|
||||
assert result["success"] is True
|
||||
assert result["result"] == "local-ok"
|
||||
|
||||
def test_skips_prescan_for_local_sidecar(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_run_browser_command",
|
||||
lambda *a, **k: {"success": True, "data": {"result": "sidecar-ok"}},
|
||||
)
|
||||
result = _eval(f"fetch('{PRIVATE_URL}')")
|
||||
assert result["success"] is True
|
||||
|
||||
def test_skips_prescan_when_allow_private(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: True)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_run_browser_command",
|
||||
lambda *a, **k: {"success": True, "data": {"result": "allowed"}},
|
||||
)
|
||||
result = _eval(f"fetch('{PRIVATE_URL}')")
|
||||
assert result["success"] is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sub-path 2: navigate-then-read (post-eval page-URL recheck)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPostEvalPageRecheck:
|
||||
def _guard_on(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
|
||||
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||
|
||||
def test_blocks_when_page_navigated_private(self, monkeypatch):
|
||||
self._guard_on(monkeypatch)
|
||||
# Expression itself has no URL literal (reads the DOM), so the pre-scan
|
||||
# passes; the danger is that the page was navigated to a private URL by
|
||||
# an earlier eval. The recheck must catch it.
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_run_browser_command",
|
||||
lambda task_id, command, args=None, **k: (
|
||||
{"success": True, "data": {"result": PRIVATE_URL}}
|
||||
if args == ["window.location.href"]
|
||||
else {"success": True, "data": {"result": "secret DOM text"}}
|
||||
),
|
||||
)
|
||||
|
||||
result = _eval("document.body.innerText")
|
||||
assert result["success"] is False
|
||||
assert "private or internal address" in result["error"]
|
||||
assert PRIVATE_URL in result["error"]
|
||||
|
||||
def test_allows_when_page_public(self, monkeypatch):
|
||||
self._guard_on(monkeypatch)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
monkeypatch.setattr(
|
||||
browser_tool, "_run_browser_command",
|
||||
lambda task_id, command, args=None, **k: (
|
||||
{"success": True, "data": {"result": PUBLIC_URL}}
|
||||
if args == ["window.location.href"]
|
||||
else {"success": True, "data": {"result": "public DOM text"}}
|
||||
),
|
||||
)
|
||||
|
||||
result = _eval("document.body.innerText")
|
||||
assert result["success"] is True
|
||||
assert result["result"] == "public DOM text"
|
||||
|
||||
def test_fail_open_when_url_probe_fails(self, monkeypatch):
|
||||
"""If the window.location.href probe errors, don't block (fail-open)."""
|
||||
self._guard_on(monkeypatch)
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
|
||||
def _run(task_id, command, args=None, **k):
|
||||
if args == ["window.location.href"]:
|
||||
return {"success": False, "error": "CDP probe failed"}
|
||||
return {"success": True, "data": {"result": "dom text"}}
|
||||
|
||||
monkeypatch.setattr(browser_tool, "_run_browser_command", _run)
|
||||
|
||||
result = _eval("document.body.innerText")
|
||||
assert result["success"] is True
|
||||
assert result["result"] == "dom text"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper-level unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExpressionScanHelper:
|
||||
def test_returns_first_private_literal(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: "127.0.0.1" not in url)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
out = browser_tool._expression_targets_private_url(
|
||||
"fetch('https://example.com'); fetch('http://127.0.0.1/x')"
|
||||
)
|
||||
assert out == "http://127.0.0.1/x"
|
||||
|
||||
def test_none_when_no_url(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
assert browser_tool._expression_targets_private_url("document.title") is None
|
||||
|
||||
def test_strips_trailing_punctuation(self, monkeypatch):
|
||||
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||
out = browser_tool._expression_targets_private_url("location.href='http://10.0.0.1/';")
|
||||
assert out == "http://10.0.0.1/"
|
||||
|
|
@ -3031,6 +3031,74 @@ def browser_console(clear: bool = False, expression: Optional[str] = None, task_
|
|||
return json.dumps(response, ensure_ascii=False)
|
||||
|
||||
|
||||
def _eval_ssrf_guard_active(effective_task_id: str) -> bool:
|
||||
"""Return True when eval-driven private-network access must be guarded.
|
||||
|
||||
Matches the gating used by ``browser_navigate`` / ``browser_snapshot`` /
|
||||
``browser_vision``: the SSRF guard is only meaningful for non-local
|
||||
backends (cloud browser, or a containerized terminal whose browser-on-host
|
||||
can reach internal networks the terminal can't), and is skipped for local
|
||||
sidecar sessions and when ``allow_private_urls`` is set.
|
||||
"""
|
||||
return (
|
||||
not _is_local_backend()
|
||||
and not _is_local_sidecar_key(effective_task_id)
|
||||
and not _allow_private_urls()
|
||||
)
|
||||
|
||||
|
||||
# URL-shaped literals embedded in a JS expression (http/https only). Used to
|
||||
# pre-screen ``browser_console(expression=...)`` calls that fetch/XHR/navigate
|
||||
# to a private host directly — that path never updates ``location.href`` so the
|
||||
# post-eval page-URL recheck below can't see it.
|
||||
_JS_URL_LITERAL_RE = re.compile(r"""https?://[^\s'"`)\]<>]+""", re.IGNORECASE)
|
||||
|
||||
|
||||
def _expression_targets_private_url(expression: str) -> Optional[str]:
|
||||
"""Return the first private/always-blocked URL literal in a JS expression.
|
||||
|
||||
Best-effort: scans for ``http(s)://...`` literals (fetch/XHR/navigation
|
||||
targets the agent may have embedded) and returns the first one that targets
|
||||
a private/internal address or the always-blocked cloud-metadata floor.
|
||||
Returns ``None`` when no such literal is found.
|
||||
"""
|
||||
if not isinstance(expression, str):
|
||||
return None
|
||||
for match in _JS_URL_LITERAL_RE.findall(expression):
|
||||
candidate = match.rstrip(".,;")
|
||||
if _is_always_blocked_url(candidate) or not _is_safe_url(candidate):
|
||||
return candidate
|
||||
return None
|
||||
|
||||
|
||||
def _current_page_private_url(effective_task_id: str) -> Optional[str]:
|
||||
"""Return the current page URL when it targets a private/internal address.
|
||||
|
||||
Reads ``window.location.href`` via a low-cost eval and returns it when the
|
||||
page has been navigated (e.g. via ``location.href = '...'`` in a prior
|
||||
eval) to an address the SSRF guard would reject. Returns ``None`` when the
|
||||
page is public, the URL can't be determined, or the check errors (fail-open
|
||||
on probe failure, matching the snapshot/vision guards).
|
||||
"""
|
||||
try:
|
||||
url_result = _run_browser_command(
|
||||
effective_task_id, "eval", ["window.location.href"],
|
||||
timeout=5, _engine_override="auto",
|
||||
)
|
||||
if url_result.get("success"):
|
||||
current_url = (
|
||||
url_result.get("data", {}).get("result", "")
|
||||
.strip().strip('"').strip("'")
|
||||
)
|
||||
if current_url and (
|
||||
_is_always_blocked_url(current_url) or not _is_safe_url(current_url)
|
||||
):
|
||||
return current_url
|
||||
except Exception as exc:
|
||||
logger.debug("_current_page_private_url: probe failed (%s)", exc)
|
||||
return None
|
||||
|
||||
|
||||
def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||
"""Evaluate a JavaScript expression in the page context and return the result."""
|
||||
if _is_camofox_mode():
|
||||
|
|
@ -3038,6 +3106,27 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
|||
|
||||
effective_task_id = _last_session_key(task_id or "default")
|
||||
|
||||
# ── Private-network guard (eval return-value path) ──────────────────────
|
||||
# browser_snapshot / browser_vision re-check the page URL before returning
|
||||
# content, but eval returns arbitrary JS results directly — an attacker can
|
||||
# read a private page via `fetch('http://127.0.0.1/secret')` or by reading
|
||||
# the DOM after `location.href = 'http://127.0.0.1/'`, never touching
|
||||
# snapshot/vision. Close both sub-paths on the same gating condition:
|
||||
# 1. Pre-scan the expression for private-host URL literals (direct fetch).
|
||||
# 2. After eval, re-check the page URL (navigate-then-read).
|
||||
if _eval_ssrf_guard_active(effective_task_id):
|
||||
blocked_literal = _expression_targets_private_url(expression)
|
||||
if blocked_literal:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": (
|
||||
"Blocked: JavaScript expression targets a private or "
|
||||
f"internal address ({blocked_literal}). Reading internal "
|
||||
"endpoints via browser_console is not permitted in this "
|
||||
"browser mode."
|
||||
),
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# --- Fast path: route through the supervisor's persistent CDP WS ---------
|
||||
# When a CDPSupervisor is alive for this task_id, ``Runtime.evaluate`` runs
|
||||
# on the already-connected WebSocket — zero subprocess startup cost vs
|
||||
|
|
@ -3059,6 +3148,20 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
|||
parsed = json.loads(raw_result)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass # keep as string
|
||||
# Post-eval page-URL recheck: if this (or a prior) eval
|
||||
# navigated the page to a private address, withhold the result.
|
||||
if _eval_ssrf_guard_active(effective_task_id):
|
||||
_blocked_url = _current_page_private_url(effective_task_id)
|
||||
if _blocked_url:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": (
|
||||
"Blocked: page URL targets a private or internal "
|
||||
f"address ({_blocked_url}). This may have been "
|
||||
"caused by a JavaScript navigation via "
|
||||
"browser_console."
|
||||
),
|
||||
}, ensure_ascii=False)
|
||||
response = {
|
||||
"success": True,
|
||||
"result": parsed,
|
||||
|
|
@ -3134,6 +3237,19 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
|||
"result": parsed,
|
||||
"result_type": type(parsed).__name__,
|
||||
}
|
||||
# Post-eval page-URL recheck: if this (or a prior) eval navigated the page
|
||||
# to a private address, withhold the result (mirrors the supervisor path).
|
||||
if _eval_ssrf_guard_active(effective_task_id):
|
||||
_blocked_url = _current_page_private_url(effective_task_id)
|
||||
if _blocked_url:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": (
|
||||
"Blocked: page URL targets a private or internal address "
|
||||
f"({_blocked_url}). This may have been caused by a "
|
||||
"JavaScript navigation via browser_console."
|
||||
),
|
||||
}, ensure_ascii=False)
|
||||
return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False, default=str)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue