"""Cua-driver backend (macOS only). Speaks MCP over stdio to `cua-driver`. The Python `mcp` SDK is async, so we run a dedicated asyncio event loop on a background thread and marshal sync calls through it. Install: `/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/cua-driver/scripts/install.sh)"` After install, `cua-driver` is on $PATH and supports `cua-driver mcp` (stdio transport) which is what we invoke. The private SkyLight SPIs cua-driver uses (SLEventPostToPid, SLPSPostEvent- RecordTo, _AXObserverAddNotificationAndCheckRemote) are not Apple-public and can break on OS updates. Pin the installed version via `HERMES_CUA_DRIVER_ VERSION` if you want reproducibility across an OS bump. """ from __future__ import annotations import asyncio import base64 import json import logging import os import platform import re import shutil import subprocess import sys import threading from concurrent.futures import Future from typing import Any, Dict, List, Optional, Tuple from tools.computer_use.backend import ( ActionResult, CaptureResult, ComputerUseBackend, UIElement, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Version pinning # --------------------------------------------------------------------------- PINNED_CUA_DRIVER_VERSION = os.environ.get("HERMES_CUA_DRIVER_VERSION", "0.5.0") _CUA_DRIVER_CMD = os.environ.get("HERMES_CUA_DRIVER_CMD", "cua-driver") _CUA_DRIVER_ARGS = ["mcp"] # stdio MCP transport # Regex to parse list_windows text output lines: # "- AppName (pid 12345) "Title" [window_id: 67890]" _WINDOW_LINE_RE = re.compile( r'^-\s+(.+?)\s+\(pid\s+(\d+)\)\s+.*\[window_id:\s+(\d+)\]', re.MULTILINE, ) # Regex to parse element lines from get_window_state AX tree markdown: # " - [N] AXRole "label"" _ELEMENT_LINE_RE = re.compile( r'^\s*-\s+\[(\d+)\]\s+(\w+)(?:\s+"([^"]*)")?', re.MULTILINE, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _is_macos() -> bool: return sys.platform == "darwin" def _is_arm_mac() -> bool: return _is_macos() and platform.machine() == "arm64" def cua_driver_binary_available() -> bool: """True if `cua-driver` is on $PATH or HERMES_CUA_DRIVER_CMD resolves.""" return bool(shutil.which(_CUA_DRIVER_CMD)) def cua_driver_install_hint() -> str: return ( "cua-driver is not installed. Install with:\n" ' /bin/bash -c "$(curl -fsSL ' 'https://raw.githubusercontent.com/trycua/cua/main/libs/cua-driver/scripts/install.sh)"\n' "Or run `hermes tools` and enable the Computer Use toolset to install it automatically." ) def _parse_windows_from_text(text: str) -> List[Dict[str, Any]]: """Parse window records from list_windows text output.""" windows = [] for m in _WINDOW_LINE_RE.finditer(text): windows.append({ "app_name": m.group(1).strip(), "pid": int(m.group(2)), "window_id": int(m.group(3)), "off_screen": "[off-screen]" in m.group(0), }) return windows def _parse_elements_from_tree(markdown: str) -> List[UIElement]: """Parse UIElement list from get_window_state AX tree markdown.""" elements = [] for m in _ELEMENT_LINE_RE.finditer(markdown): elements.append(UIElement( index=int(m.group(1)), role=m.group(2), label=m.group(3) or "", bounds=(0, 0, 0, 0), )) return elements def _split_tree_text(full_text: str) -> Tuple[str, str]: """Split get_window_state text into (summary_line, tree_markdown).""" lines = full_text.split("\n", 1) summary = lines[0] tree = lines[1] if len(lines) > 1 else "" return summary, tree def _parse_key_combo(keys: str) -> Tuple[Optional[str], List[str]]: """Parse a key string like 'cmd+s' into (key, modifiers). Returns (key, modifiers) where key is the non-modifier key and modifiers is a list of modifier names (cmd, shift, option, ctrl). """ MODIFIER_NAMES = {"cmd", "command", "shift", "option", "alt", "ctrl", "control", "fn"} KEY_ALIASES = {"command": "cmd", "alt": "option", "control": "ctrl"} parts = [p.strip().lower() for p in re.split(r'[+\-]', keys) if p.strip()] modifiers = [] key = None for part in parts: normalized = KEY_ALIASES.get(part, part) if normalized in MODIFIER_NAMES: modifiers.append(normalized) else: key = part # last non-modifier wins return key, modifiers # --------------------------------------------------------------------------- # Asyncio bridge — one long-lived loop on a background thread # --------------------------------------------------------------------------- class _AsyncBridge: """Runs one asyncio loop on a daemon thread; marshals coroutines from the caller.""" def __init__(self) -> None: self._loop: Optional[asyncio.AbstractEventLoop] = None self._thread: Optional[threading.Thread] = None self._ready = threading.Event() def start(self) -> None: if self._thread and self._thread.is_alive(): return self._ready.clear() def _run() -> None: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) self._ready.set() try: self._loop.run_forever() finally: try: self._loop.close() except Exception: pass self._thread = threading.Thread(target=_run, daemon=True, name="cua-driver-loop") self._thread.start() if not self._ready.wait(timeout=5.0): raise RuntimeError("cua-driver asyncio bridge failed to start") def run(self, coro, timeout: Optional[float] = 30.0) -> Any: if not self._loop or not self._thread or not self._thread.is_alive(): raise RuntimeError("cua-driver bridge not started") fut: Future = asyncio.run_coroutine_threadsafe(coro, self._loop) return fut.result(timeout=timeout) def stop(self) -> None: if self._loop and self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) if self._thread: self._thread.join(timeout=2.0) self._thread = None self._loop = None # --------------------------------------------------------------------------- # MCP session (lazy, shared across tool calls) # --------------------------------------------------------------------------- class _CuaDriverSession: """Holds the mcp ClientSession. Spawned lazily; re-entered on drop.""" def __init__(self, bridge: _AsyncBridge) -> None: self._bridge = bridge self._session = None self._exit_stack = None self._lock = threading.Lock() self._started = False def _require_started(self) -> None: if not self._started: raise RuntimeError("cua-driver session not started") async def _aenter(self) -> None: from contextlib import AsyncExitStack from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client if not cua_driver_binary_available(): raise RuntimeError(cua_driver_install_hint()) params = StdioServerParameters( command=_CUA_DRIVER_CMD, args=_CUA_DRIVER_ARGS, env={**os.environ}, ) stack = AsyncExitStack() read, write = await stack.enter_async_context(stdio_client(params)) session = await stack.enter_async_context(ClientSession(read, write)) await session.initialize() self._exit_stack = stack self._session = session async def _aexit(self) -> None: if self._exit_stack is not None: try: await self._exit_stack.aclose() except Exception as e: logger.warning("cua-driver shutdown error: %s", e) self._exit_stack = None self._session = None def start(self) -> None: with self._lock: if self._started: return self._bridge.start() self._bridge.run(self._aenter(), timeout=15.0) self._started = True def stop(self) -> None: with self._lock: if not self._started: return try: self._bridge.run(self._aexit(), timeout=5.0) finally: self._started = False async def _call_tool_async(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]: result = await self._session.call_tool(name, args) return _extract_tool_result(result) def call_tool(self, name: str, args: Dict[str, Any], timeout: float = 30.0) -> Dict[str, Any]: self._require_started() return self._bridge.run(self._call_tool_async(name, args), timeout=timeout) def _extract_tool_result(mcp_result: Any) -> Dict[str, Any]: """Convert an mcp CallToolResult into a plain dict. cua-driver returns a mix of text parts, image parts, and structuredContent. We flatten into: { "data": , "images": [b64, ...], "structuredContent": , "isError": bool, } structuredContent is populated from the MCP result's structuredContent field (MCP spec §2024-11-05+) and takes precedence for structured data like list_windows window arrays. """ data: Any = None images: List[str] = [] is_error = bool(getattr(mcp_result, "isError", False)) structured: Optional[Dict] = getattr(mcp_result, "structuredContent", None) or None text_chunks: List[str] = [] for part in getattr(mcp_result, "content", []) or []: ptype = getattr(part, "type", None) if ptype == "text": text_chunks.append(getattr(part, "text", "") or "") elif ptype == "image": b64 = getattr(part, "data", None) if b64: images.append(b64) if text_chunks: joined = "\n".join(t for t in text_chunks if t) try: data = json.loads(joined) if joined.strip().startswith(("{", "[")) else joined except json.JSONDecodeError: data = joined return {"data": data, "images": images, "structuredContent": structured, "isError": is_error} # --------------------------------------------------------------------------- # The backend itself # --------------------------------------------------------------------------- class CuaDriverBackend(ComputerUseBackend): """Default computer-use backend. macOS-only via cua-driver MCP.""" def __init__(self) -> None: self._bridge = _AsyncBridge() self._session = _CuaDriverSession(self._bridge) # Sticky context — updated by capture(), used by action tools. self._active_pid: Optional[int] = None self._active_window_id: Optional[int] = None # ── Lifecycle ────────────────────────────────────────────────── def start(self) -> None: self._session.start() def stop(self) -> None: try: self._session.stop() finally: self._bridge.stop() def is_available(self) -> bool: if not _is_macos(): return False return cua_driver_binary_available() # ── Capture ──────────────────────────────────────────────────── def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult: """Capture the frontmost on-screen window (optionally filtered by app name). Maps hermes `capture(mode, app)` → cua-driver `list_windows` + `get_window_state` (ax/som) or `screenshot` (vision). """ # Step 1: enumerate on-screen windows to find target pid/window_id. lw_out = self._session.call_tool("list_windows", {"on_screen_only": True}) # Prefer structuredContent.windows (MCP 2024-11-05+); fall back to # text-line parsing for older cua-driver builds. sc = lw_out.get("structuredContent") or {} raw_windows = sc.get("windows") if sc else None if raw_windows: windows = [ { "app_name": w.get("app_name", ""), "pid": int(w["pid"]), "window_id": int(w["window_id"]), "off_screen": not w.get("is_on_screen", True), "title": w.get("title", ""), "z_index": w.get("z_index", 0), } for w in raw_windows ] # Sort by z_index descending (lowest z_index = frontmost on macOS). windows.sort(key=lambda w: w["z_index"]) else: raw_text = lw_out["data"] if isinstance(lw_out["data"], str) else "" windows = _parse_windows_from_text(raw_text) if not windows: return CaptureResult(mode=mode, width=0, height=0, png_b64=None, elements=[], app="", window_title="", png_bytes_len=0) # Filter by app name (case-insensitive substring) if requested. if app: app_lower = app.lower() filtered = [w for w in windows if app_lower in w["app_name"].lower()] if filtered: windows = filtered # Pick first on-screen window (sorted by z_index / z-order above). target = next((w for w in windows if not w["off_screen"]), windows[0]) self._active_pid = target["pid"] self._active_window_id = target["window_id"] app_name = target["app_name"] # Step 2: capture. png_b64: Optional[str] = None elements: List[UIElement] = [] width = height = 0 window_title = "" if mode == "vision": # screenshot tool: just the PNG, no AX walk. sc_out = self._session.call_tool( "screenshot", {"window_id": self._active_window_id, "format": "jpeg", "quality": 85}, ) if sc_out["images"]: png_b64 = sc_out["images"][0] else: # get_window_state: AX tree + optional screenshot. gws_out = self._session.call_tool( "get_window_state", {"pid": self._active_pid, "window_id": self._active_window_id}, ) text = gws_out["data"] if isinstance(gws_out["data"], str) else "" summary, tree = _split_tree_text(text) # Parse element count from summary e.g. "✅ AppName — 42 elements, turn 3..." m = re.search(r'(\d+)\s+elements?', summary) if tree and not gws_out["images"]: # ax mode — no screenshot elements = _parse_elements_from_tree(tree) elif gws_out["images"]: png_b64 = gws_out["images"][0] elements = _parse_elements_from_tree(tree) # Extract window title from the AX tree first AXWindow line. wt = re.search(r'AXWindow\s+"([^"]+)"', tree) if wt: window_title = wt.group(1) png_bytes_len = 0 if png_b64: try: png_bytes_len = len(base64.b64decode(png_b64, validate=False)) except Exception: png_bytes_len = len(png_b64) * 3 // 4 return CaptureResult( mode=mode, width=width, height=height, png_b64=png_b64, elements=elements, app=app_name, window_title=window_title, png_bytes_len=png_bytes_len, ) # ── Pointer ──────────────────────────────────────────────────── def click( self, *, element: Optional[int] = None, x: Optional[int] = None, y: Optional[int] = None, button: str = "left", click_count: int = 1, modifiers: Optional[List[str]] = None, ) -> ActionResult: pid = self._active_pid if pid is None: return ActionResult(ok=False, action="click", message="No active window — call capture() first.") # Choose tool based on button and click_count. if button == "right": tool = "right_click" elif click_count == 2: tool = "double_click" else: tool = "click" args: Dict[str, Any] = {"pid": pid} if element is not None: if self._active_window_id is None: return ActionResult(ok=False, action=tool, message="No active window_id for element_index click.") args["element_index"] = element args["window_id"] = self._active_window_id elif x is not None and y is not None: args["x"] = x args["y"] = y else: return ActionResult(ok=False, action=tool, message="click requires element= or x/y.") if modifiers: args["modifier"] = modifiers return self._action(tool, args) def drag( self, *, from_element: Optional[int] = None, to_element: Optional[int] = None, from_xy: Optional[Tuple[int, int]] = None, to_xy: Optional[Tuple[int, int]] = None, button: str = "left", modifiers: Optional[List[str]] = None, ) -> ActionResult: # cua-driver does not expose a drag tool. return ActionResult(ok=False, action="drag", message="drag is not supported by the cua-driver backend.") def scroll( self, *, direction: str, amount: int = 3, element: Optional[int] = None, x: Optional[int] = None, y: Optional[int] = None, modifiers: Optional[List[str]] = None, ) -> ActionResult: pid = self._active_pid if pid is None: return ActionResult(ok=False, action="scroll", message="No active window — call capture() first.") args: Dict[str, Any] = { "pid": pid, "direction": direction, "amount": max(1, min(50, amount)), } if element is not None and self._active_window_id is not None: args["element_index"] = element args["window_id"] = self._active_window_id elif x is not None and y is not None: args["x"] = x args["y"] = y return self._action("scroll", args) # ── Keyboard ─────────────────────────────────────────────────── def type_text(self, text: str) -> ActionResult: pid = self._active_pid if pid is None: return ActionResult(ok=False, action="type_text", message="No active window — call capture() first.") # Safari WebKit AXTextField does not accept AX attribute writes (type_text), # so use type_text_chars which synthesises individual key events instead. # This works universally across all macOS apps in background mode. return self._action("type_text_chars", {"pid": pid, "text": text}) def key(self, keys: str) -> ActionResult: pid = self._active_pid if pid is None: return ActionResult(ok=False, action="key", message="No active window — call capture() first.") key_name, modifiers = _parse_key_combo(keys) if not key_name: return ActionResult(ok=False, action="key", message=f"Could not parse key from '{keys}'.") if modifiers: # hotkey requires at least one modifier + one key. return self._action("hotkey", {"pid": pid, "keys": modifiers + [key_name]}) else: return self._action("press_key", {"pid": pid, "key": key_name}) # ── Value setter ──────────────────────────────────────────────── def set_value(self, value: str, element: Optional[int] = None) -> ActionResult: """Set a value on an element. Handles AXPopUpButton selects natively.""" pid = self._active_pid window_id = self._active_window_id if pid is None or window_id is None: return ActionResult(ok=False, action="set_value", message="No active window — call capture() first.") if element is None: return ActionResult(ok=False, action="set_value", message="set_value requires element= (element index).") args: Dict[str, Any] = { "pid": pid, "window_id": window_id, "element_index": element, "value": value, } return self._action("set_value", args) # ── Introspection ────────────────────────────────────────────── def list_apps(self) -> List[Dict[str, Any]]: out = self._session.call_tool("list_apps", {}) data = out["data"] if isinstance(data, list): return data if isinstance(data, dict): return data.get("apps", []) # list_apps returns plain text — parse app lines. if isinstance(data, str): apps = [] for line in data.splitlines(): m = re.search(r'(.+?)\s+\(pid\s+(\d+)\)', line) if m: apps.append({"name": m.group(1).strip(), "pid": int(m.group(2))}) return apps return [] def focus_app(self, app: str, raise_window: bool = False) -> ActionResult: """Target an app for subsequent actions without stealing system focus. cua-driver background-automation never needs to bring a window to the front: capture(app=...) already selects the right window via list_windows. We implement focus_app as a pure window-selector — enumerate on-screen windows, find the best match for *app*, and store its pid/window_id so that subsequent click/type calls hit the right process. raise_window=True is intentionally ignored: stealing the user's focus is exactly what this backend is designed to avoid. """ lw_out = self._session.call_tool("list_windows", {"on_screen_only": True}) sc = lw_out.get("structuredContent") or {} raw_windows = sc.get("windows") if sc else None if raw_windows: windows = [ { "app_name": w.get("app_name", ""), "pid": int(w["pid"]), "window_id": int(w["window_id"]), "z_index": w.get("z_index", 0), } for w in raw_windows ] windows.sort(key=lambda w: w["z_index"]) else: raw_text = lw_out["data"] if isinstance(lw_out["data"], str) else "" windows = _parse_windows_from_text(raw_text) app_lower = app.lower() matched = [w for w in windows if app_lower in w["app_name"].lower()] target = matched[0] if matched else (windows[0] if windows else None) if target: self._active_pid = target["pid"] self._active_window_id = target["window_id"] return ActionResult( ok=True, action="focus_app", message=f"Targeted {target['app_name']} (pid {self._active_pid}, " f"window {self._active_window_id}) without raising window.", ) return ActionResult(ok=False, action="focus_app", message=f"No on-screen window found for app '{app}'.") # ── Internal ─────────────────────────────────────────────────── def _action(self, name: str, args: Dict[str, Any]) -> ActionResult: try: out = self._session.call_tool(name, args) except Exception as e: logger.exception("cua-driver %s call failed", name) return ActionResult(ok=False, action=name, message=f"cua-driver error: {e}") ok = not out["isError"] message = "" data = out["data"] if isinstance(data, dict): message = str(data.get("message", "")) elif isinstance(data, str): message = data return ActionResult(ok=ok, action=name, message=message, meta=data if isinstance(data, dict) else {}) def _parse_element(d: Dict[str, Any]) -> UIElement: bounds = d.get("bounds") or (0, 0, 0, 0) if isinstance(bounds, dict): bounds = ( int(bounds.get("x", 0)), int(bounds.get("y", 0)), int(bounds.get("w", bounds.get("width", 0))), int(bounds.get("h", bounds.get("height", 0))), ) elif isinstance(bounds, (list, tuple)) and len(bounds) == 4: bounds = tuple(int(v) for v in bounds) else: bounds = (0, 0, 0, 0) return UIElement( index=int(d.get("index", 0)), role=str(d.get("role", "") or ""), label=str(d.get("label", "") or ""), bounds=bounds, # type: ignore[arg-type] app=str(d.get("app", "") or ""), pid=int(d.get("pid", 0) or 0), window_id=int(d.get("windowId", 0) or 0), attributes={k: v for k, v in d.items() if k not in ("index", "role", "label", "bounds", "app", "pid", "windowId")}, )