diff --git a/tests/tools/test_browser_cleanup.py b/tests/tools/test_browser_cleanup.py new file mode 100644 index 0000000000..9dfabe6404 --- /dev/null +++ b/tests/tools/test_browser_cleanup.py @@ -0,0 +1,96 @@ +"""Regression tests for browser session cleanup and screenshot recovery.""" + +from unittest.mock import patch + + +class TestScreenshotPathRecovery: + def test_extracts_standard_absolute_path(self): + from tools.browser_tool import _extract_screenshot_path_from_text + + assert ( + _extract_screenshot_path_from_text("Screenshot saved to /tmp/foo.png") + == "/tmp/foo.png" + ) + + def test_extracts_quoted_absolute_path(self): + from tools.browser_tool import _extract_screenshot_path_from_text + + assert ( + _extract_screenshot_path_from_text( + "Screenshot saved to '/Users/david/.hermes/browser_screenshots/shot.png'" + ) + == "/Users/david/.hermes/browser_screenshots/shot.png" + ) + + +class TestBrowserCleanup: + def setup_method(self): + from tools import browser_tool + + self.browser_tool = browser_tool + self.orig_active_sessions = browser_tool._active_sessions.copy() + self.orig_session_last_activity = browser_tool._session_last_activity.copy() + self.orig_recording_sessions = browser_tool._recording_sessions.copy() + self.orig_cleanup_done = browser_tool._cleanup_done + + def teardown_method(self): + self.browser_tool._active_sessions.clear() + self.browser_tool._active_sessions.update(self.orig_active_sessions) + self.browser_tool._session_last_activity.clear() + self.browser_tool._session_last_activity.update(self.orig_session_last_activity) + self.browser_tool._recording_sessions.clear() + self.browser_tool._recording_sessions.update(self.orig_recording_sessions) + self.browser_tool._cleanup_done = self.orig_cleanup_done + + def test_cleanup_browser_clears_tracking_state(self): + browser_tool = self.browser_tool + browser_tool._active_sessions["task-1"] = { + "session_name": "sess-1", + "bb_session_id": None, + } + browser_tool._session_last_activity["task-1"] = 123.0 + + with ( + patch("tools.browser_tool._maybe_stop_recording") as mock_stop, + patch( + "tools.browser_tool._run_browser_command", + return_value={"success": True}, + ) as mock_run, + patch("tools.browser_tool.os.path.exists", return_value=False), + ): + browser_tool.cleanup_browser("task-1") + + assert "task-1" not in browser_tool._active_sessions + assert "task-1" not in browser_tool._session_last_activity + mock_stop.assert_called_once_with("task-1") + mock_run.assert_called_once_with("task-1", "close", [], timeout=10) + + def test_browser_close_delegates_to_cleanup_browser(self): + import json + + browser_tool = self.browser_tool + browser_tool._active_sessions["task-2"] = {"session_name": "sess-2"} + + with patch("tools.browser_tool.cleanup_browser") as mock_cleanup: + result = json.loads(browser_tool.browser_close("task-2")) + + assert result == {"success": True, "closed": True} + mock_cleanup.assert_called_once_with("task-2") + + def test_emergency_cleanup_clears_all_tracking_state(self): + browser_tool = self.browser_tool + browser_tool._cleanup_done = False + browser_tool._active_sessions["task-1"] = {"session_name": "sess-1"} + browser_tool._active_sessions["task-2"] = {"session_name": "sess-2"} + browser_tool._session_last_activity["task-1"] = 1.0 + browser_tool._session_last_activity["task-2"] = 2.0 + browser_tool._recording_sessions.update({"task-1", "task-2"}) + + with patch("tools.browser_tool.cleanup_all_browsers") as mock_cleanup_all: + browser_tool._emergency_cleanup_all_sessions() + + mock_cleanup_all.assert_called_once_with() + assert browser_tool._active_sessions == {} + assert browser_tool._session_last_activity == {} + assert browser_tool._recording_sessions == set() + assert browser_tool._cleanup_done is True diff --git a/tools/browser_tool.py b/tools/browser_tool.py index b3516c4f24..9a2c9de4ca 100644 --- a/tools/browser_tool.py +++ b/tools/browser_tool.py @@ -49,10 +49,12 @@ Usage: browser_click("@e5", task_id="task_123") """ +from tools.registry import registry import atexit import json import logging import os +import re import signal import subprocess import shutil @@ -126,7 +128,8 @@ def _socket_safe_tmpdir() -> str: # Track active sessions per task # Stores: session_name (always), bb_session_id + cdp_url (cloud mode only) -_active_sessions: Dict[str, Dict[str, str]] = {} # task_id -> {session_name, ...} +# task_id -> {session_name, ...} +_active_sessions: Dict[str, Dict[str, str]] = {} _recording_sessions: set = set() # task_ids with active recordings # Flag to track if cleanup has been done @@ -139,7 +142,8 @@ _cleanup_done = False # Session inactivity timeout (seconds) - cleanup if no activity for this long # Default: 5 minutes. Needs headroom for LLM reasoning between browser commands, # especially when subagents are doing multi-step browser tasks. -BROWSER_SESSION_INACTIVITY_TIMEOUT = int(os.environ.get("BROWSER_INACTIVITY_TIMEOUT", "300")) +BROWSER_SESSION_INACTIVITY_TIMEOUT = int( + os.environ.get("BROWSER_INACTIVITY_TIMEOUT", "300")) # Track last activity time per session _session_last_activity: Dict[str, float] = {} @@ -161,67 +165,22 @@ def _emergency_cleanup_all_sessions(): if _cleanup_done: return _cleanup_done = True - + if not _active_sessions: return - - logger.info("Emergency cleanup: closing %s active session(s)...", len(_active_sessions)) - + + logger.info("Emergency cleanup: closing %s active session(s)...", + len(_active_sessions)) + try: - if _is_local_mode(): - # Local mode: just close agent-browser sessions via CLI - for task_id, session_info in list(_active_sessions.items()): - session_name = session_info.get("session_name") - if session_name: - try: - browser_cmd = _find_agent_browser() - task_socket_dir = os.path.join( - _socket_safe_tmpdir(), - f"agent-browser-{session_name}" - ) - env = {**os.environ, "AGENT_BROWSER_SOCKET_DIR": task_socket_dir} - subprocess.run( - browser_cmd.split() + ["--session", session_name, "--json", "close"], - capture_output=True, timeout=5, env=env, - ) - logger.info("Closed local session %s", session_name) - except Exception as e: - logger.debug("Error closing local session %s: %s", session_name, e) - else: - # Cloud mode: release Browserbase sessions via API - api_key = os.environ.get("BROWSERBASE_API_KEY") - project_id = os.environ.get("BROWSERBASE_PROJECT_ID") - - if not api_key or not project_id: - logger.warning("Cannot cleanup - missing BROWSERBASE credentials") - return - - for task_id, session_info in list(_active_sessions.items()): - bb_session_id = session_info.get("bb_session_id") - if bb_session_id: - try: - response = requests.post( - f"https://api.browserbase.com/v1/sessions/{bb_session_id}", - headers={ - "X-BB-API-Key": api_key, - "Content-Type": "application/json" - }, - json={ - "projectId": project_id, - "status": "REQUEST_RELEASE" - }, - timeout=5 # Short timeout for cleanup - ) - if response.status_code in (200, 201, 204): - logger.info("Closed session %s", bb_session_id) - else: - logger.warning("Failed to close session %s: HTTP %s", bb_session_id, response.status_code) - except Exception as e: - logger.error("Error closing session %s: %s", bb_session_id, e) - - _active_sessions.clear() + cleanup_all_browsers() except Exception as e: logger.error("Emergency cleanup error: %s", e) + finally: + with _cleanup_lock: + _active_sessions.clear() + _session_last_activity.clear() + _recording_sessions.clear() # Register cleanup via atexit only. Previous versions installed SIGINT/SIGTERM @@ -240,46 +199,49 @@ atexit.register(_emergency_cleanup_all_sessions) def _cleanup_inactive_browser_sessions(): """ Clean up browser sessions that have been inactive for longer than the timeout. - + This function is called periodically by the background cleanup thread to automatically close sessions that haven't been used recently, preventing orphaned sessions (local or Browserbase) from accumulating. """ current_time = time.time() sessions_to_cleanup = [] - + with _cleanup_lock: for task_id, last_time in list(_session_last_activity.items()): if current_time - last_time > BROWSER_SESSION_INACTIVITY_TIMEOUT: sessions_to_cleanup.append(task_id) - + for task_id in sessions_to_cleanup: try: - elapsed = int(current_time - _session_last_activity.get(task_id, current_time)) - logger.info("Cleaning up inactive session for task: %s (inactive for %ss)", task_id, elapsed) + elapsed = int( + current_time - _session_last_activity.get(task_id, current_time)) + logger.info( + "Cleaning up inactive session for task: %s (inactive for %ss)", task_id, elapsed) cleanup_browser(task_id) with _cleanup_lock: if task_id in _session_last_activity: del _session_last_activity[task_id] except Exception as e: - logger.warning("Error cleaning up inactive session %s: %s", task_id, e) + logger.warning( + "Error cleaning up inactive session %s: %s", task_id, e) def _browser_cleanup_thread_worker(): """ Background thread that periodically cleans up inactive browser sessions. - + Runs every 30 seconds and checks for sessions that haven't been used within the BROWSER_SESSION_INACTIVITY_TIMEOUT period. """ global _cleanup_running - + while _cleanup_running: try: _cleanup_inactive_browser_sessions() except Exception as e: logger.warning("Cleanup thread error: %s", e) - + # Sleep in 1-second intervals so we can stop quickly if needed for _ in range(30): if not _cleanup_running: @@ -290,7 +252,7 @@ def _browser_cleanup_thread_worker(): def _start_browser_cleanup_thread(): """Start the background cleanup thread if not already running.""" global _cleanup_thread, _cleanup_running - + with _cleanup_lock: if _cleanup_thread is None or not _cleanup_thread.is_alive(): _cleanup_running = True @@ -300,7 +262,8 @@ def _start_browser_cleanup_thread(): name="browser-cleanup" ) _cleanup_thread.start() - logger.info("Started inactivity cleanup thread (timeout: %ss)", BROWSER_SESSION_INACTIVITY_TIMEOUT) + logger.info("Started inactivity cleanup thread (timeout: %ss)", + BROWSER_SESSION_INACTIVITY_TIMEOUT) def _stop_browser_cleanup_thread(): @@ -487,38 +450,41 @@ BROWSER_TOOL_SCHEMAS = [ def _create_browserbase_session(task_id: str) -> Dict[str, str]: """ Create a Browserbase session with stealth features. - + Browserbase Stealth Modes: - Basic Stealth: ALWAYS enabled automatically. Generates random fingerprints, viewports, and solves visual CAPTCHAs. No configuration needed. - Advanced Stealth: Uses custom Chromium build for better bot detection avoidance. Requires Scale Plan. Enable via BROWSERBASE_ADVANCED_STEALTH=true. - + Proxies are enabled by default to route traffic through residential IPs, which significantly improves CAPTCHA solving rates. Can be disabled via BROWSERBASE_PROXIES=false if needed. - + Args: task_id: Unique identifier for the task - + Returns: Dict with session_name, bb_session_id, cdp_url, and feature flags """ import uuid import sys - + config = _get_browserbase_config() - + # Check for optional settings from environment # Proxies: enabled by default for better CAPTCHA solving - enable_proxies = os.environ.get("BROWSERBASE_PROXIES", "true").lower() != "false" + enable_proxies = os.environ.get( + "BROWSERBASE_PROXIES", "true").lower() != "false" # Advanced Stealth: requires Scale Plan, disabled by default - enable_advanced_stealth = os.environ.get("BROWSERBASE_ADVANCED_STEALTH", "false").lower() == "true" + enable_advanced_stealth = os.environ.get( + "BROWSERBASE_ADVANCED_STEALTH", "false").lower() == "true" # keepAlive: enabled by default (requires paid plan) - allows reconnection after disconnects - enable_keep_alive = os.environ.get("BROWSERBASE_KEEP_ALIVE", "true").lower() != "false" + enable_keep_alive = os.environ.get( + "BROWSERBASE_KEEP_ALIVE", "true").lower() != "false" # Custom session timeout in milliseconds (optional) - extends session beyond project default custom_timeout_ms = os.environ.get("BROWSERBASE_SESSION_TIMEOUT") - + # Track which features are actually enabled for logging/debugging features_enabled = { "basic_stealth": True, # Always on @@ -527,18 +493,18 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: "keep_alive": False, "custom_timeout": False, } - + # Build session configuration # Note: Basic stealth mode is ALWAYS active - no configuration needed session_config = { "projectId": config["project_id"], } - + # Enable keepAlive for session reconnection (default: true, requires paid plan) # Allows reconnecting to the same session after network hiccups if enable_keep_alive: session_config["keepAlive"] = True - + # Add custom timeout if specified (in milliseconds) # This extends session duration beyond project's default timeout if custom_timeout_ms: @@ -547,20 +513,21 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: if timeout_val > 0: session_config["timeout"] = timeout_val except ValueError: - logger.warning("Invalid BROWSERBASE_SESSION_TIMEOUT value: %s", custom_timeout_ms) - + logger.warning( + "Invalid BROWSERBASE_SESSION_TIMEOUT value: %s", custom_timeout_ms) + # Enable proxies for better CAPTCHA solving (default: true) # Routes traffic through residential IPs for more reliable access if enable_proxies: session_config["proxies"] = True - + # Add advanced stealth if enabled (requires Scale Plan) # Uses custom Chromium build to avoid bot detection altogether if enable_advanced_stealth: session_config["browserSettings"] = { "advancedStealth": True, } - + # Create session via Browserbase API response = requests.post( "https://api.browserbase.com/v1/sessions", @@ -571,11 +538,11 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: json=session_config, timeout=30 ) - + # Track if we fell back from paid features proxies_fallback = False keepalive_fallback = False - + # Handle 402 Payment Required - likely paid features not available # Try to identify which feature caused the issue and retry without it if response.status_code == 402: @@ -583,7 +550,7 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: if enable_keep_alive: keepalive_fallback = True logger.warning("keepAlive may require paid plan (402), retrying without it. " - "Sessions may timeout during long operations.") + "Sessions may timeout during long operations.") session_config.pop("keepAlive", None) response = requests.post( "https://api.browserbase.com/v1/sessions", @@ -594,12 +561,12 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: json=session_config, timeout=30 ) - + # If still 402, try without proxies too if response.status_code == 402 and enable_proxies: proxies_fallback = True logger.warning("Proxies unavailable (402), retrying without proxies. " - "Bot detection may be less effective.") + "Bot detection may be less effective.") session_config.pop("proxies", None) response = requests.post( "https://api.browserbase.com/v1/sessions", @@ -610,13 +577,14 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: json=session_config, timeout=30 ) - + if not response.ok: - raise RuntimeError(f"Failed to create Browserbase session: {response.status_code} {response.text}") - + raise RuntimeError( + f"Failed to create Browserbase session: {response.status_code} {response.text}") + session_data = response.json() session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}" - + # Update features based on what actually succeeded if enable_proxies and not proxies_fallback: features_enabled["proxies"] = True @@ -626,11 +594,12 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: features_enabled["keep_alive"] = True if custom_timeout_ms and "timeout" in session_config: features_enabled["custom_timeout"] = True - + # Log session info for debugging feature_str = ", ".join(k for k, v in features_enabled.items() if v) - logger.info("Created session %s with features: %s", session_name, feature_str) - + logger.info("Created session %s with features: %s", + session_name, feature_str) + return { "session_name": session_name, "bb_session_id": session_data["id"], @@ -640,18 +609,14 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]: def _create_local_session(task_id: str) -> Dict[str, str]: - """Create a lightweight local browser session (no cloud API call). - - Returns the same dict shape as ``_create_browserbase_session`` so the rest - of the code can treat both modes uniformly. - """ import uuid - session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}" - logger.info("Created local browser session %s", session_name) + session_name = f"h_{uuid.uuid4().hex[:10]}" + logger.info("Created local browser session %s for task %s", + session_name, task_id) return { "session_name": session_name, - "bb_session_id": None, # Not applicable in local mode - "cdp_url": None, # Not applicable in local mode + "bb_session_id": None, + "cdp_url": None, "features": {"local": True}, } @@ -659,51 +624,51 @@ def _create_local_session(task_id: str) -> Dict[str, str]: def _get_session_info(task_id: Optional[str] = None) -> Dict[str, str]: """ Get or create session info for the given task. - + In cloud mode, creates a Browserbase session with proxies enabled. In local mode, generates a session name for agent-browser --session. Also starts the inactivity cleanup thread and updates activity tracking. Thread-safe: multiple subagents can call this concurrently. - + Args: task_id: Unique identifier for the task - + Returns: Dict with session_name (always), bb_session_id + cdp_url (cloud only) """ if task_id is None: task_id = "default" - + # Start the cleanup thread if not running (handles inactivity timeouts) _start_browser_cleanup_thread() - + # Update activity timestamp for this session _update_session_activity(task_id) - + with _cleanup_lock: # Check if we already have a session for this task if task_id in _active_sessions: return _active_sessions[task_id] - + # Create session outside the lock (network call in cloud mode) if _is_local_mode(): session_info = _create_local_session(task_id) else: session_info = _create_browserbase_session(task_id) - + with _cleanup_lock: _active_sessions[task_id] = session_info - + return session_info def _get_session_name(task_id: Optional[str] = None) -> str: """ Get the session name for agent-browser CLI. - + Args: task_id: Unique identifier for the task - + Returns: Session name for agent-browser """ @@ -714,22 +679,22 @@ def _get_session_name(task_id: Optional[str] = None) -> str: def _get_browserbase_config() -> Dict[str, str]: """ Get Browserbase configuration from environment. - + Returns: Dict with api_key and project_id - + Raises: ValueError: If required env vars are not set """ api_key = os.environ.get("BROWSERBASE_API_KEY") project_id = os.environ.get("BROWSERBASE_PROJECT_ID") - + if not api_key or not project_id: raise ValueError( "BROWSERBASE_API_KEY and BROWSERBASE_PROJECT_ID environment variables are required. " "Get your credentials at https://browserbase.com" ) - + return { "api_key": api_key, "project_id": project_id @@ -739,12 +704,12 @@ def _get_browserbase_config() -> Dict[str, str]: def _find_agent_browser() -> str: """ Find the agent-browser CLI executable. - + Checks in order: PATH, local node_modules/.bin/, npx fallback. - + Returns: Path to agent-browser executable - + Raises: FileNotFoundError: If agent-browser is not installed """ @@ -753,18 +718,18 @@ def _find_agent_browser() -> str: which_result = shutil.which("agent-browser") if which_result: return which_result - + # Check local node_modules/.bin/ (npm install in repo root) repo_root = Path(__file__).parent.parent local_bin = repo_root / "node_modules" / ".bin" / "agent-browser" if local_bin.exists(): return str(local_bin) - + # Check common npx locations npx_path = shutil.which("npx") if npx_path: return "npx agent-browser" - + raise FileNotFoundError( "agent-browser CLI not found. Install it with: npm install -g agent-browser\n" "Or run 'npm install' in the repo root to install locally.\n" @@ -772,6 +737,27 @@ def _find_agent_browser() -> str: ) +def _extract_screenshot_path_from_text(text: str) -> Optional[str]: + """Extract a screenshot file path from agent-browser human-readable output.""" + if not text: + return None + + patterns = [ + r"Screenshot saved to ['\"](?P/[^'\"]+?\.png)['\"]", + r"Screenshot saved to (?P/\S+?\.png)(?:\s|$)", + r"(?P/\S+?\.png)(?:\s|$)", + ] + + for pattern in patterns: + match = re.search(pattern, text) + if match: + path = match.group("path").strip().strip("'\"") + if path: + return path + + return None + + def _run_browser_command( task_id: str, command: str, @@ -780,25 +766,25 @@ def _run_browser_command( ) -> Dict[str, Any]: """ Run an agent-browser CLI command using our pre-created Browserbase session. - + Args: task_id: Task identifier to get the right session command: The command to run (e.g., "open", "click") args: Additional arguments for the command timeout: Command timeout in seconds - + Returns: Parsed JSON response from agent-browser """ args = args or [] - + # Build the command try: browser_cmd = _find_agent_browser() except FileNotFoundError as e: logger.warning("agent-browser CLI not found: %s", e) return {"success": False, "error": str(e)} - + from tools.interrupt import is_interrupted if is_interrupted(): return {"success": False, "error": "Interrupted"} @@ -807,9 +793,10 @@ def _run_browser_command( try: session_info = _get_session_info(task_id) except Exception as e: - logger.warning("Failed to create browser session for task=%s: %s", task_id, e) + logger.warning( + "Failed to create browser session for task=%s: %s", task_id, e) return {"success": False, "error": f"Failed to create browser session: {str(e)}"} - + # Build the command with the appropriate backend flag. # Cloud mode: --cdp connects to Browserbase. # Local mode: --session launches a local headless Chromium. @@ -827,7 +814,7 @@ def _run_browser_command( "--json", command ] + args - + try: # Give each task its own socket directory to prevent concurrency conflicts. # Without this, parallel workers fight over the same default socket path, @@ -839,13 +826,31 @@ def _run_browser_command( os.makedirs(task_socket_dir, mode=0o700, exist_ok=True) logger.debug("browser cmd=%s task=%s socket_dir=%s (%d chars)", command, task_id, task_socket_dir, len(task_socket_dir)) - + browser_env = {**os.environ} - # Ensure PATH includes standard dirs (systemd services may have minimal PATH) - if "/usr/bin" not in browser_env.get("PATH", "").split(":"): - browser_env["PATH"] = f"{browser_env.get('PATH', '')}:{_SANE_PATH}" + + # Ensure PATH includes Hermes-managed Node first, then standard system dirs. + hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) + hermes_node_bin = str(hermes_home / "node" / "bin") + + existing_path = browser_env.get("PATH", "") + path_parts = [p for p in existing_path.split(":") if p] + candidate_dirs = [hermes_node_bin] + [p for p in _SANE_PATH.split(":") if p] + + for part in reversed(candidate_dirs): + if os.path.isdir(part) and part not in path_parts: + path_parts.insert(0, part) + + browser_env["PATH"] = ":".join(path_parts) browser_env["AGENT_BROWSER_SOCKET_DIR"] = task_socket_dir - + + node_path = shutil.which("node", path=browser_env["PATH"]) + if node_path: + logger.debug("browser subprocess using node at: %s", node_path) + else: + logger.warning("node not found in browser PATH: %s", + browser_env["PATH"]) + result = subprocess.run( cmd_parts, capture_output=True, @@ -853,12 +858,13 @@ def _run_browser_command( timeout=timeout, env=browser_env, ) - + # Log stderr for diagnostics — use warning level on failure so it's visible if result.stderr and result.stderr.strip(): level = logging.WARNING if result.returncode != 0 else logging.DEBUG - logger.log(level, "browser '%s' stderr: %s", command, result.stderr.strip()[:500]) - + logger.log(level, "browser '%s' stderr: %s", + command, result.stderr.strip()[:500]) + # Log empty output as warning — common sign of broken agent-browser if not result.stdout.strip() and result.returncode == 0: logger.warning("browser '%s' returned empty stdout with rc=0. " @@ -866,11 +872,11 @@ def _run_browser_command( command, " ".join(cmd_parts[:4]) + "...", (result.stderr or "")[:200]) - # Parse JSON output - if result.stdout.strip(): + stdout_text = result.stdout.strip() + + if stdout_text: try: - parsed = json.loads(result.stdout.strip()) - # Warn if snapshot came back empty (common sign of daemon/CDP issues) + parsed = json.loads(stdout_text) if command == "snapshot" and parsed.get("success"): snap_data = parsed.get("data", {}) if not snap_data.get("snapshot") and not snap_data.get("refs"): @@ -879,23 +885,46 @@ def _run_browser_command( "returncode=%s", result.returncode) return parsed except json.JSONDecodeError: - # Non-JSON output indicates agent-browser crash or version mismatch - raw = result.stdout.strip()[:500] + raw = stdout_text[:2000] logger.warning("browser '%s' returned non-JSON output (rc=%s): %s", - command, result.returncode, raw[:200]) + command, result.returncode, raw[:500]) + + if command == "screenshot": + stderr_text = (result.stderr or "").strip() + combined_text = "\n".join( + part for part in [stdout_text, stderr_text] if part + ) + recovered_path = _extract_screenshot_path_from_text( + combined_text) + + if recovered_path and Path(recovered_path).exists(): + logger.info( + "browser 'screenshot' recovered file from non-JSON output: %s", + recovered_path, + ) + return { + "success": True, + "data": { + "path": recovered_path, + "raw": raw, + }, + } + return { - "success": True, - "data": {"raw": raw} + "success": False, + "error": f"Non-JSON output from agent-browser for '{command}': {raw}" } - + # Check for errors if result.returncode != 0: - error_msg = result.stderr.strip() if result.stderr else f"Command failed with code {result.returncode}" - logger.warning("browser '%s' failed (rc=%s): %s", command, result.returncode, error_msg[:300]) + error_msg = result.stderr.strip( + ) if result.stderr else f"Command failed with code {result.returncode}" + logger.warning("browser '%s' failed (rc=%s): %s", + command, result.returncode, error_msg[:300]) return {"success": False, "error": error_msg} - + return {"success": True, "data": {}} - + except subprocess.TimeoutExpired: logger.warning("browser '%s' timed out after %ds (task=%s, socket_dir=%s)", command, timeout, task_id, task_socket_dir) @@ -955,17 +984,17 @@ def _extract_relevant_content( def _truncate_snapshot(snapshot_text: str, max_chars: int = 8000) -> str: """ Simple truncation fallback for snapshots. - + Args: snapshot_text: The snapshot text to truncate max_chars: Maximum characters to keep - + Returns: Truncated text with indicator if truncated """ if len(snapshot_text) <= max_chars: return snapshot_text - + return snapshot_text[:max_chars] + "\n\n[... content truncated ...]" @@ -976,39 +1005,39 @@ def _truncate_snapshot(snapshot_text: str, max_chars: int = 8000) -> str: def browser_navigate(url: str, task_id: Optional[str] = None) -> str: """ Navigate to a URL in the browser. - + Args: url: The URL to navigate to task_id: Task identifier for session isolation - + Returns: JSON string with navigation result (includes stealth features info on first nav) """ effective_task_id = task_id or "default" - + # Get session info to check if this is a new session # (will create one with features logged if not exists) session_info = _get_session_info(effective_task_id) is_first_nav = session_info.get("_first_nav", True) - + # Auto-start recording if configured and this is first navigation if is_first_nav: session_info["_first_nav"] = False _maybe_start_recording(effective_task_id) - + result = _run_browser_command(effective_task_id, "open", [url], timeout=60) - + if result.get("success"): data = result.get("data", {}) title = data.get("title", "") final_url = data.get("url", url) - + response = { "success": True, "url": final_url, "title": title } - + # Detect common "blocked" page patterns from title/url blocked_patterns = [ "access denied", "access to this page has been denied", @@ -1018,7 +1047,7 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str: "just a moment", "attention required" ] title_lower = title.lower() - + if any(pattern in title_lower for pattern in blocked_patterns): response["bot_detection_warning"] = ( f"Page title '{title}' suggests bot detection. The site may have blocked this request. " @@ -1026,7 +1055,7 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str: "3) Enable advanced stealth (BROWSERBASE_ADVANCED_STEALTH=true, requires Scale plan), " "4) Some sites have very aggressive bot detection that may be unavoidable." ) - + # Include feature info on first navigation so model knows what's active if is_first_nav and "features" in session_info: features = session_info["features"] @@ -1037,7 +1066,7 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str: "Consider upgrading Browserbase plan for proxy support." ) response["stealth_features"] = active_features - + return json.dumps(response, ensure_ascii=False) else: return json.dumps({ @@ -1053,41 +1082,41 @@ def browser_snapshot( ) -> str: """ Get a text-based snapshot of the current page's accessibility tree. - + Args: full: If True, return complete snapshot. If False, return compact view. task_id: Task identifier for session isolation user_task: The user's current task (for task-aware extraction) - + Returns: JSON string with page snapshot """ effective_task_id = task_id or "default" - + # Build command args based on full flag args = [] if not full: args.extend(["-c"]) # Compact mode - + result = _run_browser_command(effective_task_id, "snapshot", args) - + if result.get("success"): data = result.get("data", {}) snapshot_text = data.get("snapshot", "") refs = data.get("refs", {}) - + # Check if snapshot needs summarization if len(snapshot_text) > SNAPSHOT_SUMMARIZE_THRESHOLD and user_task: snapshot_text = _extract_relevant_content(snapshot_text, user_task) elif len(snapshot_text) > SNAPSHOT_SUMMARIZE_THRESHOLD: snapshot_text = _truncate_snapshot(snapshot_text) - + response = { "success": True, "snapshot": snapshot_text, "element_count": len(refs) if refs else 0 } - + return json.dumps(response, ensure_ascii=False) else: return json.dumps({ @@ -1099,22 +1128,22 @@ def browser_snapshot( def browser_click(ref: str, task_id: Optional[str] = None) -> str: """ Click on an element. - + Args: ref: Element reference (e.g., "@e5") task_id: Task identifier for session isolation - + Returns: JSON string with click result """ effective_task_id = task_id or "default" - + # Ensure ref starts with @ if not ref.startswith("@"): ref = f"@{ref}" - + result = _run_browser_command(effective_task_id, "click", [ref]) - + if result.get("success"): return json.dumps({ "success": True, @@ -1130,24 +1159,24 @@ def browser_click(ref: str, task_id: Optional[str] = None) -> str: def browser_type(ref: str, text: str, task_id: Optional[str] = None) -> str: """ Type text into an input field. - + Args: ref: Element reference (e.g., "@e3") text: Text to type task_id: Task identifier for session isolation - + Returns: JSON string with type result """ effective_task_id = task_id or "default" - + # Ensure ref starts with @ if not ref.startswith("@"): ref = f"@{ref}" - + # Use fill command (clears then types) result = _run_browser_command(effective_task_id, "fill", [ref, text]) - + if result.get("success"): return json.dumps({ "success": True, @@ -1164,25 +1193,25 @@ def browser_type(ref: str, text: str, task_id: Optional[str] = None) -> str: def browser_scroll(direction: str, task_id: Optional[str] = None) -> str: """ Scroll the page. - + Args: direction: "up" or "down" task_id: Task identifier for session isolation - + Returns: JSON string with scroll result """ effective_task_id = task_id or "default" - + # Validate direction if direction not in ["up", "down"]: return json.dumps({ "success": False, "error": f"Invalid direction '{direction}'. Use 'up' or 'down'." }, ensure_ascii=False) - + result = _run_browser_command(effective_task_id, "scroll", [direction]) - + if result.get("success"): return json.dumps({ "success": True, @@ -1198,16 +1227,16 @@ def browser_scroll(direction: str, task_id: Optional[str] = None) -> str: def browser_back(task_id: Optional[str] = None) -> str: """ Navigate back in browser history. - + Args: task_id: Task identifier for session isolation - + Returns: JSON string with navigation result """ effective_task_id = task_id or "default" result = _run_browser_command(effective_task_id, "back", []) - + if result.get("success"): data = result.get("data", {}) return json.dumps({ @@ -1224,17 +1253,17 @@ def browser_back(task_id: Optional[str] = None) -> str: def browser_press(key: str, task_id: Optional[str] = None) -> str: """ Press a keyboard key. - + Args: key: Key to press (e.g., "Enter", "Tab") task_id: Task identifier for session isolation - + Returns: JSON string with key press result """ effective_task_id = task_id or "default" result = _run_browser_command(effective_task_id, "press", [key]) - + if result.get("success"): return json.dumps({ "success": True, @@ -1250,69 +1279,51 @@ def browser_press(key: str, task_id: Optional[str] = None) -> str: def browser_close(task_id: Optional[str] = None) -> str: """ Close the browser session. - + Args: task_id: Task identifier for session isolation - + Returns: JSON string with close result """ effective_task_id = task_id or "default" - - # Stop auto-recording before closing - _maybe_stop_recording(effective_task_id) - - result = _run_browser_command(effective_task_id, "close", []) - - # Close the backend session (Browserbase API in cloud mode, nothing extra in local mode) - session_key = task_id if task_id and task_id in _active_sessions else "default" - if session_key in _active_sessions: - session_info = _active_sessions[session_key] - bb_session_id = session_info.get("bb_session_id") - if bb_session_id: - # Cloud mode: release the Browserbase session via API - try: - config = _get_browserbase_config() - _close_browserbase_session(bb_session_id, config["api_key"], config["project_id"]) - except Exception as e: - logger.warning("Could not close BrowserBase session: %s", e) - del _active_sessions[session_key] - - if result.get("success"): - return json.dumps({ - "success": True, - "closed": True - }, ensure_ascii=False) - else: - # Even if close fails, session was released - return json.dumps({ - "success": True, - "closed": True, - "warning": result.get("error", "Session may not have been active") - }, ensure_ascii=False) + with _cleanup_lock: + had_session = effective_task_id in _active_sessions + + cleanup_browser(effective_task_id) + + response = { + "success": True, + "closed": True, + } + if not had_session: + response["warning"] = "Session may not have been active" + return json.dumps(response, ensure_ascii=False) def browser_console(clear: bool = False, task_id: Optional[str] = None) -> str: """Get browser console messages and JavaScript errors. - + Returns both console output (log/warn/error/info from the page's JS) and uncaught exceptions (crashes, unhandled promise rejections). - + Args: clear: If True, clear the message/error buffers after reading task_id: Task identifier for session isolation - + Returns: JSON string with console messages and JS errors """ effective_task_id = task_id or "default" - + console_args = ["--clear"] if clear else [] error_args = ["--clear"] if clear else [] - - console_result = _run_browser_command(effective_task_id, "console", console_args) - errors_result = _run_browser_command(effective_task_id, "errors", error_args) - + + console_result = _run_browser_command( + effective_task_id, "console", console_args) + errors_result = _run_browser_command( + effective_task_id, "errors", error_args) + messages = [] if console_result.get("success"): for msg in console_result.get("data", {}).get("messages", []): @@ -1321,7 +1332,7 @@ def browser_console(clear: bool = False, task_id: Optional[str] = None) -> str: "text": msg.get("text", ""), "source": "console", }) - + errors = [] if errors_result.get("success"): for err in errors_result.get("data", {}).get("errors", []): @@ -1329,7 +1340,7 @@ def browser_console(clear: bool = False, task_id: Optional[str] = None) -> str: "message": err.get("message", ""), "source": "exception", }) - + return json.dumps({ "success": True, "console_messages": messages, @@ -1344,32 +1355,38 @@ def _maybe_start_recording(task_id: str): if task_id in _recording_sessions: return try: - hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) + hermes_home = Path(os.environ.get( + "HERMES_HOME", Path.home() / ".hermes")) config_path = hermes_home / "config.yaml" record_enabled = False if config_path.exists(): import yaml with open(config_path) as f: cfg = yaml.safe_load(f) or {} - record_enabled = cfg.get("browser", {}).get("record_sessions", False) - + record_enabled = cfg.get("browser", {}).get( + "record_sessions", False) + if not record_enabled: return - + recordings_dir = hermes_home / "browser_recordings" recordings_dir.mkdir(parents=True, exist_ok=True) _cleanup_old_recordings(max_age_hours=72) - + import time timestamp = time.strftime("%Y%m%d_%H%M%S") - recording_path = recordings_dir / f"session_{timestamp}_{task_id[:16]}.webm" - - result = _run_browser_command(task_id, "record", ["start", str(recording_path)]) + recording_path = recordings_dir / \ + f"session_{timestamp}_{task_id[:16]}.webm" + + result = _run_browser_command( + task_id, "record", ["start", str(recording_path)]) if result.get("success"): _recording_sessions.add(task_id) - logger.info("Auto-recording browser session %s to %s", task_id, recording_path) + logger.info("Auto-recording browser session %s to %s", + task_id, recording_path) else: - logger.debug("Could not start auto-recording: %s", result.get("error")) + logger.debug("Could not start auto-recording: %s", + result.get("error")) except Exception as e: logger.debug("Auto-recording setup failed: %s", e) @@ -1382,7 +1399,8 @@ def _maybe_stop_recording(task_id: str): result = _run_browser_command(task_id, "record", ["stop"]) if result.get("success"): path = result.get("data", {}).get("path", "") - logger.info("Saved browser recording for session %s: %s", task_id, path) + logger.info( + "Saved browser recording for session %s: %s", task_id, path) except Exception as e: logger.debug("Could not stop recording for %s: %s", task_id, e) finally: @@ -1392,15 +1410,15 @@ def _maybe_stop_recording(task_id: str): def browser_get_images(task_id: Optional[str] = None) -> str: """ Get all images on the current page. - + Args: task_id: Task identifier for session isolation - + Returns: JSON string with list of images (src and alt) """ effective_task_id = task_id or "default" - + # Use eval to run JavaScript that extracts images js_code = """JSON.stringify( [...document.images].map(img => ({ @@ -1410,20 +1428,20 @@ def browser_get_images(task_id: Optional[str] = None) -> str: height: img.naturalHeight })).filter(img => img.src && !img.src.startsWith('data:')) )""" - + result = _run_browser_command(effective_task_id, "eval", [js_code]) - + if result.get("success"): data = result.get("data", {}) raw_result = data.get("result", "[]") - + try: # Parse the JSON string returned by JavaScript if isinstance(raw_result, str): images = json.loads(raw_result) else: images = raw_result - + return json.dumps({ "success": True, "images": images, @@ -1446,51 +1464,53 @@ def browser_get_images(task_id: Optional[str] = None) -> str: def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] = None) -> str: """ Take a screenshot of the current page and analyze it with vision AI. - + This tool captures what's visually displayed in the browser and sends it to Gemini for analysis. Useful for understanding visual content that the text-based snapshot may not capture (CAPTCHAs, verification challenges, images, complex layouts, etc.). - + The screenshot is saved persistently and its file path is returned alongside the analysis, so it can be shared with users via MEDIA: in the response. - + Args: question: What you want to know about the page visually annotate: If True, overlay numbered [N] labels on interactive elements task_id: Task identifier for session isolation - + Returns: JSON string with vision analysis results and screenshot_path """ import base64 import uuid as uuid_mod from pathlib import Path - + effective_task_id = task_id or "default" - # Save screenshot to persistent location so it can be shared with users hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) screenshots_dir = hermes_home / "browser_screenshots" - screenshot_path = screenshots_dir / f"browser_screenshot_{uuid_mod.uuid4().hex}.png" - + screenshot_path = screenshots_dir / \ + f"browser_screenshot_{uuid_mod.uuid4().hex}.png" + try: screenshots_dir.mkdir(parents=True, exist_ok=True) - + # Prune old screenshots (older than 24 hours) to prevent unbounded disk growth _cleanup_old_screenshots(screenshots_dir, max_age_hours=24) - + # Take screenshot using agent-browser - screenshot_args = [str(screenshot_path)] + screenshot_args = [] if annotate: - screenshot_args.insert(0, "--annotate") + screenshot_args.append("--annotate") + screenshot_args.append("--full") + screenshot_args.append(str(screenshot_path)) result = _run_browser_command( - effective_task_id, - "screenshot", + effective_task_id, + "screenshot", screenshot_args, timeout=30 ) - + if not result.get("success"): error_detail = result.get("error", "Unknown error") mode = "local" if _is_local_mode() else "cloud" @@ -1498,7 +1518,11 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] "success": False, "error": f"Failed to take screenshot ({mode} mode): {error_detail}" }, ensure_ascii=False) - + + actual_screenshot_path = result.get("data", {}).get("path") + if actual_screenshot_path: + screenshot_path = Path(actual_screenshot_path) + # Check if screenshot file was created if not screenshot_path.exists(): mode = "local" if _is_local_mode() else "cloud" @@ -1511,12 +1535,12 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] f"or a stale daemon process." ), }, ensure_ascii=False) - + # Read and convert to base64 image_data = screenshot_path.read_bytes() image_base64 = base64.b64encode(image_data).decode("ascii") data_url = f"data:image/png;base64,{image_base64}" - + vision_prompt = ( f"You are analyzing a screenshot of a web browser.\n\n" f"User's question: {question}\n\n" @@ -1547,7 +1571,7 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] if vision_model: call_kwargs["model"] = vision_model response = call_llm(**call_kwargs) - + analysis = response.choices[0].message.content response_data = { "success": True, @@ -1558,14 +1582,15 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] if annotate and result.get("data", {}).get("annotations"): response_data["annotations"] = result["data"]["annotations"] return json.dumps(response_data, ensure_ascii=False) - + except Exception as e: # Keep the screenshot if it was captured successfully — the failure is # in the LLM vision analysis, not the capture. Deleting a valid # screenshot loses evidence the user might need. The 24-hour cleanup # in _cleanup_old_screenshots prevents unbounded disk growth. logger.warning("browser_vision failed: %s", e, exc_info=True) - error_info = {"success": False, "error": f"Error during vision analysis: {str(e)}"} + error_info = {"success": False, + "error": f"Error during vision analysis: {str(e)}"} if screenshot_path.exists(): error_info["screenshot_path"] = str(screenshot_path) error_info["note"] = "Screenshot was captured but vision analysis failed. You can still share it via MEDIA:." @@ -1600,7 +1625,8 @@ def _cleanup_old_recordings(max_age_hours=72): """Remove browser recordings older than max_age_hours to prevent disk bloat.""" import time try: - hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) + hermes_home = Path(os.environ.get( + "HERMES_HOME", Path.home() / ".hermes")) recordings_dir = hermes_home / "browser_recordings" if not recordings_dir.exists(): return @@ -1622,15 +1648,15 @@ def _cleanup_old_recordings(max_age_hours=72): def _close_browserbase_session(session_id: str, api_key: str, project_id: str) -> bool: """ Close a Browserbase session immediately via the API. - + Uses POST /v1/sessions/{id} with status=REQUEST_RELEASE to immediately terminate the session without waiting for keepAlive timeout. - + Args: session_id: The Browserbase session ID api_key: Browserbase API key project_id: Browserbase project ID - + Returns: True if session was successfully closed, False otherwise """ @@ -1648,14 +1674,16 @@ def _close_browserbase_session(session_id: str, api_key: str, project_id: str) - }, timeout=10 ) - + if response.status_code in (200, 201, 204): - logger.debug("Successfully closed BrowserBase session %s", session_id) + logger.debug( + "Successfully closed BrowserBase session %s", session_id) return True else: - logger.warning("Failed to close session %s: HTTP %s - %s", session_id, response.status_code, response.text[:200]) + logger.warning("Failed to close session %s: HTTP %s - %s", + session_id, response.status_code, response.text[:200]) return False - + except Exception as e: logger.error("Exception closing session %s: %s", session_id, e) return False @@ -1664,57 +1692,64 @@ def _close_browserbase_session(session_id: str, api_key: str, project_id: str) - def cleanup_browser(task_id: Optional[str] = None) -> None: """ Clean up browser session for a task. - + Called automatically when a task completes or when inactivity timeout is reached. Closes both the agent-browser session and the Browserbase session. - + Args: task_id: Task identifier to clean up """ if task_id is None: task_id = "default" - + logger.debug("cleanup_browser called for task_id: %s", task_id) logger.debug("Active sessions: %s", list(_active_sessions.keys())) - + # Check if session exists (under lock), but don't remove yet - # _run_browser_command needs it to build the close command. with _cleanup_lock: session_info = _active_sessions.get(task_id) - + if session_info: bb_session_id = session_info.get("bb_session_id", "unknown") - logger.debug("Found session for task %s: bb_session_id=%s", task_id, bb_session_id) - + logger.debug("Found session for task %s: bb_session_id=%s", + task_id, bb_session_id) + # Stop auto-recording before closing (saves the file) _maybe_stop_recording(task_id) - + # Try to close via agent-browser first (needs session in _active_sessions) try: _run_browser_command(task_id, "close", [], timeout=10) - logger.debug("agent-browser close command completed for task %s", task_id) + logger.debug( + "agent-browser close command completed for task %s", task_id) except Exception as e: - logger.warning("agent-browser close failed for task %s: %s", task_id, e) - + logger.warning( + "agent-browser close failed for task %s: %s", task_id, e) + # Now remove from tracking under lock with _cleanup_lock: _active_sessions.pop(task_id, None) _session_last_activity.pop(task_id, None) - + # Cloud mode: close the Browserbase session via API if bb_session_id and not _is_local_mode(): try: config = _get_browserbase_config() - success = _close_browserbase_session(bb_session_id, config["api_key"], config["project_id"]) + success = _close_browserbase_session( + bb_session_id, config["api_key"], config["project_id"]) if not success: - logger.warning("Could not close BrowserBase session %s", bb_session_id) + logger.warning( + "Could not close BrowserBase session %s", bb_session_id) except Exception as e: - logger.error("Exception during BrowserBase session close: %s", e) - + logger.error( + "Exception during BrowserBase session close: %s", e) + # Kill the daemon process and clean up socket directory session_name = session_info.get("session_name", "") if session_name: - socket_dir = os.path.join(_socket_safe_tmpdir(), f"agent-browser-{session_name}") + socket_dir = os.path.join( + _socket_safe_tmpdir(), f"agent-browser-{session_name}") if os.path.exists(socket_dir): # agent-browser writes {session}.pid in the socket dir pid_file = os.path.join(socket_dir, f"{session_name}.pid") @@ -1722,11 +1757,13 @@ def cleanup_browser(task_id: Optional[str] = None) -> None: try: daemon_pid = int(Path(pid_file).read_text().strip()) os.kill(daemon_pid, signal.SIGTERM) - logger.debug("Killed daemon pid %s for %s", daemon_pid, session_name) + logger.debug("Killed daemon pid %s for %s", + daemon_pid, session_name) except (ProcessLookupError, ValueError, PermissionError, OSError): - logger.debug("Could not kill daemon pid for %s (already dead or inaccessible)", session_name) + logger.debug( + "Could not kill daemon pid for %s (already dead or inaccessible)", session_name) shutil.rmtree(socket_dir, ignore_errors=True) - + logger.debug("Removed task %s from active sessions", task_id) else: logger.debug("No active session found for task_id: %s", task_id) @@ -1735,7 +1772,7 @@ def cleanup_browser(task_id: Optional[str] = None) -> None: def cleanup_all_browsers() -> None: """ Clean up all active browser sessions. - + Useful for cleanup on shutdown. """ with _cleanup_lock: @@ -1747,7 +1784,7 @@ def cleanup_all_browsers() -> None: def get_active_browser_sessions() -> Dict[str, Dict[str, str]]: """ Get information about active browser sessions. - + Returns: Dict mapping task_id to session info (session_name, bb_session_id, cdp_url) """ @@ -1768,7 +1805,7 @@ def check_browser_requirements() -> bool: In **cloud mode** (BROWSERBASE_API_KEY set): the CLI *and* both ``BROWSERBASE_API_KEY`` / ``BROWSERBASE_PROJECT_ID`` must be present. - + Returns: True if all requirements are met, False otherwise """ @@ -1801,7 +1838,7 @@ if __name__ == "__main__": mode = "local" if _is_local_mode() else "cloud (Browserbase)" print(f" Mode: {mode}") - + # Check requirements if check_browser_requirements(): print("āœ… All requirements met") @@ -1811,18 +1848,19 @@ if __name__ == "__main__": _find_agent_browser() except FileNotFoundError: print(" - agent-browser CLI not found") - print(" Install: npm install -g agent-browser && agent-browser install --with-deps") + print( + " Install: npm install -g agent-browser && agent-browser install --with-deps") if not _is_local_mode(): if not os.environ.get("BROWSERBASE_API_KEY"): print(" - BROWSERBASE_API_KEY not set (required for cloud mode)") if not os.environ.get("BROWSERBASE_PROJECT_ID"): print(" - BROWSERBASE_PROJECT_ID not set (required for cloud mode)") print(" Tip: unset BROWSERBASE_API_KEY to use free local mode instead") - + print("\nšŸ“‹ Available Browser Tools:") for schema in BROWSER_TOOL_SCHEMAS: print(f" šŸ”¹ {schema['name']}: {schema['description'][:60]}...") - + print("\nšŸ’” Usage:") print(" from tools.browser_tool import browser_navigate, browser_snapshot") print(" result = browser_navigate('https://example.com', task_id='my_task')") @@ -1832,7 +1870,6 @@ if __name__ == "__main__": # --------------------------------------------------------------------------- # Registry # --------------------------------------------------------------------------- -from tools.registry import registry _BROWSER_SCHEMA_MAP = {s["name"]: s for s in BROWSER_TOOL_SCHEMAS} @@ -1840,7 +1877,8 @@ registry.register( name="browser_navigate", toolset="browser", schema=_BROWSER_SCHEMA_MAP["browser_navigate"], - handler=lambda args, **kw: browser_navigate(url=args.get("url", ""), task_id=kw.get("task_id")), + handler=lambda args, **kw: browser_navigate( + url=args.get("url", ""), task_id=kw.get("task_id")), check_fn=check_browser_requirements, ) registry.register( @@ -1855,7 +1893,8 @@ registry.register( name="browser_click", toolset="browser", schema=_BROWSER_SCHEMA_MAP["browser_click"], - handler=lambda args, **kw: browser_click(**args, task_id=kw.get("task_id")), + handler=lambda args, **kw: browser_click(** + args, task_id=kw.get("task_id")), check_fn=check_browser_requirements, ) registry.register( @@ -1869,7 +1908,8 @@ registry.register( name="browser_scroll", toolset="browser", schema=_BROWSER_SCHEMA_MAP["browser_scroll"], - handler=lambda args, **kw: browser_scroll(**args, task_id=kw.get("task_id")), + handler=lambda args, **kw: browser_scroll(** + args, task_id=kw.get("task_id")), check_fn=check_browser_requirements, ) registry.register( @@ -1883,7 +1923,8 @@ registry.register( name="browser_press", toolset="browser", schema=_BROWSER_SCHEMA_MAP["browser_press"], - handler=lambda args, **kw: browser_press(key=args.get("key", ""), task_id=kw.get("task_id")), + handler=lambda args, **kw: browser_press( + key=args.get("key", ""), task_id=kw.get("task_id")), check_fn=check_browser_requirements, ) registry.register( @@ -1904,13 +1945,20 @@ registry.register( name="browser_vision", toolset="browser", schema=_BROWSER_SCHEMA_MAP["browser_vision"], - handler=lambda args, **kw: browser_vision(question=args.get("question", ""), annotate=args.get("annotate", False), task_id=kw.get("task_id")), + handler=lambda args, **kw: browser_vision( + question=args.get("question", ""), + annotate=args.get("annotate", False), + task_id=kw.get("task_id"), + ), check_fn=check_browser_requirements, ) registry.register( name="browser_console", toolset="browser", schema=_BROWSER_SCHEMA_MAP["browser_console"], - handler=lambda args, **kw: browser_console(clear=args.get("clear", False), task_id=kw.get("task_id")), + handler=lambda args, **kw: browser_console( + clear=args.get("clear", False), + task_id=kw.get("task_id"), + ), check_fn=check_browser_requirements, )