mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
* docs: browser CDP supervisor design (for upcoming PR)

Design doc ahead of implementation: dialog + iframe detection/interaction via a persistent CDP supervisor. Covers backend capability matrix (verified live 2026-04-23), architecture, lifecycle, policy, agent surface, PR split, non-goals, and test plan.

Supersedes #12550.

No code changes in this commit.

* feat(browser): add persistent CDP supervisor for dialog + frame detection

Single persistent CDP WebSocket per Hermes task_id that subscribes to Page/Runtime/Target events and maintains thread-safe state for pending dialogs, frame tree, and console errors.

Supervisor lives in its own daemon thread running an asyncio loop; external callers use sync API (snapshot(), respond_to_dialog()) that bridges onto the loop.

Auto-attaches to OOPIF child targets via Target.setAutoAttach{flatten:true} and enables Page+Runtime on each so iframe-origin dialogs surface through the same supervisor.

Dialog policies: must_respond (default, 300s safety timeout), auto_dismiss, auto_accept.

Frame tree capped at 30 entries + OOPIF depth 2 to keep snapshot payloads bounded on ad-heavy pages.

E2E verified against real Chrome via smoke test: detects + responds to main-frame alerts, iframe-contentWindow alerts, preserves frame tree, graceful no-dialog error path, clean shutdown.

No agent-facing tool wiring in this commit (comes next).

* feat(browser): add browser_dialog tool wired to CDP supervisor

Agent-facing response-only tool. Schema:

  action: 'accept' | 'dismiss' (required)
  prompt_text: response for prompt() dialogs (optional)
  dialog_id: disambiguate when multiple dialogs queued (optional)

Handler: SUPERVISOR_REGISTRY.get(task_id).respond_to_dialog(...)

check_fn shares _browser_cdp_check with browser_cdp so both surface and hide together. When no supervisor is attached (Camofox, default Playwright, or no browser session started yet), tool is hidden; if somehow invoked it returns a clear error pointing the agent to browser_navigate / /browser connect.

Registered in _HERMES_CORE_TOOLS and the browser / hermes-acp / hermes-api-server toolsets alongside browser_cdp.

* feat(browser): wire CDP supervisor into session lifecycle + browser_snapshot

Supervisor lifecycle:

  * _get_session_info lazy-starts the supervisor after a session row is materialized, which covers every backend code path (Browserbase, cdp_url override, /browser connect, future providers) with one hook.
  * cleanup_browser(task_id) stops the supervisor for that task first (before the backend tears down CDP).
  * cleanup_all_browsers() calls SUPERVISOR_REGISTRY.stop_all().
  * /browser connect eagerly starts the supervisor for task 'default' so the first snapshot already shows pending_dialogs.
  * /browser disconnect stops the supervisor.

CDP URL resolution for the supervisor:

  1. BROWSER_CDP_URL / browser.cdp_url override.
  2. Fallback: session_info['cdp_url'] from cloud providers (Browserbase).

browser_snapshot merges supervisor state (pending_dialogs + frame_tree) into its JSON output when a supervisor is active. The agent reads pending_dialogs from the snapshot it already requests, then calls browser_dialog to respond. No extra tool surface.

Config defaults:

  * browser.dialog_policy: 'must_respond' (new)
  * browser.dialog_timeout_s: 300 (new)

No version bump: new keys deep-merge into existing browser section.

Deadlock fix in supervisor event dispatch:

  * _on_dialog_opening and _on_target_attached used to await CDP calls while the reader was still processing an event, but only the reader can set the response Future, so the call timed out.
  * Both now fire asyncio.create_task(...) so the reader stays pumping.
  * auto_dismiss/auto_accept now actually close the dialog immediately.

Tests (tests/tools/test_browser_supervisor.py, 11 tests, real Chrome):

  * supervisor start/snapshot
  * main-frame alert detection + dismiss
  * iframe.contentWindow alert
  * prompt() with prompt_text reply
  * respond with no pending dialog -> clean error
  * auto_dismiss clears on event
  * registry idempotency
  * registry stop -> snapshot reports inactive
  * browser_dialog tool no-supervisor error
  * browser_dialog invalid action
  * browser_dialog end-to-end via tool handler

xdist-safe: chrome_cdp fixture uses a per-worker port. Skipped when google-chrome/chromium isn't installed.

* docs(browser): document browser_dialog tool + CDP supervisor

  - user-guide/features/browser.md: new browser_dialog section with workflow, availability gate, and dialog_policy table
  - reference/tools-reference.md: row for browser_dialog, tool count bumped 53 -> 54, browser tools count 11 -> 12
  - reference/toolsets-reference.md: browser_dialog added to browser toolset row with note on pending_dialogs / frame_tree snapshot fields

Full design doc lives at developer-guide/browser-supervisor.md (committed earlier).

* fix(browser): reconnect loop + recent_dialogs for Browserbase visibility

Found via Browserbase E2E test that revealed two production-critical issues:

1. **Supervisor WebSocket drops when other clients disconnect.** Browserbase's CDP proxy tears down our long-lived WebSocket whenever a short-lived client (e.g. agent-browser CLI's per-command CDP connection) disconnects. Fixed with a reconnecting _run loop that re-attaches with exponential backoff on drops. _page_session_id and _child_sessions are reset on each reconnect; pending_dialogs and frames are preserved across reconnects.

2. **Browserbase auto-dismisses dialogs server-side within ~10ms.** Their Playwright-based CDP proxy dismisses alert/confirm/prompt before our Page.handleJavaScriptDialog call can respond. So pending_dialogs is empty by the time the agent reads a snapshot on Browserbase. Added a recent_dialogs ring buffer (capacity 20) that retains a DialogRecord for every dialog that opened, with a closed_by tag:

  * 'agent': agent called browser_dialog
  * 'auto_policy': local auto_dismiss/auto_accept fired
  * 'watchdog': must_respond timeout auto-dismissed (300s default)
  * 'remote': browser/backend closed it on us (Browserbase)

   Agents on Browserbase now see the dialog history with closed_by='remote' so they at least know a dialog fired, even though they couldn't respond.

3. **Page.javascriptDialogClosed matching bug.** The event doesn't include a 'message' field (CDP spec has only 'result' and 'userInput') but our _on_dialog_closed was matching on message. Fixed to match by session_id + oldest-first, with a safety assumption that only one dialog is in flight per session (the JS thread is blocked while a dialog is up).

Docs + tests updated:

  * browser.md: new availability matrix showing the three backends and which mode (pending / recent / response) each supports
  * developer-guide/browser-supervisor.md: three-field snapshot schema with closed_by semantics
  * test_browser_supervisor.py: +test_recent_dialogs_ring_buffer (12/12 passing against real Chrome)

E2E verified both backends:

  * Local Chrome via /browser connect: detect + respond full workflow (smoke_supervisor.py all 7 scenarios pass)
  * Browserbase: detect via recent_dialogs with closed_by='remote' (smoke_supervisor_browserbase_v2.py passes)

Camofox remains out of scope (REST-only, no CDP), tracked for upstream PR 3.

* feat(browser): XHR bridge for dialog response on Browserbase (FIXED)

Browserbase's CDP proxy auto-dismisses native JS dialogs within ~10ms, so Page.handleJavaScriptDialog calls lose the race. Solution: bypass native dialogs entirely.

The supervisor now injects Page.addScriptToEvaluateOnNewDocument with a JavaScript override for window.alert/confirm/prompt. Those overrides perform a synchronous XMLHttpRequest to a magic host ('hermes-dialog-bridge.invalid'). We intercept those XHRs via Fetch.enable with a requestStage=Request pattern.

Flow when a page calls alert('hi'):

  1. window.alert override intercepts, builds XHR GET to http://hermes-dialog-bridge.invalid/?kind=alert&message=hi
  2. Sync XHR blocks the page's JS thread (mirrors real dialog semantics)
  3. Fetch.requestPaused fires on our WebSocket; supervisor surfaces it as a pending dialog with bridge_request_id set
  4. Agent reads pending_dialogs from browser_snapshot, calls browser_dialog
  5. Supervisor calls Fetch.fulfillRequest with JSON body: {accept: true|false, prompt_text: '...', dialog_id: 'd-N'}
  6. The injected script parses the body, returns the appropriate value from the override (undefined for alert, bool for confirm, string|null for prompt)

This works identically on Browserbase AND local Chrome: no native dialog ever fires, so Browserbase's auto-dismiss has nothing to race. Dialog policies (must_respond / auto_dismiss / auto_accept) all still work.

Bridge is installed on every attached session (main page + OOPIF child sessions) so iframe dialogs are captured too.

Native-dialog path kept as a fallback for backends that don't auto-dismiss (so a page that somehow bypasses our override, e.g. iframes that load after Fetch.enable but before the init-script runs, still gets observed via Page.javascriptDialogOpening).

E2E VERIFIED:

  * Local Chrome: 13/13 pytest tests green (12 original + new test_bridge_captures_prompt_and_returns_reply_text that asserts window.__ret === 'AGENT-SUPPLIED-REPLY' after agent responds)
  * Browserbase: smoke_bb_bridge_v2.py runs 4/4 PASS:
    - alert('BB-ALERT-MSG') dismiss → page.alert_ret = undefined ✓
    - prompt('BB-PROMPT-MSG', 'default-xyz') accept with 'AGENT-REPLY' → page.prompt_ret === 'AGENT-REPLY' ✓
    - confirm('BB-CONFIRM-MSG') accept → page.confirm_ret === true ✓
    - confirm('BB-CONFIRM-MSG') dismiss → page.confirm_ret === false ✓

Docs updated in browser.md and developer-guide/browser-supervisor.md. Availability matrix now shows Browserbase at full parity with local Chrome for both detection and response.

* feat(browser): cross-origin iframe interaction via browser_cdp(frame_id=...)

Adds iframe interaction to the CDP supervisor PR (was queued as PR 2).

Design: browser_cdp gets an optional frame_id parameter. When set, the tool looks up the frame in the supervisor's frame_tree, grabs its child cdp_session_id (OOPIF session), and dispatches the CDP call through the supervisor's already-connected WebSocket via run_coroutine_threadsafe.

Why not stateless: on Browserbase, each fresh browser_cdp WebSocket must re-negotiate against a signed connectUrl. The session info carries a specific URL that can expire while the supervisor's long-lived connection stays valid. Routing via the supervisor sidesteps this.

Agent workflow:

  1. browser_snapshot → frame_tree.children[] shows OOPIFs with is_oopif=true
  2. browser_cdp(method='Runtime.evaluate', frame_id=<OOPIF frame_id>, params={'expression': 'document.title', 'returnByValue': True})
  3. Supervisor dispatches the call on the OOPIF's child session

Supervisor state fixes needed along the way:

  * _on_frame_detached now skips reason='swap' (frame migrating processes)
  * _on_frame_detached also skips when the frame is an OOPIF with a live child session (Browserbase fires spurious remove events when a same-origin iframe gets promoted to OOPIF)
  * _on_target_detached clears cdp_session_id but KEEPS the frame record so the agent still sees the OOPIF in frame_tree during transient session flaps

E2E VERIFIED on Browserbase (smoke_bb_iframe_agent_path.py):

  browser_cdp(method='Runtime.evaluate', params={'expression': 'document.title', 'returnByValue': True}, frame_id=<OOPIF>)
  → {'success': True, 'result': {'value': 'Example Domain'}}

The iframe is <iframe src='https://example.com/'> inside a top-level data: URL page on a real Browserbase session. The agent Runtime.evaluates INSIDE the cross-origin iframe and gets example.com's title back.

Tests (tests/tools/test_browser_supervisor.py, 16 pass total):

  * test_browser_cdp_frame_id_routes_via_supervisor: injects fake OOPIF, verifies routing via supervisor, Runtime.evaluate returns 1+1=2
  * test_browser_cdp_frame_id_missing_supervisor: clean error when no supervisor attached
  * test_browser_cdp_frame_id_not_in_frame_tree: clean error on bad frame_id

Docs (browser.md and developer-guide/browser-supervisor.md) updated with the iframe workflow, availability matrix now shows OOPIF eval as shipped for local Chrome + Browserbase.

* test(browser): real-OOPIF E2E verified manually + chrome_cdp uses --site-per-process

When asked 'did you test the iframe stuff' I had only done a mocked pytest (fake injected OOPIF) plus a Browserbase E2E. Closed the local-Chrome real-OOPIF gap by writing /tmp/dialog-iframe-test/smoke_local_oopif.py:

  * 2 http servers on different hostnames (localhost:18905 + 127.0.0.1:18906)
  * Chrome with --site-per-process so the cross-origin iframe becomes a real OOPIF in its own process
  * Navigate, find OOPIF in supervisor.frame_tree, call browser_cdp(method='Runtime.evaluate', frame_id=<OOPIF>) which routes through the supervisor's child session
  * Asserts iframe document.title === 'INNER-FRAME-XYZ' (from the inner page, retrieved via OOPIF eval)

PASSED on 2026-04-23.

Tried to embed this as a pytest but hit an asyncio version quirk between venv (3.11) and the system python (3.13): Page.navigate hangs in the pytest harness but works in standalone. Left a self-documenting skip test that points to the smoke script + describes the verification.

chrome_cdp fixture now passes --site-per-process so future iframe tests can rely on OOPIF behavior.

Result: 16 pass + 1 documented-skip = 17 tests in tests/tools/test_browser_supervisor.py.

* docs(browser): add dialog_policy + dialog_timeout_s to configuration.md, fix tool count

Pre-merge docs audit revealed two gaps:

  1. user-guide/configuration.md browser config example was missing the two new dialog_* knobs. Added with a short table explaining must_respond / auto_dismiss / auto_accept semantics and a link to the feature page for the full workflow.
  2. reference/tools-reference.md header said '54 built-in tools'. Real count on main is 54, this branch adds browser_dialog so it's 55. Fixed the header. (browser count was already correctly bumped 11 -> 12 in the earlier docs commit.)

No code changes.
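
The deadlock fix described in the commit message (event handlers awaiting a CDP call from inside the reader, while only the reader can resolve the response Future) can be reproduced in isolation. What follows is a minimal simulation under stated assumptions, not the supervisor's actual code: `FakeCDP`, `run_reader`, and `demo` are hypothetical names, and an in-memory `asyncio.Queue` stands in for the CDP WebSocket.

```python
from __future__ import annotations

import asyncio


class FakeCDP:
    """Hypothetical in-memory stand-in for a CDP WebSocket connection."""

    def __init__(self) -> None:
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.pending: dict[int, asyncio.Future] = {}
        self.next_id = 1

    async def call(self, method: str, timeout: float = 0.2) -> str:
        """Send a command and wait for the reader to resolve its Future."""
        call_id = self.next_id
        self.next_id += 1
        fut = asyncio.get_running_loop().create_future()
        self.pending[call_id] = fut
        # Simulate the browser replying at once: the reply lands in the inbox,
        # but only the reader loop can hand it to ``fut``.
        self.inbox.put_nowait({"id": call_id, "result": f"{method} ok"})
        try:
            return await asyncio.wait_for(fut, timeout)
        finally:
            self.pending.pop(call_id, None)


async def run_reader(conn: FakeCDP, spawn: bool) -> list[str]:
    """Pump the inbox; handle events inline (spawn=False) or as tasks."""
    outcomes: list[str] = []
    tasks: set[asyncio.Task] = set()

    async def on_dialog() -> None:
        try:
            outcomes.append(await conn.call("Page.handleJavaScriptDialog"))
        except asyncio.TimeoutError:
            outcomes.append("timed out")

    while True:
        msg = await conn.inbox.get()
        if msg is None:  # sentinel: connection closed
            break
        if "id" in msg:  # command response: resolve the waiting Future
            fut = conn.pending.get(msg["id"])
            if fut is not None and not fut.done():
                fut.set_result(msg["result"])
        elif spawn:
            # Fixed variant: schedule the handler so the reader keeps pumping.
            tasks.add(asyncio.create_task(on_dialog()))
        else:
            # Buggy variant: the reader blocks on a reply only it can deliver.
            await on_dialog()
    if tasks:
        await asyncio.gather(*tasks)
    return outcomes


async def demo(spawn: bool) -> list[str]:
    conn = FakeCDP()
    conn.inbox.put_nowait({"method": "Page.javascriptDialogOpening"})
    reader = asyncio.create_task(run_reader(conn, spawn))
    await asyncio.sleep(0.5)  # give the handler time to finish or time out
    conn.inbox.put_nowait(None)
    return await reader


if __name__ == "__main__":
    print(asyncio.run(demo(spawn=False)))  # inline await: the call times out
    print(asyncio.run(demo(spawn=True)))   # create_task: the call completes
```

In the inline variant the reply is already sitting in the inbox when the call times out, which is the symptom the commit describes; scheduling the handler with `asyncio.create_task` lets the reader deliver it.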
1362 lines
55 KiB
Python
"""Persistent CDP supervisor for browser dialog + frame detection.

One ``CDPSupervisor`` runs per Hermes ``task_id`` that has a reachable CDP
endpoint. It holds a single persistent WebSocket to the backend, subscribes
to ``Page`` / ``Runtime`` / ``Target`` events on every attached session
(top-level page and every OOPIF / worker target that auto-attaches), and
surfaces observable state (pending dialogs and frame tree) through a
thread-safe snapshot object that tool handlers consume synchronously.

The supervisor is NOT in the agent's tool schema. Its output reaches the
agent via two channels:

1. ``browser_snapshot`` merges supervisor state into its return payload
   (see ``tools/browser_tool.py``).
2. ``browser_dialog`` tool responds to a pending dialog by calling
   ``respond_to_dialog()`` on the active supervisor.

Design spec: ``website/docs/developer-guide/browser-supervisor.md``.
"""

from __future__ import annotations

import asyncio
import json
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

import websockets
from websockets.asyncio.client import ClientConnection

logger = logging.getLogger(__name__)


# ── Config defaults ───────────────────────────────────────────────────────────

DIALOG_POLICY_MUST_RESPOND = "must_respond"
DIALOG_POLICY_AUTO_DISMISS = "auto_dismiss"
DIALOG_POLICY_AUTO_ACCEPT = "auto_accept"

_VALID_POLICIES = frozenset(
    {DIALOG_POLICY_MUST_RESPOND, DIALOG_POLICY_AUTO_DISMISS, DIALOG_POLICY_AUTO_ACCEPT}
)

DEFAULT_DIALOG_POLICY = DIALOG_POLICY_MUST_RESPOND
DEFAULT_DIALOG_TIMEOUT_S = 300.0

# Snapshot caps for frame_tree: keep payloads bounded on ad-heavy pages.
FRAME_TREE_MAX_ENTRIES = 30
FRAME_TREE_MAX_OOPIF_DEPTH = 2

# Ring buffer of recent console-level events (used later by PR 2 diagnostics).
CONSOLE_HISTORY_MAX = 50

# Keep the last N closed dialogs in ``recent_dialogs`` so agents on backends
# that auto-dismiss server-side (e.g. Browserbase) can still observe that a
# dialog fired, even if they couldn't respond to it in time.
RECENT_DIALOGS_MAX = 20

# Magic host the injected dialog bridge XHRs to. Intercepted via the CDP
# Fetch domain before any network resolution happens, so the hostname never
# has to exist. Keep this ASCII + URL-safe; we also gate Fetch patterns on it.
DIALOG_BRIDGE_HOST = "hermes-dialog-bridge.invalid"
DIALOG_BRIDGE_URL_PATTERN = f"http://{DIALOG_BRIDGE_HOST}/*"

# Script injected into every frame via Page.addScriptToEvaluateOnNewDocument.
# Overrides alert/confirm/prompt to round-trip through a sync XHR that we
# intercept via Fetch.requestPaused. Works on Browserbase (whose CDP proxy
# auto-dismisses REAL native dialogs) because the native dialogs never fire
# in the first place: the overrides take precedence.
_DIALOG_BRIDGE_SCRIPT = r"""
(() => {
  if (window.__hermesDialogBridgeInstalled) return;
  window.__hermesDialogBridgeInstalled = true;
  const ENDPOINT = "http://hermes-dialog-bridge.invalid/";
  function ask(kind, message, defaultPrompt) {
    try {
      const xhr = new XMLHttpRequest();
      // Use GET with query params so we don't need to worry about request
      // body encoding in the Fetch interceptor.
      const params = new URLSearchParams({
        kind: String(kind || ""),
        message: String(message == null ? "" : message),
        default_prompt: String(defaultPrompt == null ? "" : defaultPrompt),
      });
      xhr.open("GET", ENDPOINT + "?" + params.toString(), false);  // sync
      xhr.send(null);
      if (xhr.status !== 200) return null;
      const body = xhr.responseText || "";
      let parsed;
      try { parsed = JSON.parse(body); } catch (e) { return null; }
      if (kind === "alert") return undefined;
      if (kind === "confirm") return Boolean(parsed && parsed.accept);
      if (kind === "prompt") {
        if (!parsed || !parsed.accept) return null;
        return parsed.prompt_text == null ? "" : String(parsed.prompt_text);
      }
      return null;
    } catch (e) {
      // If the bridge is unreachable, return null. The wrappers below treat
      // that as a dismissal; the saved native functions are not invoked here.
      return null;
    }
  }
  const realAlert = window.alert;
  const realConfirm = window.confirm;
  const realPrompt = window.prompt;
  window.alert = function(message) { ask("alert", message, ""); };
  window.confirm = function(message) {
    const r = ask("confirm", message, "");
    return r === null ? false : Boolean(r);
  };
  window.prompt = function(message, def) {
    const r = ask("prompt", message, def == null ? "" : def);
    return r === null ? null : String(r);
  };
  // onbeforeunload: we can't really synchronously prompt the user from this
  // event without racing navigation. Leave native behavior for now; the
  // supervisor's native-dialog fallback path still surfaces them in
  // recent_dialogs.
})();
"""


# ── Data model ────────────────────────────────────────────────────────────────


@dataclass
class PendingDialog:
    """A JS dialog currently open on some frame's session."""

    id: str
    type: str  # "alert" | "confirm" | "prompt" | "beforeunload"
    message: str
    default_prompt: str
    opened_at: float
    cdp_session_id: str  # which attached CDP session the dialog fired in
    frame_id: Optional[str] = None
    # When set, the dialog was captured via the bridge XHR path (Fetch domain).
    # Response must be delivered via Fetch.fulfillRequest, NOT
    # Page.handleJavaScriptDialog, because the native dialog never fired.
    bridge_request_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "type": self.type,
            "message": self.message,
            "default_prompt": self.default_prompt,
            "opened_at": self.opened_at,
            "frame_id": self.frame_id,
        }


@dataclass
class DialogRecord:
    """A historical record of a dialog that was opened and then handled.

    Retained in ``recent_dialogs`` for a short window so agents on backends
    that auto-dismiss dialogs server-side (Browserbase) can still observe
    that a dialog fired, even though they couldn't respond to it.
    """

    id: str
    type: str
    message: str
    opened_at: float
    closed_at: float
    closed_by: str  # "agent" | "auto_policy" | "remote" | "watchdog"
    frame_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "type": self.type,
            "message": self.message,
            "opened_at": self.opened_at,
            "closed_at": self.closed_at,
            "closed_by": self.closed_by,
            "frame_id": self.frame_id,
        }


@dataclass
class FrameInfo:
    """One frame in the page's frame tree.

    ``is_oopif`` means the frame has its own CDP target (separate process,
    reachable via ``cdp_session_id``). Same-origin / srcdoc iframes share
    the parent process and have ``is_oopif=False`` + ``cdp_session_id=None``.
    """

    frame_id: str
    url: str
    origin: str
    parent_frame_id: Optional[str]
    is_oopif: bool
    cdp_session_id: Optional[str] = None
    name: str = ""

    def to_dict(self) -> Dict[str, Any]:
        d = {
            "frame_id": self.frame_id,
            "url": self.url,
            "origin": self.origin,
            "is_oopif": self.is_oopif,
        }
        if self.cdp_session_id:
            d["session_id"] = self.cdp_session_id
        if self.parent_frame_id:
            d["parent_frame_id"] = self.parent_frame_id
        if self.name:
            d["name"] = self.name
        return d


@dataclass
class ConsoleEvent:
    """Ring buffer entry for console + exception traffic."""

    ts: float
    level: str  # "log" | "error" | "warning" | "exception"
    text: str
    url: Optional[str] = None


@dataclass(frozen=True)
class SupervisorSnapshot:
    """Read-only snapshot of supervisor state.

    Frozen dataclass so tool handlers can freely dereference without
    worrying about mutation under their feet.
    """

    pending_dialogs: Tuple[PendingDialog, ...]
    recent_dialogs: Tuple[DialogRecord, ...]
    frame_tree: Dict[str, Any]
    console_errors: Tuple[ConsoleEvent, ...]
    active: bool  # False if supervisor is detached/stopped
    cdp_url: str
    task_id: str

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for inclusion in ``browser_snapshot`` output."""
        out: Dict[str, Any] = {
            "pending_dialogs": [d.to_dict() for d in self.pending_dialogs],
            "frame_tree": self.frame_tree,
        }
        if self.recent_dialogs:
            out["recent_dialogs"] = [d.to_dict() for d in self.recent_dialogs]
        return out


# ── Supervisor core ───────────────────────────────────────────────────────────


class CDPSupervisor:
    """One supervisor per (task_id, cdp_url) pair.

    Lifecycle:
      * ``start()``: kicked off by ``SupervisorRegistry.get_or_start``; spawns
        a daemon thread running its own asyncio loop, connects the WebSocket,
        attaches to the first page target, enables domains, starts
        auto-attaching to child targets.
      * ``snapshot()``: sync, thread-safe, called from tool handlers.
      * ``respond_to_dialog(action, ...)``: sync bridge; schedules a coroutine
        on the supervisor's loop and waits (with timeout) for the CDP ack.
      * ``stop()``: cancels task, closes WebSocket, joins thread.

    All CDP I/O lives on the supervisor's own loop. External callers never
    touch the loop directly; they go through the sync API above.
    """

    def __init__(
        self,
        task_id: str,
        cdp_url: str,
        *,
        dialog_policy: str = DEFAULT_DIALOG_POLICY,
        dialog_timeout_s: float = DEFAULT_DIALOG_TIMEOUT_S,
    ) -> None:
        if dialog_policy not in _VALID_POLICIES:
            raise ValueError(
                f"Invalid dialog_policy {dialog_policy!r}; "
                f"must be one of {sorted(_VALID_POLICIES)}"
            )
        self.task_id = task_id
        self.cdp_url = cdp_url
        self.dialog_policy = dialog_policy
        self.dialog_timeout_s = float(dialog_timeout_s)

        # State protected by ``_state_lock`` for cross-thread reads.
        self._state_lock = threading.Lock()
        self._pending_dialogs: Dict[str, PendingDialog] = {}
        self._recent_dialogs: List[DialogRecord] = []
        self._frames: Dict[str, FrameInfo] = {}
        self._console_events: List[ConsoleEvent] = []
        self._active = False

        # Supervisor loop machinery, populated in start().
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._ready_event = threading.Event()
        self._start_error: Optional[BaseException] = None
        self._stop_requested = False

        # CDP call tracking (runs on supervisor loop only).
        self._next_call_id = 1
        self._pending_calls: Dict[int, asyncio.Future] = {}
        self._ws: Optional[ClientConnection] = None
        self._page_session_id: Optional[str] = None
        self._child_sessions: Dict[str, Dict[str, Any]] = {}  # session_id -> info

        # Dialog auto-dismiss watchdog handles (per dialog id).
        self._dialog_watchdogs: Dict[str, asyncio.TimerHandle] = {}
        # Monotonic id generator for dialogs (human-readable in snapshots).
        self._dialog_seq = 0

    # ── Public sync API ──────────────────────────────────────────────────────

    def start(self, timeout: float = 15.0) -> None:
        """Launch the background loop and wait until attachment is complete.

        Raises whatever exception attach failed with (connect error, bad
        WebSocket URL, CDP domain enable failure, etc.). On success, the
        supervisor is fully wired up: pending-dialog events will be captured
        as of the moment ``start()`` returns.
        """
        if self._thread and self._thread.is_alive():
            return
        self._ready_event.clear()
        self._start_error = None
        self._stop_requested = False
        self._thread = threading.Thread(
            target=self._thread_main,
            name=f"cdp-supervisor-{self.task_id}",
            daemon=True,
        )
        self._thread.start()
        if not self._ready_event.wait(timeout=timeout):
            self.stop()
            raise TimeoutError(
                f"CDP supervisor did not attach within {timeout}s "
                f"(cdp_url={self.cdp_url[:80]}...)"
            )
        if self._start_error is not None:
            err = self._start_error
            self.stop()
            raise err

    def stop(self, timeout: float = 5.0) -> None:
        """Cancel the supervisor task and join the thread."""
        self._stop_requested = True
        loop = self._loop
        if loop is not None and loop.is_running():
            # Close the WebSocket from inside the loop. This makes ``async for
            # raw in self._ws`` return cleanly, ``_run`` hits its ``finally``,
            # pending tasks get cancelled in order, THEN the thread exits.
            async def _close_ws():
                ws = self._ws
                self._ws = None
                if ws is not None:
                    try:
                        await ws.close()
                    except Exception:
                        pass

            try:
                fut = asyncio.run_coroutine_threadsafe(_close_ws(), loop)
                try:
                    fut.result(timeout=2.0)
                except Exception:
                    pass
            except RuntimeError:
                pass  # loop already shutting down
        if self._thread is not None:
            self._thread.join(timeout=timeout)
        with self._state_lock:
            self._active = False

    def snapshot(self) -> SupervisorSnapshot:
        """Return an immutable snapshot of current state."""
        with self._state_lock:
            dialogs = tuple(self._pending_dialogs.values())
            recent = tuple(self._recent_dialogs[-RECENT_DIALOGS_MAX:])
            frames_tree = self._build_frame_tree_locked()
            console = tuple(self._console_events[-CONSOLE_HISTORY_MAX:])
            active = self._active
        return SupervisorSnapshot(
            pending_dialogs=dialogs,
            recent_dialogs=recent,
            frame_tree=frames_tree,
            console_errors=console,
            active=active,
            cdp_url=self.cdp_url,
            task_id=self.task_id,
        )

    def respond_to_dialog(
        self,
        action: str,
        *,
        prompt_text: Optional[str] = None,
        dialog_id: Optional[str] = None,
        timeout: float = 10.0,
    ) -> Dict[str, Any]:
        """Accept/dismiss a pending dialog. Sync bridge onto the supervisor loop.

        Returns ``{"ok": True, "dialog": {...}}`` on success,
        ``{"ok": False, "error": "..."}`` on a recoverable error (no dialog,
        ambiguous dialog_id, supervisor inactive).
        """
        if action not in ("accept", "dismiss"):
            return {"ok": False, "error": f"action must be 'accept' or 'dismiss', got {action!r}"}

        with self._state_lock:
            if not self._active:
                return {"ok": False, "error": "supervisor is not active"}
            pending = list(self._pending_dialogs.values())
            if not pending:
                return {"ok": False, "error": "no dialog is currently open"}
            if dialog_id:
                dialog = self._pending_dialogs.get(dialog_id)
                if dialog is None:
                    return {
                        "ok": False,
                        "error": f"dialog_id {dialog_id!r} not found "
                        f"(known: {sorted(self._pending_dialogs)})",
                    }
            elif len(pending) > 1:
                return {
                    "ok": False,
                    "error": (
                        f"{len(pending)} pending dialogs; specify dialog_id. "
                        f"Candidates: {[d.id for d in pending]}"
                    ),
                }
            else:
                dialog = pending[0]
            snapshot_copy = dialog

        loop = self._loop
        if loop is None:
            return {"ok": False, "error": "supervisor loop is not running"}

        async def _do_respond():
            return await self._handle_dialog_cdp(
                snapshot_copy, accept=(action == "accept"), prompt_text=prompt_text or ""
            )

        try:
            fut = asyncio.run_coroutine_threadsafe(_do_respond(), loop)
            fut.result(timeout=timeout)
        except Exception as e:
            return {"ok": False, "error": f"{type(e).__name__}: {e}"}
        return {"ok": True, "dialog": snapshot_copy.to_dict()}

    # ── Supervisor loop internals ────────────────────────────────────────────

    def _thread_main(self) -> None:
        """Entry point for the supervisor's dedicated thread."""
        loop = asyncio.new_event_loop()
        self._loop = loop
        try:
            asyncio.set_event_loop(loop)
            loop.run_until_complete(self._run())
        except BaseException as e:  # noqa: BLE001 - propagate via _start_error
            if not self._ready_event.is_set():
                self._start_error = e
                self._ready_event.set()
            else:
                logger.warning("CDP supervisor %s crashed: %s", self.task_id, e)
        finally:
            # Flush any remaining tasks before closing the loop so we don't
            # emit "Task was destroyed but it is pending" warnings.
            try:
                pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
                for t in pending:
                    t.cancel()
                if pending:
                    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            except Exception:
                pass
            try:
                loop.close()
            except Exception:
                pass
            with self._state_lock:
                self._active = False

    async def _run(self) -> None:
        """Top-level supervisor coroutine.

        Runs a reconnect loop so the supervisor survives the remote end
        closing the WebSocket. Browserbase in particular tears down the CDP
        socket every time a short-lived client (e.g. agent-browser's
        per-command CDP client) disconnects. On each reconnect we drop the
        state that depends on specific CDP session ids, re-attach, and keep
        going.
        """
        attempt = 0
        last_success_at = 0.0
        backoff = 0.5
        while not self._stop_requested:
            try:
                self._ws = await asyncio.wait_for(
                    websockets.connect(self.cdp_url, max_size=50 * 1024 * 1024),
                    timeout=10.0,
                )
            except Exception as e:
                attempt += 1
                if not self._ready_event.is_set():
                    # Never connected once, so this is fatal for start().
                    self._start_error = e
                    self._ready_event.set()
                    return
                logger.warning(
                    "CDP supervisor %s: connect failed (attempt %s): %s",
                    self.task_id, attempt, e,
                )
                await asyncio.sleep(min(backoff, 10.0))
                backoff = min(backoff * 2, 10.0)
                continue

            reader_task = asyncio.create_task(self._read_loop(), name="cdp-reader")
            try:
                # Reset per-connection session state so stale ids don't hang
                # around after a reconnect.
                self._page_session_id = None
                self._child_sessions.clear()
                # We deliberately keep `_pending_dialogs` and `_frames`:
                # they're reconciled as the supervisor resubscribes and
                # receives fresh events. Worst case: an agent sees a stale
                # dialog entry that the new session's handleJavaScriptDialog
                # call rejects with "no dialog is showing" (logged, not
                # surfaced).
                await self._attach_initial_page()
                with self._state_lock:
                    self._active = True
                last_success_at = time.time()
                attempt = 0  # count connect failures per outage, not per lifetime
                backoff = 0.5  # reset after a successful attach
                if not self._ready_event.is_set():
                    self._ready_event.set()
                # Run until the reader returns.
                await reader_task
            except BaseException as e:
                if not self._ready_event.is_set():
                    # Never got to ready; propagate to start().
                    self._start_error = e
                    self._ready_event.set()
                    raise
                logger.warning(
                    "CDP supervisor %s: session dropped after %.1fs: %s",
                    self.task_id,
                    time.time() - last_success_at,
                    e,
                )
            finally:
                with self._state_lock:
                    self._active = False
                if not reader_task.done():
                    reader_task.cancel()
                    try:
                        await reader_task
                    except (asyncio.CancelledError, Exception):
                        pass
                for handle in list(self._dialog_watchdogs.values()):
                    handle.cancel()
                self._dialog_watchdogs.clear()
                ws = self._ws
                self._ws = None
                if ws is not None:
                    try:
                        await ws.close()
                    except Exception:
                        pass

            if self._stop_requested:
                return

            # Reconnect: brief backoff, then reattach.
            logger.debug(
                "CDP supervisor %s: reconnecting in %.1fs...", self.task_id, backoff,
            )
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 10.0)

    async def _attach_initial_page(self) -> None:
        """Find a page target, attach flattened session, enable domains, install dialog bridge."""
        resp = await self._cdp("Target.getTargets")
        targets = resp.get("result", {}).get("targetInfos", [])
        page_target = next((t for t in targets if t.get("type") == "page"), None)
        if page_target is None:
            created = await self._cdp("Target.createTarget", {"url": "about:blank"})
            target_id = created["result"]["targetId"]
        else:
            target_id = page_target["targetId"]

        attach = await self._cdp(
            "Target.attachToTarget",
            {"targetId": target_id, "flatten": True},
        )
        self._page_session_id = attach["result"]["sessionId"]
        await self._cdp("Page.enable", session_id=self._page_session_id)
        await self._cdp("Runtime.enable", session_id=self._page_session_id)
        await self._cdp(
            "Target.setAutoAttach",
            {"autoAttach": True, "waitForDebuggerOnStart": False, "flatten": True},
            session_id=self._page_session_id,
        )
        # Install the dialog bridge, which overrides native alert/confirm/prompt
        # with a synchronous XHR we intercept via the Fetch domain. This is how
        # we make dialog response work on Browserbase (whose CDP proxy
        # auto-dismisses real native dialogs before we can call
        # handleJavaScriptDialog).
        await self._install_dialog_bridge(self._page_session_id)

    async def _install_dialog_bridge(self, session_id: str) -> None:
        """Install the dialog-bridge init script + Fetch interceptor on a session.

        Two CDP calls:
          1. ``Page.addScriptToEvaluateOnNewDocument``: the JS override runs
             in every frame before any page script. Replaces alert/confirm/
             prompt with a sync XHR to our bridge URL.
          2. ``Fetch.enable`` scoped to the bridge URL: we catch those XHRs,
             surface them as pending dialogs, then fulfill once the agent
             responds.

        Idempotent at the CDP level: Chromium de-duplicates identical
        add-script calls by source, and Fetch.enable replaces prior patterns.
        """
        try:
            await self._cdp(
                "Page.addScriptToEvaluateOnNewDocument",
                {"source": _DIALOG_BRIDGE_SCRIPT, "runImmediately": True},
                session_id=session_id,
                timeout=5.0,
            )
        except Exception as e:
            logger.debug(
                "dialog bridge: addScriptToEvaluateOnNewDocument failed on sid=%s: %s",
                (session_id or "")[:16], e,
            )
        try:
            await self._cdp(
                "Fetch.enable",
                {
                    "patterns": [
                        {
                            "urlPattern": DIALOG_BRIDGE_URL_PATTERN,
                            "requestStage": "Request",
                        }
                    ],
                    "handleAuthRequests": False,
                },
                session_id=session_id,
                timeout=5.0,
            )
        except Exception as e:
            logger.debug(
                "dialog bridge: Fetch.enable failed on sid=%s: %s",
                (session_id or "")[:16], e,
            )
        # Also try to inject into the already-loaded document so existing
        # pages pick up the override on reconnect. Best-effort.
        try:
            await self._cdp(
                "Runtime.evaluate",
                {"expression": _DIALOG_BRIDGE_SCRIPT, "returnByValue": True},
                session_id=session_id,
                timeout=3.0,
            )
        except Exception:
            pass

    async def _cdp(
        self,
        method: str,
        params: Optional[Dict[str, Any]] = None,
        *,
        session_id: Optional[str] = None,
        timeout: float = 10.0,
    ) -> Dict[str, Any]:
        """Send a CDP command and await its response."""
        if self._ws is None:
            raise RuntimeError("supervisor WebSocket is not connected")
        call_id = self._next_call_id
        self._next_call_id += 1
        payload: Dict[str, Any] = {"id": call_id, "method": method}
        if params:
            payload["params"] = params
        if session_id:
            payload["sessionId"] = session_id
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending_calls[call_id] = fut
        try:
            # Send inside the try so a failed send also unregisters the Future
            # instead of leaving it in ``_pending_calls``.
            await self._ws.send(json.dumps(payload))
            return await asyncio.wait_for(fut, timeout=timeout)
        finally:
            self._pending_calls.pop(call_id, None)

    async def _read_loop(self) -> None:
        """Continuously dispatch incoming CDP frames."""
        assert self._ws is not None
        try:
            async for raw in self._ws:
                if self._stop_requested:
                    break
                try:
                    msg = json.loads(raw)
                except Exception:
                    logger.debug("CDP supervisor: non-JSON frame dropped")
                    continue
                if "id" in msg:
                    fut = self._pending_calls.pop(msg["id"], None)
                    if fut is not None and not fut.done():
                        if "error" in msg:
                            fut.set_exception(
                                RuntimeError(f"CDP error on id={msg['id']}: {msg['error']}")
                            )
                        else:
                            fut.set_result(msg)
                elif "method" in msg:
                    try:
                        await self._on_event(
                            msg["method"], msg.get("params", {}), msg.get("sessionId")
                        )
                    except Exception as e:
                        # A failing handler should not end the read loop and
                        # force a reconnect; log it and keep pumping.
                        logger.debug(
                            "CDP supervisor: handler for %s failed: %s", msg["method"], e
                        )
        except Exception as e:
            logger.debug("CDP read loop exited: %s", e)

    # ── Event dispatch ──────────────────────────────────────────────────────

    async def _on_event(
        self, method: str, params: Dict[str, Any], session_id: Optional[str]
    ) -> None:
        """Route one CDP event to its handler; unrecognized events are ignored."""
        if method == "Page.javascriptDialogOpening":
            await self._on_dialog_opening(params, session_id)
        elif method == "Page.javascriptDialogClosed":
            await self._on_dialog_closed(params, session_id)
        elif method == "Fetch.requestPaused":
            await self._on_fetch_paused(params, session_id)
        elif method == "Page.frameAttached":
            self._on_frame_attached(params, session_id)
        elif method == "Page.frameNavigated":
            self._on_frame_navigated(params, session_id)
        elif method == "Page.frameDetached":
            self._on_frame_detached(params, session_id)
        elif method == "Target.attachedToTarget":
            await self._on_target_attached(params)
        elif method == "Target.detachedFromTarget":
            self._on_target_detached(params)
        elif method == "Runtime.consoleAPICalled":
            self._on_console(params, level_from="api")
        elif method == "Runtime.exceptionThrown":
            self._on_console(params, level_from="exception")

    async def _on_dialog_opening(
        self, params: Dict[str, Any], session_id: Optional[str]
    ) -> None:
        self._dialog_seq += 1
        dialog = PendingDialog(
            id=f"d-{self._dialog_seq}",
            type=str(params.get("type") or ""),
            message=str(params.get("message") or ""),
            default_prompt=str(params.get("defaultPrompt") or ""),
            opened_at=time.time(),
            cdp_session_id=session_id or self._page_session_id or "",
            frame_id=params.get("frameId"),
        )

        if self.dialog_policy == DIALOG_POLICY_AUTO_DISMISS:
            # Archive immediately with the policy tag so the ``closed`` event
            # arriving right after our handleJavaScriptDialog call doesn't
            # re-archive it as "remote".
            with self._state_lock:
                self._archive_dialog_locked(dialog, "auto_policy")
            asyncio.create_task(
                self._auto_handle_dialog(dialog, accept=False, prompt_text="")
            )
        elif self.dialog_policy == DIALOG_POLICY_AUTO_ACCEPT:
            with self._state_lock:
                self._archive_dialog_locked(dialog, "auto_policy")
            asyncio.create_task(
                self._auto_handle_dialog(
                    dialog, accept=True, prompt_text=dialog.default_prompt
                )
            )
        else:
            # must_respond → add to pending and arm watchdog.
            with self._state_lock:
                self._pending_dialogs[dialog.id] = dialog
            loop = asyncio.get_running_loop()
            handle = loop.call_later(
                self.dialog_timeout_s,
                lambda: asyncio.create_task(self._dialog_timeout_expired(dialog.id)),
            )
            self._dialog_watchdogs[dialog.id] = handle

    async def _auto_handle_dialog(
        self, dialog: PendingDialog, *, accept: bool, prompt_text: str
    ) -> None:
        """Send handleJavaScriptDialog for auto_dismiss/auto_accept.

        Dialog has already been archived by the caller (``_on_dialog_opening``);
        this just fires the CDP call so the page unblocks.
        """
        params: Dict[str, Any] = {"accept": accept}
        if dialog.type == "prompt":
            params["promptText"] = prompt_text
        try:
            await self._cdp(
                "Page.handleJavaScriptDialog",
                params,
                session_id=dialog.cdp_session_id or None,
                timeout=5.0,
            )
        except Exception as e:
            logger.debug("auto-handle CDP call failed for %s: %s", dialog.id, e)

    async def _dialog_timeout_expired(self, dialog_id: str) -> None:
        """Watchdog callback: dismiss a dialog the agent never answered."""
        # The timer has fired, so drop its handle; otherwise the entry would
        # linger in ``_dialog_watchdogs`` until the next disconnect.
        self._dialog_watchdogs.pop(dialog_id, None)
        with self._state_lock:
            dialog = self._pending_dialogs.get(dialog_id)
        if dialog is None:
            return
        logger.warning(
            "CDP supervisor %s: dialog %s (%s) auto-dismissed after %ss timeout",
            self.task_id,
            dialog_id,
            dialog.type,
            self.dialog_timeout_s,
        )
        try:
            # Archive with watchdog tag BEFORE fulfilling / dismissing.
            with self._state_lock:
                if dialog_id in self._pending_dialogs:
                    self._pending_dialogs.pop(dialog_id, None)
                    self._archive_dialog_locked(dialog, "watchdog")
            # Unblock the page: bridge dialogs are resolved with a Fetch
            # fulfill, real dialogs with native Page.handleJavaScriptDialog.
            if dialog.bridge_request_id:
                await self._fulfill_bridge_request(dialog, accept=False, prompt_text="")
            else:
                await self._cdp(
                    "Page.handleJavaScriptDialog",
                    {"accept": False},
                    session_id=dialog.cdp_session_id or None,
                    timeout=5.0,
                )
        except Exception as e:
            logger.debug("auto-dismiss failed for %s: %s", dialog_id, e)

    def _archive_dialog_locked(self, dialog: PendingDialog, closed_by: str) -> None:
        """Append a record of ``dialog`` to the recent_dialogs ring buffer.

        The caller removes the dialog from ``_pending_dialogs`` and must hold
        ``_state_lock``.
        """
        record = DialogRecord(
            id=dialog.id,
            type=dialog.type,
            message=dialog.message,
            opened_at=dialog.opened_at,
            closed_at=time.time(),
            closed_by=closed_by,
            frame_id=dialog.frame_id,
        )
        self._recent_dialogs.append(record)
        # Allow 2x slack before trimming to reduce churn.
        if len(self._recent_dialogs) > RECENT_DIALOGS_MAX * 2:
            self._recent_dialogs = self._recent_dialogs[-RECENT_DIALOGS_MAX:]

    async def _handle_dialog_cdp(
        self, dialog: PendingDialog, *, accept: bool, prompt_text: str
    ) -> None:
        """Send the Page.handleJavaScriptDialog CDP command (agent path only).

        Routes to the bridge-fulfill path when the dialog was captured via
        the injected XHR override (see ``_on_fetch_paused``).
        """
        if dialog.bridge_request_id:
            try:
                await self._fulfill_bridge_request(
                    dialog, accept=accept, prompt_text=prompt_text
                )
            finally:
                with self._state_lock:
                    if dialog.id in self._pending_dialogs:
                        self._pending_dialogs.pop(dialog.id, None)
                        self._archive_dialog_locked(dialog, "agent")
                    handle = self._dialog_watchdogs.pop(dialog.id, None)
                if handle is not None:
                    handle.cancel()
            return

        params: Dict[str, Any] = {"accept": accept}
        if dialog.type == "prompt":
            params["promptText"] = prompt_text
        try:
            await self._cdp(
                "Page.handleJavaScriptDialog",
                params,
                session_id=dialog.cdp_session_id or None,
                timeout=5.0,
            )
        finally:
            # Clear regardless: the CDP error path usually means the dialog
            # already closed (browser auto-dismissed after navigation, etc.).
            with self._state_lock:
                if dialog.id in self._pending_dialogs:
                    self._pending_dialogs.pop(dialog.id, None)
                    self._archive_dialog_locked(dialog, "agent")
                handle = self._dialog_watchdogs.pop(dialog.id, None)
            if handle is not None:
                handle.cancel()

    async def _on_dialog_closed(
        self, params: Dict[str, Any], session_id: Optional[str]
    ) -> None:
        # ``Page.javascriptDialogClosed`` carries ``result`` (bool) and
        # ``userInput`` (string) but not the original ``message``, so match
        # by session id and clear the oldest pending dialog on that session.
        # Chrome may close a dialog on us (our disconnect auto-dismissed it,
        # the browser navigated, or Browserbase's CDP proxy auto-dismissed
        # it). At most one dialog should be in flight per session anyway,
        # because the JS thread is blocked while a dialog is up.
        #
        # Normalize the session id the same way ``_on_dialog_opening`` stores
        # it, so an event without ``sessionId`` still matches.
        sid = session_id or self._page_session_id or ""
        with self._state_lock:
            candidate_ids = [
                d.id
                for d in self._pending_dialogs.values()
                if d.cdp_session_id == sid
                # Bridge-captured dialogs aren't cleared by native close events;
                # they're resolved via Fetch.fulfillRequest instead. Only the
                # real-native-dialog path uses Page.javascriptDialogClosed.
                and d.bridge_request_id is None
            ]
            if candidate_ids:
                did = candidate_ids[0]
                dialog = self._pending_dialogs.pop(did, None)
                if dialog is not None:
                    self._archive_dialog_locked(dialog, "remote")
                handle = self._dialog_watchdogs.pop(did, None)
                if handle is not None:
                    handle.cancel()

    async def _on_fetch_paused(
        self, params: Dict[str, Any], session_id: Optional[str]
    ) -> None:
        """Materialize a bridge XHR captured mid-flight as a pending dialog.

        The injected script (``_DIALOG_BRIDGE_SCRIPT``) fires a synchronous
        XHR to ``DIALOG_BRIDGE_HOST`` whenever page code calls alert/confirm/
        prompt. We catch it via Fetch.enable pattern; the page's JS thread
        is blocked on the XHR's response until we call Fetch.fulfillRequest
        (which happens from ``respond_to_dialog``) or until the watchdog
        fires (at which point we fulfill with a cancel response).
        """
        url = str(params.get("request", {}).get("url") or "")
        request_id = params.get("requestId")
        if not request_id:
            return
        # Only care about our bridge URLs. Fetch can still deliver other
        # intercepted requests if patterns were ever broadened.
        if DIALOG_BRIDGE_HOST not in url:
            # Not ours: forward unchanged so the page sees its own request.
            try:
                await self._cdp(
                    "Fetch.continueRequest", {"requestId": request_id},
                    session_id=session_id, timeout=3.0,
                )
            except Exception:
                pass
            return

        # Parse query string for dialog metadata. Use urllib to be robust.
        from urllib.parse import urlparse, parse_qs
        q = parse_qs(urlparse(url).query)

        def _q(name: str) -> str:
            v = q.get(name, [""])
            return v[0] if v else ""

        kind = _q("kind") or "alert"
        message = _q("message")
        default_prompt = _q("default_prompt")

        self._dialog_seq += 1
        dialog = PendingDialog(
            id=f"d-{self._dialog_seq}",
            type=kind,
            message=message,
            default_prompt=default_prompt,
            opened_at=time.time(),
            cdp_session_id=session_id or self._page_session_id or "",
            frame_id=params.get("frameId"),
            bridge_request_id=str(request_id),
        )

        # Apply policy exactly as for native dialogs.
        if self.dialog_policy == DIALOG_POLICY_AUTO_DISMISS:
            with self._state_lock:
                self._archive_dialog_locked(dialog, "auto_policy")
            asyncio.create_task(
                self._fulfill_bridge_request(dialog, accept=False, prompt_text="")
            )
        elif self.dialog_policy == DIALOG_POLICY_AUTO_ACCEPT:
            with self._state_lock:
                self._archive_dialog_locked(dialog, "auto_policy")
            asyncio.create_task(
                self._fulfill_bridge_request(
                    dialog, accept=True, prompt_text=default_prompt
                )
            )
        else:
            # must_respond → add to pending and arm watchdog.
            with self._state_lock:
                self._pending_dialogs[dialog.id] = dialog
            loop = asyncio.get_running_loop()
            handle = loop.call_later(
                self.dialog_timeout_s,
                lambda: asyncio.create_task(self._dialog_timeout_expired(dialog.id)),
            )
            self._dialog_watchdogs[dialog.id] = handle

    async def _fulfill_bridge_request(
        self, dialog: PendingDialog, *, accept: bool, prompt_text: str
    ) -> None:
        """Resolve a bridge XHR via Fetch.fulfillRequest so the page unblocks."""
        if not dialog.bridge_request_id:
            return
        payload = {
            "accept": bool(accept),
            "prompt_text": prompt_text if dialog.type == "prompt" else "",
            "dialog_id": dialog.id,
        }
        body = json.dumps(payload).encode()
        try:
            import base64 as _b64
            await self._cdp(
                "Fetch.fulfillRequest",
                {
                    "requestId": dialog.bridge_request_id,
                    "responseCode": 200,
                    "responseHeaders": [
                        {"name": "Content-Type", "value": "application/json"},
                        {"name": "Access-Control-Allow-Origin", "value": "*"},
                    ],
                    # CDP expects the response body base64-encoded.
                    "body": _b64.b64encode(body).decode(),
                },
                session_id=dialog.cdp_session_id or None,
                timeout=5.0,
            )
        except Exception as e:
            logger.debug("bridge fulfill failed for %s: %s", dialog.id, e)

    # ── Frame / target tracking ─────────────────────────────────────────────

    def _on_frame_attached(
        self, params: Dict[str, Any], session_id: Optional[str]
    ) -> None:
        frame_id = params.get("frameId")
        if not frame_id:
            return
        with self._state_lock:
            self._frames[frame_id] = FrameInfo(
                frame_id=frame_id,
                url="",
                origin="",
                parent_frame_id=params.get("parentFrameId"),
                is_oopif=False,
                cdp_session_id=session_id,
            )

    def _on_frame_navigated(
        self, params: Dict[str, Any], session_id: Optional[str]
    ) -> None:
        frame = params.get("frame") or {}
        frame_id = frame.get("id")
        if not frame_id:
            return
        with self._state_lock:
            existing = self._frames.get(frame_id)
            info = FrameInfo(
                frame_id=frame_id,
                url=str(frame.get("url") or ""),
                origin=str(frame.get("securityOrigin") or frame.get("origin") or ""),
                parent_frame_id=(
                    frame.get("parentId")
                    or (existing.parent_frame_id if existing else None)
                ),
                is_oopif=bool(existing.is_oopif if existing else False),
                cdp_session_id=existing.cdp_session_id if existing else session_id,
                name=str(frame.get("name") or (existing.name if existing else "")),
            )
            self._frames[frame_id] = info

    def _on_frame_detached(
        self, params: Dict[str, Any], session_id: Optional[str]
    ) -> None:
        """Remove a frame from our state only when it's truly gone.

        CDP emits ``Page.frameDetached`` with a ``reason`` of either
        ``"remove"`` (the frame is actually gone from the DOM) or ``"swap"``
        (the frame is migrating to a new process, typically when a
        same-process iframe becomes an OOPIF or when history navigates).
        Dropping on ``swap`` would hide OOPIFs from the agent the moment
        Chromium promotes them to their own process, so treat swap as a
        no-op.

        Even ``reason=remove`` only means "the child frame left MY process
        tree" from the parent page's perspective, which is also what happens
        when an in-process iframe gets promoted to an OOPIF. If we still
        have a live child CDP session attached for that frame_id, the frame
        is very much alive; only drop it when we have no session record.
        """
        frame_id = params.get("frameId")
        if not frame_id:
            return
        reason = str(params.get("reason") or "remove").lower()
        if reason == "swap":
            return
        with self._state_lock:
            existing = self._frames.get(frame_id)
            # Keep OOPIF records even when the parent says the frame was
            # "removed": the iframe is still visible, just in a different
            # process. If the frame truly goes away later,
            # Target.detachedFromTarget clears the session binding and the
            # next Page.frameDetached without a live session removes it.
            if existing and existing.is_oopif and existing.cdp_session_id:
                return
            self._frames.pop(frame_id, None)

    async def _on_target_attached(self, params: Dict[str, Any]) -> None:
        info = params.get("targetInfo") or {}
        sid = params.get("sessionId")
        target_type = info.get("type")
        if not sid or target_type not in ("iframe", "worker"):
            return
        self._child_sessions[sid] = {"info": info, "type": target_type}

        # Record the frame with its OOPIF session id for interaction routing.
        # This keys ``_frames`` by ``targetId``, relying on Chromium reporting
        # an iframe target's id equal to its frame id.
        if target_type == "iframe":
            target_id = info.get("targetId")
            with self._state_lock:
                existing = self._frames.get(target_id)
                self._frames[target_id] = FrameInfo(
                    frame_id=target_id,
                    url=str(info.get("url") or ""),
                    origin="",  # filled by frameNavigated on the child session
                    parent_frame_id=(existing.parent_frame_id if existing else None),
                    is_oopif=True,
                    cdp_session_id=sid,
                    name=str(info.get("title") or (existing.name if existing else "")),
                )

        # Enable domains on the child off-loop so the reader keeps pumping.
        # Awaiting the CDP replies here would deadlock because only the
        # reader can resolve those replies' Futures.
        asyncio.create_task(self._enable_child_domains(sid))

    async def _enable_child_domains(self, sid: str) -> None:
        """Enable Page+Runtime (+nested setAutoAttach) on a child CDP session.

        Also installs the dialog bridge so iframe-scoped alert/confirm/prompt
        calls round-trip through Fetch too.
        """
        try:
            await self._cdp("Page.enable", session_id=sid, timeout=3.0)
            await self._cdp("Runtime.enable", session_id=sid, timeout=3.0)
            await self._cdp(
                "Target.setAutoAttach",
                {"autoAttach": True, "waitForDebuggerOnStart": False, "flatten": True},
                session_id=sid,
                timeout=3.0,
            )
        except Exception as e:
            logger.debug("child session %s setup failed: %s", sid[:16], e)
        # Install the dialog bridge on the child so iframe dialogs are captured.
        await self._install_dialog_bridge(sid)

    def _on_target_detached(self, params: Dict[str, Any]) -> None:
        """Handle a child CDP session detaching.

        We deliberately DO NOT drop frames from ``_frames`` here. Browserbase
        fires transient detach events during page transitions even while the
        iframe is still visible to the user, and dropping the record hides
        OOPIFs from the agent between the detach and the next
        ``Target.attachedToTarget``. Instead, we just clear the session
        binding so stale ``cdp_session_id`` values aren't used for routing.
        If the iframe truly goes away, ``Page.frameDetached`` will clean up.
        """
        sid = params.get("sessionId")
        if not sid:
            return
        self._child_sessions.pop(sid, None)
        with self._state_lock:
            for fid, frame in list(self._frames.items()):
                if frame.cdp_session_id == sid:
                    # Replace with a copy that has cdp_session_id cleared so
                    # routing falls back to top-level page session if retried.
                    self._frames[fid] = FrameInfo(
                        frame_id=frame.frame_id,
                        url=frame.url,
                        origin=frame.origin,
                        parent_frame_id=frame.parent_frame_id,
                        is_oopif=frame.is_oopif,
                        cdp_session_id=None,
                        name=frame.name,
                    )

    # ── Console / exception ring buffer ─────────────────────────────────────

    def _on_console(self, params: Dict[str, Any], *, level_from: str) -> None:
        if level_from == "exception":
            details = params.get("exceptionDetails") or {}
            text = str(details.get("text") or "")
            url = details.get("url")
            event = ConsoleEvent(ts=time.time(), level="exception", text=text, url=url)
        else:
            raw_level = str(params.get("type") or "log")
            level = "error" if raw_level in ("error", "assert") else (
                "warning" if raw_level == "warning" else "log"
            )
            args = params.get("args") or []
            parts: List[str] = []
            # Only the first four arguments are kept to bound the entry size.
            for a in args[:4]:
                if isinstance(a, dict):
                    parts.append(str(a.get("value") or a.get("description") or ""))
            event = ConsoleEvent(ts=time.time(), level=level, text=" ".join(parts))
        with self._state_lock:
            self._console_events.append(event)
            if len(self._console_events) > CONSOLE_HISTORY_MAX * 2:
                # Keep last CONSOLE_HISTORY_MAX; allow 2x slack to reduce churn.
                self._console_events = self._console_events[-CONSOLE_HISTORY_MAX:]

    # ── Frame tree building (bounded) ───────────────────────────────────────

    def _build_frame_tree_locked(self) -> Dict[str, Any]:
        """Build the capped frame_tree payload. Must be called under state lock."""
        frames = self._frames
        if not frames:
            return {"top": None, "children": [], "truncated": False}

        # Identify a top frame: one with no parent, preferring oopif=False.
        tops = [f for f in frames.values() if not f.parent_frame_id]
        top = next((f for f in tops if not f.is_oopif), tops[0] if tops else None)

        # BFS from top, capped by FRAME_TREE_MAX_ENTRIES and
        # FRAME_TREE_MAX_OOPIF_DEPTH for OOPIF branches.
        children: List[Dict[str, Any]] = []
        truncated = False
        if top is None:
            return {"top": None, "children": [], "truncated": False}

        queue: List[Tuple[FrameInfo, int]] = [
            (f, 1) for f in frames.values() if f.parent_frame_id == top.frame_id
        ]
        visited: set[str] = {top.frame_id}
        while queue and len(children) < FRAME_TREE_MAX_ENTRIES:
            frame, depth = queue.pop(0)
            if frame.frame_id in visited:
                continue
            visited.add(frame.frame_id)
            if frame.is_oopif and depth > FRAME_TREE_MAX_OOPIF_DEPTH:
                truncated = True
                continue
            children.append(frame.to_dict())
            for f in frames.values():
                if f.parent_frame_id == frame.frame_id and f.frame_id not in visited:
                    queue.append((f, depth + 1))
        if queue:
            truncated = True

        return {
            "top": top.to_dict(),
            "children": children,
            "truncated": truncated,
        }


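# Illustration only, not part of the module: a minimal, standalone sketch of
# the capped breadth-first walk that ``_build_frame_tree_locked`` performs.
# It assumes frames are given as a plain ``frame_id -> parent_frame_id``
# mapping; ``build_tree`` and ``MAX_ENTRIES`` are hypothetical names, and the
# OOPIF depth limit is left out to keep the example short.

```python
from collections import deque
from typing import Any, Dict, List, Optional

MAX_ENTRIES = 3  # hypothetical cap; the module uses FRAME_TREE_MAX_ENTRIES


def build_tree(
    frames: Dict[str, Optional[str]], max_entries: int = MAX_ENTRIES
) -> Dict[str, Any]:
    """Walk children of the top frame breadth-first, stopping at the cap."""
    # The top frame is the first one without a parent.
    top = next((fid for fid, parent in frames.items() if parent is None), None)
    if top is None:
        return {"top": None, "children": [], "truncated": False}

    children: List[str] = []
    visited = {top}
    queue = deque(fid for fid, parent in frames.items() if parent == top)
    while queue and len(children) < max_entries:
        fid = queue.popleft()
        if fid in visited:
            continue
        visited.add(fid)
        children.append(fid)
        # Enqueue this frame's own children for the next BFS level.
        queue.extend(
            c for c, parent in frames.items() if parent == fid and c not in visited
        )
    # Anything still queued was cut off by the cap.
    return {"top": top, "children": children, "truncated": bool(queue)}


demo = {"main": None, "a": "main", "b": "main", "a1": "a", "a2": "a", "b1": "b"}
result = build_tree(demo)
print(result)
```

# With the cap of 3, the walk keeps the two direct children and one
# grandchild and flags the payload as truncated.
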
# ── Registry ─────────────────────────────────────────────────────────────────


class _SupervisorRegistry:
    """Process-global (task_id → supervisor) map with idempotent start/stop.

    One instance, exposed as ``SUPERVISOR_REGISTRY``. Safe to call from any
    thread: mutations go through ``_lock``.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._by_task: Dict[str, CDPSupervisor] = {}

    def get(self, task_id: str) -> Optional[CDPSupervisor]:
        """Return the supervisor for ``task_id`` if running, else ``None``."""
        with self._lock:
            return self._by_task.get(task_id)

    def get_or_start(
        self,
        task_id: str,
        cdp_url: str,
        *,
        dialog_policy: str = DEFAULT_DIALOG_POLICY,
        dialog_timeout_s: float = DEFAULT_DIALOG_TIMEOUT_S,
        start_timeout: float = 15.0,
    ) -> CDPSupervisor:
        """Idempotently ensure a supervisor is running for ``(task_id, cdp_url)``.

        If a supervisor exists for this task but was bound to a different
        ``cdp_url``, the old one is stopped and a fresh one is started.
        """
        with self._lock:
            existing = self._by_task.get(task_id)
            if existing is not None:
                if existing.cdp_url == cdp_url:
                    return existing
                # URL changed: tear down old, fall through to re-create.
                self._by_task.pop(task_id, None)
        if existing is not None:
            existing.stop()

        supervisor = CDPSupervisor(
            task_id=task_id,
            cdp_url=cdp_url,
            dialog_policy=dialog_policy,
            dialog_timeout_s=dialog_timeout_s,
        )
        supervisor.start(timeout=start_timeout)
        with self._lock:
            # Guard against a concurrent get_or_start from another thread.
            already = self._by_task.get(task_id)
            if already is not None and already.cdp_url == cdp_url:
                winner, loser = already, supervisor
            else:
                self._by_task[task_id] = supervisor
                winner, loser = supervisor, already
        # Stop the loser outside the lock, as stop() may block. This also
        # covers a concurrently registered supervisor bound to another URL,
        # which would otherwise be overwritten without being stopped.
        if loser is not None:
            loser.stop()
        return winner

    def stop(self, task_id: str) -> None:
        """Stop and discard the supervisor for ``task_id`` if it exists."""
        with self._lock:
            supervisor = self._by_task.pop(task_id, None)
        if supervisor is not None:
            supervisor.stop()

    def stop_all(self) -> None:
        """Stop every running supervisor. For shutdown / test teardown."""
        with self._lock:
            items = list(self._by_task.items())
            self._by_task.clear()
        for _, supervisor in items:
            supervisor.stop()


SUPERVISOR_REGISTRY = _SupervisorRegistry()


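# Illustration only, not part of the module: a small sketch of the
# "start outside the lock, re-check under the lock" idiom used by
# ``get_or_start``. ``FakeSupervisor`` and ``Registry`` are hypothetical
# stand-ins (no CDP, no threads are started), and the ``ws://`` URLs are
# placeholders.

```python
import threading
from typing import Dict, Optional


class FakeSupervisor:
    """Hypothetical stand-in for CDPSupervisor; it only records start/stop."""

    def __init__(self, task_id: str, cdp_url: str) -> None:
        self.task_id = task_id
        self.cdp_url = cdp_url
        self.running = False

    def start(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False


class Registry:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._by_task: Dict[str, FakeSupervisor] = {}

    def get_or_start(self, task_id: str, cdp_url: str) -> FakeSupervisor:
        with self._lock:
            existing = self._by_task.get(task_id)
            if existing is not None and existing.cdp_url == cdp_url:
                return existing
            self._by_task.pop(task_id, None)
        if existing is not None:
            existing.stop()  # URL changed: retire the old supervisor

        fresh = FakeSupervisor(task_id, cdp_url)
        fresh.start()  # potentially slow, so done without holding the lock

        loser: Optional[FakeSupervisor]
        with self._lock:
            already = self._by_task.get(task_id)
            if already is not None and already.cdp_url == cdp_url:
                winner, loser = already, fresh  # another thread won the race
            else:
                self._by_task[task_id] = fresh
                winner, loser = fresh, already
        if loser is not None:
            loser.stop()
        return winner


registry = Registry()
first = registry.get_or_start("task-1", "ws://one")
again = registry.get_or_start("task-1", "ws://one")
moved = registry.get_or_start("task-1", "ws://two")
print(first is again, first.running, moved.running)
```

# Repeating the call with the same URL returns the same object; a new URL
# stops the old supervisor and registers a fresh one.
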
__all__ = [
    "CDPSupervisor",
    "ConsoleEvent",
    "DEFAULT_DIALOG_POLICY",
    "DEFAULT_DIALOG_TIMEOUT_S",
    "DIALOG_POLICY_AUTO_ACCEPT",
    "DIALOG_POLICY_AUTO_DISMISS",
    "DIALOG_POLICY_MUST_RESPOND",
    "DialogRecord",
    "FrameInfo",
    "PendingDialog",
    "SUPERVISOR_REGISTRY",
    "SupervisorSnapshot",
    "_SupervisorRegistry",
]
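
# Illustration only, not part of the module: a toy version of the
# reader/Future correlation behind ``_cdp`` and ``_read_loop``, showing why
# event handlers that issue CDP calls are spawned with
# ``asyncio.create_task`` instead of being awaited by the reader.
# ``MiniClient`` is hypothetical and uses an in-memory queue in place of the
# WebSocket; the "browser" simply echoes a reply for each call.

```python
import asyncio
import json
from typing import Any, Dict, List, Set, Tuple


class MiniClient:
    def __init__(self) -> None:
        self.inbox: asyncio.Queue = asyncio.Queue()  # frames "from the browser"
        self.pending: Dict[int, asyncio.Future] = {}
        self.tasks: Set[asyncio.Task] = set()
        self.handled: List[Tuple[str, str]] = []
        self.next_id = 1

    async def call(self, method: str, timeout: float = 1.0) -> Dict[str, Any]:
        call_id = self.next_id
        self.next_id += 1
        fut = asyncio.get_running_loop().create_future()
        self.pending[call_id] = fut
        try:
            # Fake browser: queue a reply carrying the same id.
            reply = {"id": call_id, "result": {"method": method}}
            await self.inbox.put(json.dumps(reply))
            return await asyncio.wait_for(fut, timeout)
        finally:
            self.pending.pop(call_id, None)

    async def on_event(self, name: str) -> None:
        reply = await self.call("Page.handleJavaScriptDialog")
        self.handled.append((name, reply["result"]["method"]))

    async def read_loop(self) -> None:
        while True:
            msg = json.loads(await self.inbox.get())
            if "id" in msg:
                fut = self.pending.pop(msg["id"], None)
                if fut is not None and not fut.done():
                    fut.set_result(msg)
            elif "method" in msg:
                # Awaiting on_event here would stall the reader, which is the
                # only place the reply Future gets resolved. Spawn instead,
                # and keep a reference so the task is not garbage-collected.
                task = asyncio.create_task(self.on_event(msg["method"]))
                self.tasks.add(task)
                task.add_done_callback(self.tasks.discard)


async def demo() -> List[Tuple[str, str]]:
    client = MiniClient()
    reader = asyncio.create_task(client.read_loop())
    await client.inbox.put(json.dumps({"method": "Page.javascriptDialogOpening"}))
    await asyncio.sleep(0.05)  # let the reader, handler, and reply run
    reader.cancel()
    await asyncio.gather(reader, return_exceptions=True)
    return client.handled


handled = asyncio.run(demo())
print(handled)
```

# If ``read_loop`` awaited ``on_event`` directly, the reply would sit unread
# in the queue and ``call`` would end in a timeout, which is the deadlock the
# supervisor's handlers avoid.
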