mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-07-02 12:13:05 +00:00
fix(browser): close eval return-value SSRF bypass (sibling of #44731)
The snapshot/vision guards re-check the page URL before returning content,
but browser_console(expression=...) -> _browser_eval returns arbitrary JS
results directly, leaving two same-class bypasses open:
1. Direct fetch: fetch('http://127.0.0.1/secret').then(r=>r.text()) reads
a private endpoint and returns the body — the page URL stays public so
the post-eval recheck never sees it.
2. Navigate-then-read: location.href='http://127.0.0.1/' then a later eval
reads document.body.innerText.
Guard _browser_eval on the same condition as navigate/snapshot/vision
(not local backend, not local sidecar, not allow_private_urls):
- pre-scan the expression for private/always-blocked URL literals
- re-check window.location.href after the eval at both success-return
sites (supervisor fast-path + subprocess fallback)
Probe failures fail-open (matching the snapshot/vision guards).
This commit is contained in:
parent
0ae6196087
commit
7ef04ae7a7
2 changed files with 345 additions and 0 deletions
229
tests/tools/test_browser_eval_ssrf.py
Normal file
229
tests/tools/test_browser_eval_ssrf.py
Normal file
|
|
@ -0,0 +1,229 @@
|
||||||
|
"""Tests that browser_console(expression=...) cannot bypass the SSRF guard.
|
||||||
|
|
||||||
|
browser_snapshot / browser_vision re-check the page URL before returning
|
||||||
|
content, but ``_browser_eval`` returns arbitrary JS results directly. Two
|
||||||
|
sub-paths could read private content without ever touching snapshot/vision:
|
||||||
|
|
||||||
|
1. Direct fetch: ``fetch('http://127.0.0.1/secret').then(r => r.text())``
|
||||||
|
— the page URL stays public, so the post-eval recheck can't see it.
|
||||||
|
Closed by a pre-scan of the expression for private-host URL literals.
|
||||||
|
2. Navigate-then-read: ``location.href = 'http://127.0.0.1/'`` then a later
|
||||||
|
eval reads ``document.body.innerText`` — closed by re-checking the page
|
||||||
|
URL after the eval runs.
|
||||||
|
|
||||||
|
This is the sibling fix for the eval return-value path of issue #44731.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tools import browser_tool
|
||||||
|
|
||||||
|
|
||||||
|
PRIVATE_URL = "http://127.0.0.1:8080/secret"
|
||||||
|
PUBLIC_URL = "https://example.com/page"
|
||||||
|
METADATA_URL = "http://169.254.169.254/latest/meta-data/"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _no_camofox(monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False)
|
||||||
|
# No supervisor — force the subprocess fallback path by default.
|
||||||
|
monkeypatch.setattr(browser_tool, "_last_session_key", lambda key: key)
|
||||||
|
|
||||||
|
|
||||||
|
def _eval(expression, task_id="test"):
|
||||||
|
return json.loads(browser_tool._browser_eval(expression, task_id=task_id))
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sub-path 1: direct private-host fetch literal in the expression (pre-scan)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestExpressionPreScan:
|
||||||
|
def _guard_on(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||||
|
|
||||||
|
def test_blocks_private_fetch_literal(self, monkeypatch):
|
||||||
|
self._guard_on(monkeypatch)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
|
||||||
|
called = {"n": 0}
|
||||||
|
|
||||||
|
def _run(task_id, command, args=None, **kwargs):
|
||||||
|
called["n"] += 1
|
||||||
|
return {"success": True, "data": {"result": "leaked-content"}}
|
||||||
|
|
||||||
|
monkeypatch.setattr(browser_tool, "_run_browser_command", _run)
|
||||||
|
|
||||||
|
result = _eval(f"fetch('{PRIVATE_URL}').then(r => r.text())")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "private or internal address" in result["error"]
|
||||||
|
assert PRIVATE_URL in result["error"]
|
||||||
|
# Expression never executed — blocked before any browser command.
|
||||||
|
assert called["n"] == 0
|
||||||
|
|
||||||
|
def test_blocks_metadata_fetch_literal(self, monkeypatch):
|
||||||
|
self._guard_on(monkeypatch)
|
||||||
|
# Public-safe to is_safe_url, but the always-blocked floor catches IMDS.
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_is_always_blocked_url",
|
||||||
|
lambda url: "169.254.169.254" in url,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_run_browser_command",
|
||||||
|
lambda *a, **k: {"success": True, "data": {"result": "creds"}},
|
||||||
|
)
|
||||||
|
|
||||||
|
result = _eval(f"fetch('{METADATA_URL}')")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "private or internal address" in result["error"]
|
||||||
|
|
||||||
|
def test_allows_public_fetch_literal(self, monkeypatch):
|
||||||
|
self._guard_on(monkeypatch)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
# After the (public) eval, the page-URL recheck must also see a public URL.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_run_browser_command",
|
||||||
|
lambda task_id, command, args=None, **k: (
|
||||||
|
{"success": True, "data": {"result": PUBLIC_URL}}
|
||||||
|
if args == ["window.location.href"]
|
||||||
|
else {"success": True, "data": {"result": "ok"}}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = _eval(f"fetch('{PUBLIC_URL}').then(r => r.text())")
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["result"] == "ok"
|
||||||
|
|
||||||
|
def test_skips_prescan_for_local_backend(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: True)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_run_browser_command",
|
||||||
|
lambda *a, **k: {"success": True, "data": {"result": "local-ok"}},
|
||||||
|
)
|
||||||
|
result = _eval(f"fetch('{PRIVATE_URL}')")
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["result"] == "local-ok"
|
||||||
|
|
||||||
|
def test_skips_prescan_for_local_sidecar(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: True)
|
||||||
|
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_run_browser_command",
|
||||||
|
lambda *a, **k: {"success": True, "data": {"result": "sidecar-ok"}},
|
||||||
|
)
|
||||||
|
result = _eval(f"fetch('{PRIVATE_URL}')")
|
||||||
|
assert result["success"] is True
|
||||||
|
|
||||||
|
def test_skips_prescan_when_allow_private(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: True)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_run_browser_command",
|
||||||
|
lambda *a, **k: {"success": True, "data": {"result": "allowed"}},
|
||||||
|
)
|
||||||
|
result = _eval(f"fetch('{PRIVATE_URL}')")
|
||||||
|
assert result["success"] is True
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sub-path 2: navigate-then-read (post-eval page-URL recheck)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestPostEvalPageRecheck:
|
||||||
|
def _guard_on(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_backend", lambda: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_local_sidecar_key", lambda key: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_allow_private_urls", lambda: False)
|
||||||
|
|
||||||
|
def test_blocks_when_page_navigated_private(self, monkeypatch):
|
||||||
|
self._guard_on(monkeypatch)
|
||||||
|
# Expression itself has no URL literal (reads the DOM), so the pre-scan
|
||||||
|
# passes; the danger is that the page was navigated to a private URL by
|
||||||
|
# an earlier eval. The recheck must catch it.
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_run_browser_command",
|
||||||
|
lambda task_id, command, args=None, **k: (
|
||||||
|
{"success": True, "data": {"result": PRIVATE_URL}}
|
||||||
|
if args == ["window.location.href"]
|
||||||
|
else {"success": True, "data": {"result": "secret DOM text"}}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = _eval("document.body.innerText")
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "private or internal address" in result["error"]
|
||||||
|
assert PRIVATE_URL in result["error"]
|
||||||
|
|
||||||
|
def test_allows_when_page_public(self, monkeypatch):
|
||||||
|
self._guard_on(monkeypatch)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
browser_tool, "_run_browser_command",
|
||||||
|
lambda task_id, command, args=None, **k: (
|
||||||
|
{"success": True, "data": {"result": PUBLIC_URL}}
|
||||||
|
if args == ["window.location.href"]
|
||||||
|
else {"success": True, "data": {"result": "public DOM text"}}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = _eval("document.body.innerText")
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["result"] == "public DOM text"
|
||||||
|
|
||||||
|
def test_fail_open_when_url_probe_fails(self, monkeypatch):
|
||||||
|
"""If the window.location.href probe errors, don't block (fail-open)."""
|
||||||
|
self._guard_on(monkeypatch)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
|
||||||
|
def _run(task_id, command, args=None, **k):
|
||||||
|
if args == ["window.location.href"]:
|
||||||
|
return {"success": False, "error": "CDP probe failed"}
|
||||||
|
return {"success": True, "data": {"result": "dom text"}}
|
||||||
|
|
||||||
|
monkeypatch.setattr(browser_tool, "_run_browser_command", _run)
|
||||||
|
|
||||||
|
result = _eval("document.body.innerText")
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["result"] == "dom text"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helper-level unit tests
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestExpressionScanHelper:
|
||||||
|
def test_returns_first_private_literal(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: "127.0.0.1" not in url)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
out = browser_tool._expression_targets_private_url(
|
||||||
|
"fetch('https://example.com'); fetch('http://127.0.0.1/x')"
|
||||||
|
)
|
||||||
|
assert out == "http://127.0.0.1/x"
|
||||||
|
|
||||||
|
def test_none_when_no_url(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: True)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
assert browser_tool._expression_targets_private_url("document.title") is None
|
||||||
|
|
||||||
|
def test_strips_trailing_punctuation(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_safe_url", lambda url: False)
|
||||||
|
monkeypatch.setattr(browser_tool, "_is_always_blocked_url", lambda url: False)
|
||||||
|
out = browser_tool._expression_targets_private_url("location.href='http://10.0.0.1/';")
|
||||||
|
assert out == "http://10.0.0.1/"
|
||||||
|
|
@ -3031,6 +3031,74 @@ def browser_console(clear: bool = False, expression: Optional[str] = None, task_
|
||||||
return json.dumps(response, ensure_ascii=False)
|
return json.dumps(response, ensure_ascii=False)
|
||||||
|
|
||||||
|
|
||||||
|
def _eval_ssrf_guard_active(effective_task_id: str) -> bool:
|
||||||
|
"""Return True when eval-driven private-network access must be guarded.
|
||||||
|
|
||||||
|
Matches the gating used by ``browser_navigate`` / ``browser_snapshot`` /
|
||||||
|
``browser_vision``: the SSRF guard is only meaningful for non-local
|
||||||
|
backends (cloud browser, or a containerized terminal whose browser-on-host
|
||||||
|
can reach internal networks the terminal can't), and is skipped for local
|
||||||
|
sidecar sessions and when ``allow_private_urls`` is set.
|
||||||
|
"""
|
||||||
|
return (
|
||||||
|
not _is_local_backend()
|
||||||
|
and not _is_local_sidecar_key(effective_task_id)
|
||||||
|
and not _allow_private_urls()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# URL-shaped literals embedded in a JS expression (http/https only). Used to
|
||||||
|
# pre-screen ``browser_console(expression=...)`` calls that fetch/XHR/navigate
|
||||||
|
# to a private host directly — that path never updates ``location.href`` so the
|
||||||
|
# post-eval page-URL recheck below can't see it.
|
||||||
|
_JS_URL_LITERAL_RE = re.compile(r"""https?://[^\s'"`)\]<>]+""", re.IGNORECASE)
|
||||||
|
|
||||||
|
|
||||||
|
def _expression_targets_private_url(expression: str) -> Optional[str]:
|
||||||
|
"""Return the first private/always-blocked URL literal in a JS expression.
|
||||||
|
|
||||||
|
Best-effort: scans for ``http(s)://...`` literals (fetch/XHR/navigation
|
||||||
|
targets the agent may have embedded) and returns the first one that targets
|
||||||
|
a private/internal address or the always-blocked cloud-metadata floor.
|
||||||
|
Returns ``None`` when no such literal is found.
|
||||||
|
"""
|
||||||
|
if not isinstance(expression, str):
|
||||||
|
return None
|
||||||
|
for match in _JS_URL_LITERAL_RE.findall(expression):
|
||||||
|
candidate = match.rstrip(".,;")
|
||||||
|
if _is_always_blocked_url(candidate) or not _is_safe_url(candidate):
|
||||||
|
return candidate
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _current_page_private_url(effective_task_id: str) -> Optional[str]:
|
||||||
|
"""Return the current page URL when it targets a private/internal address.
|
||||||
|
|
||||||
|
Reads ``window.location.href`` via a low-cost eval and returns it when the
|
||||||
|
page has been navigated (e.g. via ``location.href = '...'`` in a prior
|
||||||
|
eval) to an address the SSRF guard would reject. Returns ``None`` when the
|
||||||
|
page is public, the URL can't be determined, or the check errors (fail-open
|
||||||
|
on probe failure, matching the snapshot/vision guards).
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
url_result = _run_browser_command(
|
||||||
|
effective_task_id, "eval", ["window.location.href"],
|
||||||
|
timeout=5, _engine_override="auto",
|
||||||
|
)
|
||||||
|
if url_result.get("success"):
|
||||||
|
current_url = (
|
||||||
|
url_result.get("data", {}).get("result", "")
|
||||||
|
.strip().strip('"').strip("'")
|
||||||
|
)
|
||||||
|
if current_url and (
|
||||||
|
_is_always_blocked_url(current_url) or not _is_safe_url(current_url)
|
||||||
|
):
|
||||||
|
return current_url
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("_current_page_private_url: probe failed (%s)", exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||||
"""Evaluate a JavaScript expression in the page context and return the result."""
|
"""Evaluate a JavaScript expression in the page context and return the result."""
|
||||||
if _is_camofox_mode():
|
if _is_camofox_mode():
|
||||||
|
|
@ -3038,6 +3106,27 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||||
|
|
||||||
effective_task_id = _last_session_key(task_id or "default")
|
effective_task_id = _last_session_key(task_id or "default")
|
||||||
|
|
||||||
|
# ── Private-network guard (eval return-value path) ──────────────────────
|
||||||
|
# browser_snapshot / browser_vision re-check the page URL before returning
|
||||||
|
# content, but eval returns arbitrary JS results directly — an attacker can
|
||||||
|
# read a private page via `fetch('http://127.0.0.1/secret')` or by reading
|
||||||
|
# the DOM after `location.href = 'http://127.0.0.1/'`, never touching
|
||||||
|
# snapshot/vision. Close both sub-paths on the same gating condition:
|
||||||
|
# 1. Pre-scan the expression for private-host URL literals (direct fetch).
|
||||||
|
# 2. After eval, re-check the page URL (navigate-then-read).
|
||||||
|
if _eval_ssrf_guard_active(effective_task_id):
|
||||||
|
blocked_literal = _expression_targets_private_url(expression)
|
||||||
|
if blocked_literal:
|
||||||
|
return json.dumps({
|
||||||
|
"success": False,
|
||||||
|
"error": (
|
||||||
|
"Blocked: JavaScript expression targets a private or "
|
||||||
|
f"internal address ({blocked_literal}). Reading internal "
|
||||||
|
"endpoints via browser_console is not permitted in this "
|
||||||
|
"browser mode."
|
||||||
|
),
|
||||||
|
}, ensure_ascii=False)
|
||||||
|
|
||||||
# --- Fast path: route through the supervisor's persistent CDP WS ---------
|
# --- Fast path: route through the supervisor's persistent CDP WS ---------
|
||||||
# When a CDPSupervisor is alive for this task_id, ``Runtime.evaluate`` runs
|
# When a CDPSupervisor is alive for this task_id, ``Runtime.evaluate`` runs
|
||||||
# on the already-connected WebSocket — zero subprocess startup cost vs
|
# on the already-connected WebSocket — zero subprocess startup cost vs
|
||||||
|
|
@ -3059,6 +3148,20 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||||
parsed = json.loads(raw_result)
|
parsed = json.loads(raw_result)
|
||||||
except (json.JSONDecodeError, ValueError):
|
except (json.JSONDecodeError, ValueError):
|
||||||
pass # keep as string
|
pass # keep as string
|
||||||
|
# Post-eval page-URL recheck: if this (or a prior) eval
|
||||||
|
# navigated the page to a private address, withhold the result.
|
||||||
|
if _eval_ssrf_guard_active(effective_task_id):
|
||||||
|
_blocked_url = _current_page_private_url(effective_task_id)
|
||||||
|
if _blocked_url:
|
||||||
|
return json.dumps({
|
||||||
|
"success": False,
|
||||||
|
"error": (
|
||||||
|
"Blocked: page URL targets a private or internal "
|
||||||
|
f"address ({_blocked_url}). This may have been "
|
||||||
|
"caused by a JavaScript navigation via "
|
||||||
|
"browser_console."
|
||||||
|
),
|
||||||
|
}, ensure_ascii=False)
|
||||||
response = {
|
response = {
|
||||||
"success": True,
|
"success": True,
|
||||||
"result": parsed,
|
"result": parsed,
|
||||||
|
|
@ -3134,6 +3237,19 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||||
"result": parsed,
|
"result": parsed,
|
||||||
"result_type": type(parsed).__name__,
|
"result_type": type(parsed).__name__,
|
||||||
}
|
}
|
||||||
|
# Post-eval page-URL recheck: if this (or a prior) eval navigated the page
|
||||||
|
# to a private address, withhold the result (mirrors the supervisor path).
|
||||||
|
if _eval_ssrf_guard_active(effective_task_id):
|
||||||
|
_blocked_url = _current_page_private_url(effective_task_id)
|
||||||
|
if _blocked_url:
|
||||||
|
return json.dumps({
|
||||||
|
"success": False,
|
||||||
|
"error": (
|
||||||
|
"Blocked: page URL targets a private or internal address "
|
||||||
|
f"({_blocked_url}). This may have been caused by a "
|
||||||
|
"JavaScript navigation via browser_console."
|
||||||
|
),
|
||||||
|
}, ensure_ascii=False)
|
||||||
return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False, default=str)
|
return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False, default=str)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue