revert: computer-use cua-driver (PR #16919) (#16927)

Reverts PR #16919 (commits dad10a78d, 413ee1a28, b4a8031b2, afb958829)
which was merged prematurely. Restoring the pre-merge state so #14817
and #15328 can be revisited as standing PRs.

Reverted commits:
- afb958829 fix(computer-use): harden image-rejection fallback + AUTHOR_MAP
- b4a8031b2 fix(computer-use): unwrap _multimodal tool results
- 413ee1a28 feat(computer-use): background focus-safe backend
- dad10a78d feat(computer-use): cua-driver backend, universal any-model schema

Co-authored-by: teknium1 <teknium@users.noreply.github.com>
This commit is contained in:
Teknium 2026-04-28 01:57:21 -07:00 committed by GitHub
parent cf0852f92e
commit e63364b8df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 35 additions and 3536 deletions

View file

@ -1,43 +0,0 @@
"""Computer use toolset — universal (any-model) macOS desktop control.
Architecture
------------
This toolset drives macOS apps through cua-driver's background computer-use
primitive (SkyLight private SPIs for focus-without-raise + pid-scoped event
posting). Unlike #4562's pyautogui backend, it does NOT steal the user's
cursor, keyboard focus, or Space — the agent and the user can co-work on the
same machine.
Unlike #4562's Anthropic-native `computer_20251124` tool, the schema here is
a plain OpenAI function-calling schema that every tool-capable model can
drive. Vision models get SOM (set-of-mark) captures — a screenshot with
numbered overlays on every interactable element, plus the AX tree — so they
click by element index instead of pixel coordinates. Non-vision models can
drive via the AX tree alone.
Wiring
------
* `tool.py` — registers the `computer_use` tool via tools.registry.
* `backend.py` — abstract `ComputerUseBackend`; swappable implementation.
* `cua_backend.py` — default backend; speaks MCP over stdio to `cua-driver`.
* `schema.py` — shared schema + docstring for the generic `computer_use`
  tool. Model-agnostic.
* `capture.py` — screenshot post-processing (PNG coercion, sizing, SOM
  overlay if the backend did not).
The outer integration points (multimodal tool-result plumbing, screenshot
eviction in the Anthropic adapter, image-aware token estimation, the
COMPUTER_USE_GUIDANCE prompt block, approval hook, and the skill) live
alongside this package. See agent/anthropic_adapter.py and
agent/prompt_builder.py for the salvaged hunks from PR #4562.
"""
from __future__ import annotations
# Re-export the public surface so `from tools.computer_use import ...` works.
from tools.computer_use.tool import ( # noqa: F401
handle_computer_use,
set_approval_callback,
check_computer_use_requirements,
get_computer_use_schema,
)

View file

@ -1,150 +0,0 @@
"""Abstract backend interface for computer use.
Any implementation (cua-driver over MCP, pyautogui, noop, future Linux/Windows)
must return the shape described below. All methods synchronous; async is
handled inside the backend implementation if needed.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class UIElement:
    """A single interactable element found on the current screen."""

    index: int  # SOM index, counted from 1
    role: str  # accessibility role, e.g. AXButton or AXTextField
    label: str = ""  # snippet of AXTitle / AXDescription / AXValue
    bounds: Tuple[int, int, int, int] = (0, 0, 0, 0)  # (x, y, w, h), logical px
    app: str = ""  # bundle ID or name of the owning app
    pid: int = 0  # PID of the owning process
    window_id: int = 0  # SkyLight / CG window ID
    attributes: Dict[str, Any] = field(default_factory=dict)

    def center(self) -> Tuple[int, int]:
        """Return the integer midpoint ``(x, y)`` of ``bounds``.

        Half-extents are floor-divided, so odd sizes round toward the origin.
        """
        left, top, width, height = self.bounds
        mid_x = left + width // 2
        mid_y = top + height // 2
        return mid_x, mid_y
@dataclass
class CaptureResult:
    """Outcome of one screen-capture call.

    Which payload fields are filled in depends on the capture mode:

    * ``mode="vision"``: ``png_b64`` only.
    * ``mode="ax"``: ``elements`` only.
    * ``mode="som"`` (default): both. The image already carries the numbered
      overlays drawn by the backend, and ``elements`` holds the matching
      index-to-element mapping.
    """

    mode: str
    width: int  # screenshot width in logical px (pre-Anthropic-scale)
    height: int
    png_b64: Optional[str] = None
    elements: List[UIElement] = field(default_factory=list)
    # App / window the elements were captured for (optional).
    app: str = ""
    window_title: str = ""
    # Size of the raw image bytes sent to Anthropic; used for token estimation.
    png_bytes_len: int = 0
@dataclass
class ActionResult:
    """Outcome of a single action (click / type / scroll / drag / key / wait)."""

    ok: bool
    action: str
    message: str = ""  # summary meant for a human reader
    # Trailing screenshot, present when the caller requested a post-action
    # capture or when the backend always supplies one.
    capture: Optional[CaptureResult] = None
    # Free-form extras for debugging / telemetry.
    meta: Dict[str, Any] = field(default_factory=dict)
class ComputerUseBackend(ABC):
    """Abstract interface every computer-use backend implements.

    Lifecycle: `start()` before first use, `stop()` at shutdown. All methods
    are synchronous; pointer and scroll targets are keyword-only.
    """

    @abstractmethod
    def start(self) -> None:
        """Bring the backend up; must be called before first use."""

    @abstractmethod
    def stop(self) -> None:
        """Shut the backend down."""

    @abstractmethod
    def is_available(self) -> bool:
        """Return True if the backend can be used on this host right now.

        Used by check_fn gating and by the post-setup wizard.
        """

    # ── Capture ─────────────────────────────────────────────────────
    @abstractmethod
    def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult:
        """Capture the screen in `mode` ("som", "vision" or "ax").

        `app` optionally restricts the capture to one application.
        """

    # ── Pointer actions ─────────────────────────────────────────────
    @abstractmethod
    def click(
        self,
        *,
        element: Optional[int] = None,
        x: Optional[int] = None,
        y: Optional[int] = None,
        button: str = "left",  # left | right | middle
        click_count: int = 1,
        modifiers: Optional[List[str]] = None,
    ) -> ActionResult:
        """Click an element (by index) or a point (`x`, `y`)."""

    @abstractmethod
    def drag(
        self,
        *,
        from_element: Optional[int] = None,
        to_element: Optional[int] = None,
        from_xy: Optional[Tuple[int, int]] = None,
        to_xy: Optional[Tuple[int, int]] = None,
        button: str = "left",
        modifiers: Optional[List[str]] = None,
    ) -> ActionResult:
        """Drag from a source element/point to a target element/point."""

    @abstractmethod
    def scroll(
        self,
        *,
        direction: str,  # up | down | left | right
        amount: int = 3,  # wheel ticks
        element: Optional[int] = None,
        x: Optional[int] = None,
        y: Optional[int] = None,
        modifiers: Optional[List[str]] = None,
    ) -> ActionResult:
        """Scroll `amount` wheel ticks in `direction` at an element or point."""

    # ── Keyboard ────────────────────────────────────────────────────
    @abstractmethod
    def type_text(self, text: str) -> ActionResult:
        """Type `text` into the current target."""

    @abstractmethod
    def key(self, keys: str) -> ActionResult:
        """Send a key combo, e.g. 'cmd+s', 'ctrl+alt+t', 'return'."""

    # ── Introspection ───────────────────────────────────────────────
    @abstractmethod
    def list_apps(self) -> List[Dict[str, Any]]:
        """Return running apps with bundle IDs, PIDs, window counts."""

    @abstractmethod
    def focus_app(self, app: str, raise_window: bool = False) -> ActionResult:
        """Route input to `app` (by name or bundle ID). Default: focus without raise."""

    # ── Timing ──────────────────────────────────────────────────────
    def wait(self, seconds: float) -> ActionResult:
        """Sleep for `seconds`, clamped to the range [0, 30].

        Default implementation: time.sleep. The returned message reports the
        duration actually slept, not the raw request, so an out-of-range
        request is not misreported to the caller.
        """
        import time

        delay = max(0.0, min(seconds, 30.0))
        time.sleep(delay)
        return ActionResult(ok=True, action="wait", message=f"waited {delay:.2f}s")

View file

@ -1,675 +0,0 @@
"""Cua-driver backend (macOS only).
Speaks MCP over stdio to `cua-driver`. The Python `mcp` SDK is async, so we
run a dedicated asyncio event loop on a background thread and marshal sync
calls through it.
Install: `/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/trycua/cua/main/libs/cua-driver/scripts/install.sh)"`
After install, `cua-driver` is on $PATH and supports `cua-driver mcp` (stdio
transport) which is what we invoke.
The private SkyLight SPIs cua-driver uses (SLEventPostToPid,
SLPSPostEventRecordTo, _AXObserverAddNotificationAndCheckRemote) are not
Apple-public and can break on OS updates. Pin the installed version via
`HERMES_CUA_DRIVER_VERSION` if you want reproducibility across an OS bump.
"""
from __future__ import annotations
import asyncio
import base64
import json
import logging
import os
import platform
import re
import shutil
import subprocess
import sys
import threading
from concurrent.futures import Future
from typing import Any, Dict, List, Optional, Tuple
from tools.computer_use.backend import (
ActionResult,
CaptureResult,
ComputerUseBackend,
UIElement,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Version pinning
# ---------------------------------------------------------------------------
# cua-driver version string; HERMES_CUA_DRIVER_VERSION overrides the "0.5.0"
# default. NOTE(review): not referenced by the code visible in this module —
# presumably consumed by the installer; confirm.
PINNED_CUA_DRIVER_VERSION = os.environ.get("HERMES_CUA_DRIVER_VERSION", "0.5.0")
# Executable name/path used to launch the driver; HERMES_CUA_DRIVER_CMD
# overrides the default "cua-driver" looked up on $PATH.
_CUA_DRIVER_CMD = os.environ.get("HERMES_CUA_DRIVER_CMD", "cua-driver")
_CUA_DRIVER_ARGS = ["mcp"]  # stdio MCP transport
# Regex to parse list_windows text output lines:
# "- AppName (pid 12345) "Title" [window_id: 67890]"
# Groups: 1 = app name, 2 = pid, 3 = window id. MULTILINE so `^` anchors at
# the start of every line of the tool output.
_WINDOW_LINE_RE = re.compile(
    r'^-\s+(.+?)\s+\(pid\s+(\d+)\)\s+.*\[window_id:\s+(\d+)\]',
    re.MULTILINE,
)
# Regex to parse element lines from get_window_state AX tree markdown:
# " - [N] AXRole "label""
# Groups: 1 = element index, 2 = role, 3 = optional quoted label.
_ELEMENT_LINE_RE = re.compile(
    r'^\s*-\s+\[(\d+)\]\s+(\w+)(?:\s+"([^"]*)")?',
    re.MULTILINE,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _is_macos() -> bool:
return sys.platform == "darwin"
def _is_arm_mac() -> bool:
    """Report whether the host is macOS running on an arm64 machine."""
    if not _is_macos():
        return False
    return platform.machine() == "arm64"
def cua_driver_binary_available() -> bool:
    """True if `cua-driver` is on $PATH or HERMES_CUA_DRIVER_CMD resolves."""
    resolved_path = shutil.which(_CUA_DRIVER_CMD)
    return True if resolved_path else False
def cua_driver_install_hint() -> str:
    """Return the multi-line message explaining how to install cua-driver."""
    install_command = (
        ' /bin/bash -c "$(curl -fsSL '
        'https://raw.githubusercontent.com/trycua/cua/main/libs/cua-driver/scripts/install.sh)"'
    )
    message_lines = [
        "cua-driver is not installed. Install with:",
        install_command,
        "Or run `hermes tools` and enable the Computer Use toolset to install it automatically.",
    ]
    return "\n".join(message_lines)
def _parse_windows_from_text(text: str) -> List[Dict[str, Any]]:
    """Build one window record per list_windows text line that matches.

    Each record carries ``app_name``, ``pid``, ``window_id`` and an
    ``off_screen`` flag that is set when the matched text contains the
    "[off-screen]" marker.
    """
    return [
        {
            "app_name": match.group(1).strip(),
            "pid": int(match.group(2)),
            "window_id": int(match.group(3)),
            "off_screen": "[off-screen]" in match.group(0),
        }
        for match in _WINDOW_LINE_RE.finditer(text)
    ]
def _parse_elements_from_tree(markdown: str) -> List[UIElement]:
    """Turn get_window_state AX tree markdown into a list of UIElement.

    Only index, role and label are available in the markdown; bounds are
    left at the zero rectangle.
    """
    return [
        UIElement(
            index=int(match.group(1)),
            role=match.group(2),
            label=match.group(3) or "",
            bounds=(0, 0, 0, 0),
        )
        for match in _ELEMENT_LINE_RE.finditer(markdown)
    ]
def _split_tree_text(full_text: str) -> Tuple[str, str]:
"""Split get_window_state text into (summary_line, tree_markdown)."""
lines = full_text.split("\n", 1)
summary = lines[0]
tree = lines[1] if len(lines) > 1 else ""
return summary, tree
def _parse_key_combo(keys: str) -> Tuple[Optional[str], List[str]]:
"""Parse a key string like 'cmd+s' into (key, modifiers).
Returns (key, modifiers) where key is the non-modifier key and modifiers
is a list of modifier names (cmd, shift, option, ctrl).
"""
MODIFIER_NAMES = {"cmd", "command", "shift", "option", "alt", "ctrl", "control", "fn"}
KEY_ALIASES = {"command": "cmd", "alt": "option", "control": "ctrl"}
parts = [p.strip().lower() for p in re.split(r'[+\-]', keys) if p.strip()]
modifiers = []
key = None
for part in parts:
normalized = KEY_ALIASES.get(part, part)
if normalized in MODIFIER_NAMES:
modifiers.append(normalized)
else:
key = part # last non-modifier wins
return key, modifiers
# ---------------------------------------------------------------------------
# Asyncio bridge — one long-lived loop on a background thread
# ---------------------------------------------------------------------------
class _AsyncBridge:
"""Runs one asyncio loop on a daemon thread; marshals coroutines from the caller."""
def __init__(self) -> None:
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[threading.Thread] = None
self._ready = threading.Event()
def start(self) -> None:
if self._thread and self._thread.is_alive():
return
self._ready.clear()
def _run() -> None:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._ready.set()
try:
self._loop.run_forever()
finally:
try:
self._loop.close()
except Exception:
pass
self._thread = threading.Thread(target=_run, daemon=True, name="cua-driver-loop")
self._thread.start()
if not self._ready.wait(timeout=5.0):
raise RuntimeError("cua-driver asyncio bridge failed to start")
def run(self, coro, timeout: Optional[float] = 30.0) -> Any:
if not self._loop or not self._thread or not self._thread.is_alive():
raise RuntimeError("cua-driver bridge not started")
fut: Future = asyncio.run_coroutine_threadsafe(coro, self._loop)
return fut.result(timeout=timeout)
def stop(self) -> None:
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread:
self._thread.join(timeout=2.0)
self._thread = None
self._loop = None
# ---------------------------------------------------------------------------
# MCP session (lazy, shared across tool calls)
# ---------------------------------------------------------------------------
class _CuaDriverSession:
"""Holds the mcp ClientSession. Spawned lazily; re-entered on drop."""
def __init__(self, bridge: _AsyncBridge) -> None:
self._bridge = bridge
self._session = None
self._exit_stack = None
self._lock = threading.Lock()
self._started = False
def _require_started(self) -> None:
if not self._started:
raise RuntimeError("cua-driver session not started")
async def _aenter(self) -> None:
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
if not cua_driver_binary_available():
raise RuntimeError(cua_driver_install_hint())
params = StdioServerParameters(
command=_CUA_DRIVER_CMD,
args=_CUA_DRIVER_ARGS,
env={**os.environ},
)
stack = AsyncExitStack()
read, write = await stack.enter_async_context(stdio_client(params))
session = await stack.enter_async_context(ClientSession(read, write))
await session.initialize()
self._exit_stack = stack
self._session = session
async def _aexit(self) -> None:
if self._exit_stack is not None:
try:
await self._exit_stack.aclose()
except Exception as e:
logger.warning("cua-driver shutdown error: %s", e)
self._exit_stack = None
self._session = None
def start(self) -> None:
with self._lock:
if self._started:
return
self._bridge.start()
self._bridge.run(self._aenter(), timeout=15.0)
self._started = True
def stop(self) -> None:
with self._lock:
if not self._started:
return
try:
self._bridge.run(self._aexit(), timeout=5.0)
finally:
self._started = False
async def _call_tool_async(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
result = await self._session.call_tool(name, args)
return _extract_tool_result(result)
def call_tool(self, name: str, args: Dict[str, Any], timeout: float = 30.0) -> Dict[str, Any]:
self._require_started()
return self._bridge.run(self._call_tool_async(name, args), timeout=timeout)
def _extract_tool_result(mcp_result: Any) -> Dict[str, Any]:
"""Convert an mcp CallToolResult into a plain dict.
cua-driver returns a mix of text parts, image parts, and structuredContent.
We flatten into:
{
"data": <text or parsed json>,
"images": [b64, ...],
"structuredContent": <dict|None>,
"isError": bool,
}
structuredContent is populated from the MCP result's structuredContent field
(MCP spec §2024-11-05+) and takes precedence for structured data like
list_windows window arrays.
"""
data: Any = None
images: List[str] = []
is_error = bool(getattr(mcp_result, "isError", False))
structured: Optional[Dict] = getattr(mcp_result, "structuredContent", None) or None
text_chunks: List[str] = []
for part in getattr(mcp_result, "content", []) or []:
ptype = getattr(part, "type", None)
if ptype == "text":
text_chunks.append(getattr(part, "text", "") or "")
elif ptype == "image":
b64 = getattr(part, "data", None)
if b64:
images.append(b64)
if text_chunks:
joined = "\n".join(t for t in text_chunks if t)
try:
data = json.loads(joined) if joined.strip().startswith(("{", "[")) else joined
except json.JSONDecodeError:
data = joined
return {"data": data, "images": images, "structuredContent": structured, "isError": is_error}
# ---------------------------------------------------------------------------
# The backend itself
# ---------------------------------------------------------------------------
class CuaDriverBackend(ComputerUseBackend):
    """Default computer-use backend. macOS-only via cua-driver MCP.

    The backend keeps a "sticky" target (pid + window id) that capture() and
    focus_app() set and that every action method reads; actions issued before
    a target exists fail with an explanatory ActionResult.
    """

    def __init__(self) -> None:
        self._bridge = _AsyncBridge()
        self._session = _CuaDriverSession(self._bridge)
        # Sticky context — updated by capture(), used by action tools.
        self._active_pid: Optional[int] = None
        self._active_window_id: Optional[int] = None

    # ── Lifecycle ──────────────────────────────────────────────────
    def start(self) -> None:
        """Open the cua-driver MCP session."""
        self._session.start()

    def stop(self) -> None:
        """Close the session, then stop the asyncio bridge even on error."""
        try:
            self._session.stop()
        finally:
            self._bridge.stop()

    def is_available(self) -> bool:
        """True only on macOS with the cua-driver binary resolvable."""
        if not _is_macos():
            return False
        return cua_driver_binary_available()

    # ── Window enumeration (shared by capture / focus_app) ─────────
    def _list_windows(self) -> List[Dict[str, Any]]:
        """Return on-screen window records, frontmost first when known.

        Prefers structuredContent.windows (MCP 2024-11-05+), sorted by
        ascending z_index (lowest z_index = frontmost on macOS); falls back
        to text-line parsing for older cua-driver builds. Every record has
        at least app_name, pid, window_id and off_screen.
        """
        lw_out = self._session.call_tool("list_windows", {"on_screen_only": True})
        sc = lw_out.get("structuredContent") or {}
        raw_windows = sc.get("windows") if sc else None
        if raw_windows:
            windows = [
                {
                    "app_name": w.get("app_name", ""),
                    "pid": int(w["pid"]),
                    "window_id": int(w["window_id"]),
                    "off_screen": not w.get("is_on_screen", True),
                    "title": w.get("title", ""),
                    "z_index": w.get("z_index", 0),
                }
                for w in raw_windows
            ]
            windows.sort(key=lambda w: w["z_index"])
            return windows
        raw_text = lw_out["data"] if isinstance(lw_out["data"], str) else ""
        return _parse_windows_from_text(raw_text)

    # ── Capture ────────────────────────────────────────────────────
    def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult:
        """Capture the frontmost on-screen window (optionally filtered by app name).

        Maps hermes `capture(mode, app)` to cua-driver `list_windows` plus
        `get_window_state` (ax/som) or `screenshot` (vision). Also records
        the chosen window as the sticky target for later actions.

        Returns an empty CaptureResult (no image, no elements) when no window
        is listed. If `app` matches no window, the unfiltered list is used.
        """
        # Step 1: enumerate on-screen windows to find target pid/window_id.
        windows = self._list_windows()
        if not windows:
            return CaptureResult(mode=mode, width=0, height=0, png_b64=None,
                                 elements=[], app="", window_title="", png_bytes_len=0)

        # Filter by app name (case-insensitive substring) if requested.
        if app:
            app_lower = app.lower()
            filtered = [w for w in windows if app_lower in w["app_name"].lower()]
            if filtered:
                windows = filtered

        # Pick first on-screen window (list is already in z-order).
        target = next((w for w in windows if not w["off_screen"]), windows[0])
        self._active_pid = target["pid"]
        self._active_window_id = target["window_id"]
        app_name = target["app_name"]

        # Step 2: capture.
        png_b64: Optional[str] = None
        elements: List[UIElement] = []
        # The parsed responses carry no dimensions, so these stay 0.
        width = height = 0
        window_title = ""

        if mode == "vision":
            # screenshot tool: just the image, no AX walk.
            # NOTE(review): a JPEG is requested but stored in `png_b64`;
            # confirm downstream consumers do not assume PNG bytes.
            sc_out = self._session.call_tool(
                "screenshot",
                {"window_id": self._active_window_id, "format": "jpeg", "quality": 85},
            )
            if sc_out["images"]:
                png_b64 = sc_out["images"][0]
        else:
            # get_window_state: AX tree + optional screenshot.
            gws_out = self._session.call_tool(
                "get_window_state",
                {"pid": self._active_pid, "window_id": self._active_window_id},
            )
            text = gws_out["data"] if isinstance(gws_out["data"], str) else ""
            _summary, tree = _split_tree_text(text)
            if gws_out["images"]:
                png_b64 = gws_out["images"][0]
            # An empty tree simply yields no elements (ax mode has no image).
            elements = _parse_elements_from_tree(tree)
            # Extract window title from the AX tree's first AXWindow line.
            wt = re.search(r'AXWindow\s+"([^"]+)"', tree)
            if wt:
                window_title = wt.group(1)

        png_bytes_len = 0
        if png_b64:
            try:
                png_bytes_len = len(base64.b64decode(png_b64, validate=False))
            except Exception:
                # Undecodable payload: estimate from the base64 length.
                png_bytes_len = len(png_b64) * 3 // 4

        return CaptureResult(
            mode=mode,
            width=width,
            height=height,
            png_b64=png_b64,
            elements=elements,
            app=app_name,
            window_title=window_title,
            png_bytes_len=png_bytes_len,
        )

    # ── Pointer ────────────────────────────────────────────────────
    def click(
        self,
        *,
        element: Optional[int] = None,
        x: Optional[int] = None,
        y: Optional[int] = None,
        button: str = "left",
        click_count: int = 1,
        modifiers: Optional[List[str]] = None,
    ) -> ActionResult:
        """Click the sticky target at an element index or at (x, y).

        Right button maps to `right_click`; a left double click maps to
        `double_click`; everything else is a plain `click`. Buttons other
        than left/right are rejected rather than silently sent as a left
        click.
        """
        pid = self._active_pid
        if pid is None:
            return ActionResult(ok=False, action="click",
                                message="No active window — call capture() first.")

        if button not in ("left", "right"):
            return ActionResult(
                ok=False, action="click",
                message=f"button '{button}' is not supported by the cua-driver backend.",
            )

        # Choose tool based on button and click_count.
        if button == "right":
            tool = "right_click"
        elif click_count == 2:
            tool = "double_click"
        else:
            tool = "click"

        args: Dict[str, Any] = {"pid": pid}
        if element is not None:
            if self._active_window_id is None:
                return ActionResult(ok=False, action=tool,
                                    message="No active window_id for element_index click.")
            args["element_index"] = element
            args["window_id"] = self._active_window_id
        elif x is not None and y is not None:
            args["x"] = x
            args["y"] = y
        else:
            return ActionResult(ok=False, action=tool,
                                message="click requires element= or x/y.")
        if modifiers:
            args["modifier"] = modifiers
        return self._action(tool, args)

    def drag(
        self,
        *,
        from_element: Optional[int] = None,
        to_element: Optional[int] = None,
        from_xy: Optional[Tuple[int, int]] = None,
        to_xy: Optional[Tuple[int, int]] = None,
        button: str = "left",
        modifiers: Optional[List[str]] = None,
    ) -> ActionResult:
        """Always fails: cua-driver does not expose a drag tool."""
        return ActionResult(ok=False, action="drag",
                            message="drag is not supported by the cua-driver backend.")

    def scroll(
        self,
        *,
        direction: str,
        amount: int = 3,
        element: Optional[int] = None,
        x: Optional[int] = None,
        y: Optional[int] = None,
        modifiers: Optional[List[str]] = None,
    ) -> ActionResult:
        """Scroll the sticky target; `amount` is clamped to 1..50 ticks.

        `modifiers` is accepted for interface compatibility but not sent.
        """
        pid = self._active_pid
        if pid is None:
            return ActionResult(ok=False, action="scroll",
                                message="No active window — call capture() first.")
        args: Dict[str, Any] = {
            "pid": pid,
            "direction": direction,
            "amount": max(1, min(50, amount)),
        }
        if element is not None and self._active_window_id is not None:
            args["element_index"] = element
            args["window_id"] = self._active_window_id
        elif x is not None and y is not None:
            args["x"] = x
            args["y"] = y
        return self._action("scroll", args)

    # ── Keyboard ───────────────────────────────────────────────────
    def type_text(self, text: str) -> ActionResult:
        """Type `text` into the sticky target process via key events."""
        pid = self._active_pid
        if pid is None:
            return ActionResult(ok=False, action="type_text",
                                message="No active window — call capture() first.")
        # Safari WebKit AXTextField does not accept AX attribute writes (type_text),
        # so use type_text_chars which synthesises individual key events instead.
        # This works universally across all macOS apps in background mode.
        return self._action("type_text_chars", {"pid": pid, "text": text})

    def key(self, keys: str) -> ActionResult:
        """Send a key or key combo (e.g. 'cmd+s', 'return') to the target.

        Combos with modifiers go through `hotkey`; a lone key through
        `press_key`.
        """
        pid = self._active_pid
        if pid is None:
            return ActionResult(ok=False, action="key",
                                message="No active window — call capture() first.")
        key_name, modifiers = _parse_key_combo(keys)
        if not key_name:
            return ActionResult(ok=False, action="key",
                                message=f"Could not parse key from '{keys}'.")
        if modifiers:
            # hotkey requires at least one modifier + one key.
            return self._action("hotkey", {"pid": pid, "keys": modifiers + [key_name]})
        return self._action("press_key", {"pid": pid, "key": key_name})

    # ── Value setter ────────────────────────────────────────────────
    def set_value(self, value: str, element: Optional[int] = None) -> ActionResult:
        """Set a value on an element. Handles AXPopUpButton selects natively."""
        pid = self._active_pid
        window_id = self._active_window_id
        if pid is None or window_id is None:
            return ActionResult(ok=False, action="set_value",
                                message="No active window — call capture() first.")
        if element is None:
            return ActionResult(ok=False, action="set_value",
                                message="set_value requires element= (element index).")
        args: Dict[str, Any] = {
            "pid": pid,
            "window_id": window_id,
            "element_index": element,
            "value": value,
        }
        return self._action("set_value", args)

    # ── Introspection ──────────────────────────────────────────────
    def list_apps(self) -> List[Dict[str, Any]]:
        """Return running apps as dicts.

        Accepts a list, a dict with an "apps" key, or plain text (one
        "Name (pid N)" entry per line) from the driver.
        """
        out = self._session.call_tool("list_apps", {})
        data = out["data"]
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return data.get("apps", [])
        # list_apps returns plain text — parse app lines.
        if isinstance(data, str):
            apps = []
            for line in data.splitlines():
                m = re.search(r'(.+?)\s+\(pid\s+(\d+)\)', line)
                if m:
                    apps.append({"name": m.group(1).strip(), "pid": int(m.group(2))})
            return apps
        return []

    def focus_app(self, app: str, raise_window: bool = False) -> ActionResult:
        """Target an app for subsequent actions without stealing system focus.

        cua-driver background-automation never needs to bring a window to the
        front: capture(app=...) already selects the right window via
        list_windows. We implement focus_app as a pure window-selector —
        enumerate on-screen windows, find the best match for *app*
        (case-insensitive substring of the app name), and store its
        pid/window_id so that subsequent click/type calls hit the right
        process.

        If no window matches, the call fails and the sticky target is left
        untouched; it does NOT fall back to some other app's window, since
        later input would then be delivered to the wrong process.

        raise_window=True is intentionally ignored: stealing the user's focus
        is exactly what this backend is designed to avoid.
        """
        app_lower = app.lower()
        matched = [w for w in self._list_windows() if app_lower in w["app_name"].lower()]
        if not matched:
            return ActionResult(ok=False, action="focus_app",
                                message=f"No on-screen window found for app '{app}'.")
        target = matched[0]
        self._active_pid = target["pid"]
        self._active_window_id = target["window_id"]
        return ActionResult(
            ok=True, action="focus_app",
            message=f"Targeted {target['app_name']} (pid {self._active_pid}, "
                    f"window {self._active_window_id}) without raising window.",
        )

    # ── Internal ───────────────────────────────────────────────────
    def _action(self, name: str, args: Dict[str, Any]) -> ActionResult:
        """Call driver tool `name` and wrap the outcome in an ActionResult.

        Exceptions from the call are logged and converted to ok=False.
        """
        try:
            out = self._session.call_tool(name, args)
        except Exception as e:
            logger.exception("cua-driver %s call failed", name)
            return ActionResult(ok=False, action=name, message=f"cua-driver error: {e}")
        ok = not out["isError"]
        message = ""
        data = out["data"]
        if isinstance(data, dict):
            message = str(data.get("message", ""))
        elif isinstance(data, str):
            message = data
        return ActionResult(ok=ok, action=name, message=message,
                            meta=data if isinstance(data, dict) else {})
def _parse_element(d: Dict[str, Any]) -> UIElement:
    """Build a UIElement from a driver-provided element dict.

    `bounds` may be a dict (x/y plus w/h or width/height) or a 4-item
    sequence; anything else becomes the zero rectangle. Missing or None
    numeric fields are treated as 0, consistently for every numeric field.
    Keys not mapped to a UIElement field are kept in `attributes`.
    """
    def _as_int(value: Any) -> int:
        # None (and other falsy values) count as 0 instead of raising.
        return int(value or 0)

    bounds = d.get("bounds") or (0, 0, 0, 0)
    if isinstance(bounds, dict):
        bounds = (
            _as_int(bounds.get("x", 0)),
            _as_int(bounds.get("y", 0)),
            _as_int(bounds.get("w", bounds.get("width", 0))),
            _as_int(bounds.get("h", bounds.get("height", 0))),
        )
    elif isinstance(bounds, (list, tuple)) and len(bounds) == 4:
        bounds = tuple(_as_int(v) for v in bounds)
    else:
        bounds = (0, 0, 0, 0)

    known_keys = ("index", "role", "label", "bounds", "app", "pid", "windowId")
    return UIElement(
        index=_as_int(d.get("index", 0)),
        role=str(d.get("role", "") or ""),
        label=str(d.get("label", "") or ""),
        bounds=bounds,  # type: ignore[arg-type]
        app=str(d.get("app", "") or ""),
        pid=_as_int(d.get("pid", 0)),
        window_id=_as_int(d.get("windowId", 0)),
        attributes={k: v for k, v in d.items() if k not in known_keys},
    )

View file

@ -1,191 +0,0 @@
"""Schema for the generic `computer_use` tool.
Model-agnostic. Any tool-calling model can drive this. Vision-capable models
should prefer `capture(mode='som')` then `click(element=N)` — much more
reliable than pixel coordinates. Pixel coordinates remain supported for
models that were trained on them (e.g. Claude's computer-use RL).
"""
from __future__ import annotations
from typing import Any, Dict
# One consolidated tool with an `action` discriminator. Keeps the schema
# compact and the per-turn token cost low.
# Function-calling schema for the single `computer_use` tool. `action` is the
# only required property; every other property is optional and is grouped
# below by the action(s) it applies to.
COMPUTER_USE_SCHEMA: Dict[str, Any] = {
    "name": "computer_use",
    "description": (
        "Drive the macOS desktop in the background — screenshots, mouse, "
        "keyboard, scroll, drag — without stealing the user's cursor, "
        "keyboard focus, or Space. Preferred workflow: call with "
        "action='capture' (mode='som' gives numbered element overlays), "
        "then click by `element` index for reliability. Pixel coordinates "
        "are supported for models trained on them. Works on any window — "
        "hidden, minimized, on another Space, or behind another app. "
        "macOS only; requires cua-driver to be installed."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            # Discriminator selecting which operation the call performs.
            "action": {
                "type": "string",
                "enum": [
                    "capture",
                    "click",
                    "double_click",
                    "right_click",
                    "middle_click",
                    "drag",
                    "scroll",
                    "type",
                    "key",
                    "set_value",
                    "wait",
                    "list_apps",
                    "focus_app",
                ],
                "description": (
                    "Which action to perform. `capture` is free (no side "
                    "effects). All other actions require approval unless "
                    "auto-approved. Use `set_value` for select/popup elements "
                    "and sliders — it selects the matching option directly "
                    "without opening the native menu (no focus steal)."
                ),
            },
            # ── capture ────────────────────────────────────────────
            "mode": {
                "type": "string",
                "enum": ["som", "vision", "ax"],
                "description": (
                    "Capture mode. `som` (default) is a screenshot with "
                    "numbered overlays on every interactable element plus "
                    "the AX tree — best for vision models, lets you click "
                    "by element index. `vision` is a plain screenshot. "
                    "`ax` is the accessibility tree only (no image; useful "
                    "for text-only models)."
                ),
            },
            "app": {
                "type": "string",
                "description": (
                    "Optional. Limit capture/action to a specific app "
                    "(by name, e.g. 'Safari', or bundle ID, "
                    "'com.apple.Safari'). If omitted, operates on the "
                    "frontmost app's window or the whole screen."
                ),
            },
            # ── click / drag / scroll targeting ────────────────────
            "element": {
                "type": "integer",
                "description": (
                    "The 1-based SOM index returned by the last "
                    "`capture(mode='som')` call. Strongly preferred over "
                    "raw coordinates."
                ),
            },
            "coordinate": {
                "type": "array",
                "items": {"type": "integer"},
                "minItems": 2,
                "maxItems": 2,
                "description": (
                    "Pixel coordinates [x, y] in logical screen space (as "
                    "returned by capture width/height). Only use this if "
                    "no element index is available."
                ),
            },
            "button": {
                "type": "string",
                "enum": ["left", "right", "middle"],
                "description": "Mouse button. Defaults to left.",
            },
            "modifiers": {
                "type": "array",
                "items": {
                    "type": "string",
                    "enum": ["cmd", "shift", "option", "alt", "ctrl", "fn"],
                },
                "description": "Modifier keys held during the action.",
            },
            # ── drag ───────────────────────────────────────────────
            "from_element": {"type": "integer",
                             "description": "Source element index (drag)."},
            "to_element": {"type": "integer",
                           "description": "Target element index (drag)."},
            "from_coordinate": {
                "type": "array",
                "items": {"type": "integer"},
                "minItems": 2, "maxItems": 2,
                "description": "Source [x,y] (drag; use when no element available).",
            },
            "to_coordinate": {
                "type": "array",
                "items": {"type": "integer"},
                "minItems": 2, "maxItems": 2,
                "description": "Target [x,y] (drag; use when no element available).",
            },
            # ── scroll ─────────────────────────────────────────────
            "direction": {
                "type": "string",
                "enum": ["up", "down", "left", "right"],
                "description": "Scroll direction.",
            },
            "amount": {
                "type": "integer",
                "description": "Scroll wheel ticks. Default 3.",
            },
            # ── set_value ──────────────────────────────────────────
            "value": {
                "type": "string",
                "description": (
                    "For action='set_value': the value to set on the element. "
                    "For AXPopUpButton / select dropdowns, pass the option's "
                    "display label (e.g. 'Blue'). For sliders and other "
                    "AXValue-settable elements, pass the numeric or string value."
                ),
            },
            # ── type / key / wait ──────────────────────────────────
            "text": {
                "type": "string",
                "description": "Text to type (respects the current layout).",
            },
            "keys": {
                "type": "string",
                "description": (
                    "Key combo, e.g. 'cmd+s', 'ctrl+alt+t', 'return', "
                    "'escape', 'tab'. Use '+' to combine."
                ),
            },
            "seconds": {
                "type": "number",
                "description": "Seconds to wait. Max 30.",
            },
            # ── focus_app ──────────────────────────────────────────
            "raise_window": {
                "type": "boolean",
                "description": (
                    "Only for action='focus_app'. If true, brings the "
                    "window to front (DISRUPTS the user). Default false "
                    "— input is routed to the app without raising, "
                    "matching the background co-work model."
                ),
            },
            # ── return shape ───────────────────────────────────────
            "capture_after": {
                "type": "boolean",
                "description": (
                    "If true, take a follow-up capture after the action "
                    "and include it in the response. Saves a round-trip "
                    "when you need to verify an action's effect."
                ),
            },
        },
        # Only the discriminator is mandatory.
        "required": ["action"],
    },
}
def get_computer_use_schema() -> Dict[str, Any]:
    """Expose the generic OpenAI function-calling schema for `computer_use`.

    The module-level `COMPUTER_USE_SCHEMA` object itself is returned (no
    copy is made), so any mutation is visible to every other caller.
    """
    schema: Dict[str, Any] = COMPUTER_USE_SCHEMA
    return schema

View file

@ -1,521 +0,0 @@
"""Entry point for the `computer_use` tool.
Universal (any-model) macOS desktop control via cua-driver's background
computer-use primitive. Replaces #4562's Anthropic-native `computer_20251124`
approach: the schema here is standard OpenAI function-calling, so every
tool-capable model can drive it.
Return contract
---------------
For text-only results (wait, key, list_apps, focus_app, failures, etc.):
JSON string.
For captures / actions with `capture_after=True`:
A dict wrapped as the OpenAI-style multi-part tool-message content:
{
"_multimodal": True,
"content": [
{"type": "text", "text": "<human-readable summary + SOM index>"},
{"type": "image_url",
"image_url": {"url": "data:image/png;base64,<b64>"}},
],
"text_summary": "<text used for fallback string content>",
}
run_agent.py's tool-message builder inspects `_multimodal` and emits a
list-shaped `content` for OpenAI-compatible providers. The Anthropic
adapter splices the base64 image into a `tool_result` block (see
`agent/anthropic_adapter.py`). Every provider that supports multi-part
tool content gets the image; text-only providers see the summary only.
"""
from __future__ import annotations
import json
import logging
import os
import re
import sys
import threading
from typing import Any, Dict, List, Optional, Tuple
from tools.computer_use.backend import (
ActionResult,
CaptureResult,
ComputerUseBackend,
UIElement,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Approval & safety
# ---------------------------------------------------------------------------
# Hook consulted before state-mutating actions; None means "no hook wired".
_approval_callback = None


def set_approval_callback(cb) -> None:
    """Install the approval hook for computer_use prompts (used by the CLI).

    Follows the terminal_tool._approval_callback pattern. The hook is called
    as `cb(action, args, summary)` and is expected to answer with one of:
    "approve_once" | "approve_session" | "always_approve" | "deny".
    Passing None removes any previously installed hook.
    """
    global _approval_callback
    _approval_callback = cb
# Read-only actions (nothing on screen is changed). Always allowed.
_SAFE_ACTIONS = frozenset(("capture", "wait", "list_apps"))

# Actions that change user-visible state. These pass through the approval gate.
_DESTRUCTIVE_ACTIONS = frozenset((
    "click",
    "double_click",
    "right_click",
    "middle_click",
    "drag",
    "scroll",
    "type",
    "key",
    "set_value",
    "focus_app",
))
# Key combinations that are refused outright, whatever the approval level
# (e.g. logging out kills the session Hermes runs in). Mirrored from #4562.
# Each entry is a frozenset of canonical key names so order never matters.
_BLOCKED_KEY_COMBOS = {
    frozenset(combo)
    for combo in (
        ("cmd", "shift", "backspace"),       # empty trash
        ("cmd", "option", "backspace"),      # force delete
        ("cmd", "ctrl", "q"),                # lock screen
        ("cmd", "shift", "q"),               # log out
        ("cmd", "option", "shift", "q"),     # force log out
    )
}
# Spelling variants folded onto the canonical names used by the hard-block
# list. The modifier glyphs are written as escapes so they survive tooling
# that strips non-ASCII characters (two glyph keys had degraded into duplicate
# empty-string keys, which can never match a parsed token).
_KEY_ALIASES = {
    "command": "cmd",
    "control": "ctrl",
    "alt": "option",
    "opt": "option",
    "\u2318": "cmd",        # Command glyph (U+2318)
    "\u2325": "option",     # Option glyph (U+2325)
    # macOS labels the Backspace key "Delete"; fold it so that spelling
    # cannot slip past the blocked combinations.
    "delete": "backspace",
}


def _canon_key_combo(keys: str) -> frozenset:
    """Normalise a '+'-joined key combo into a frozenset of canonical names.

    Tokens are trimmed, lower-cased and mapped through `_KEY_ALIASES`; empty
    tokens are dropped. Order and duplicates are irrelevant in the result.
    A non-string argument yields an empty frozenset instead of raising.
    """
    if not isinstance(keys, str):
        return frozenset()
    tokens = (part.strip().lower() for part in keys.split("+"))
    return frozenset(_KEY_ALIASES.get(tok, tok) for tok in tokens if tok)
# Dangerous text patterns for the `type` action. Based on the list in #4562,
# with `wget ... | sh` added so wget is covered for the same shells as curl.
_BLOCKED_TYPE_PATTERNS = [
    re.compile(r"curl\s+[^|]*\|\s*bash", re.IGNORECASE),
    re.compile(r"curl\s+[^|]*\|\s*sh", re.IGNORECASE),
    re.compile(r"wget\s+[^|]*\|\s*bash", re.IGNORECASE),
    re.compile(r"wget\s+[^|]*\|\s*sh", re.IGNORECASE),
    re.compile(r"\bsudo\s+rm\s+-[rf]", re.IGNORECASE),
    re.compile(r"\brm\s+-rf\s+/\s*$", re.IGNORECASE),
    re.compile(r":\s*\(\)\s*\{\s*:\|:\s*&\s*\}", re.IGNORECASE),  # fork bomb
]


def _is_blocked_type(text: str) -> Optional[str]:
    """Return the source of the first blocked pattern found in `text`.

    Patterns are tried in list order with `search`, so a match anywhere in
    the text counts. Returns None when nothing matches, or when `text` is
    not a string (there is nothing to scan in that case).
    """
    if not isinstance(text, str):
        return None
    return next(
        (pat.pattern for pat in _BLOCKED_TYPE_PATTERNS if pat.search(text)),
        None,
    )
# ---------------------------------------------------------------------------
# Backend selection — env-swappable for tests
# ---------------------------------------------------------------------------
# Per-process cached backend; lazily instantiated on first call.
# The lock is held while the cached instance is created or torn down.
_backend_lock = threading.Lock()
_backend: Optional[ComputerUseBackend] = None
# Session-scoped approval state.
# Set to True by an "always_approve" verdict; every later action then skips
# the approval prompt.
_session_auto_approve = False
_always_allow: set = set() # action names the user unlocked for the session
def _get_backend() -> ComputerUseBackend:
    """Return the process-wide backend, creating and starting it on first use.

    The implementation is chosen by the HERMES_COMPUTER_USE_BACKEND
    environment variable ("cua" / "cua-driver" / empty for the default
    driver, "noop" for the test stub).

    The instance is cached only after `start()` has returned. If startup
    raises, nothing is cached and the next call retries from scratch
    instead of handing out a backend that never started.

    Raises:
        RuntimeError: if the environment variable names an unknown backend.
        Exception: whatever the backend constructor or `start()` raises.
    """
    global _backend
    with _backend_lock:
        if _backend is not None:
            return _backend

        backend_name = os.environ.get("HERMES_COMPUTER_USE_BACKEND", "cua").lower()
        if backend_name in ("cua", "cua-driver", ""):
            # Imported lazily so the driver module is only loaded when used.
            from tools.computer_use.cua_backend import CuaDriverBackend
            candidate: ComputerUseBackend = CuaDriverBackend()
        elif backend_name == "noop":  # pragma: no cover
            candidate = _NoopBackend()
        else:
            raise RuntimeError(f"Unknown HERMES_COMPUTER_USE_BACKEND={backend_name!r}")

        candidate.start()
        _backend = candidate
        return _backend
def reset_backend_for_tests() -> None:  # pragma: no cover
    """Test helper — tear down the cached backend and clear approval state.

    Stopping the backend is best-effort: a failure in `stop()` never
    propagates, but it is logged at debug level rather than silently
    discarded. The cached backend, the session auto-approve flag and the
    per-action allow set are always reset.
    """
    global _backend, _session_auto_approve, _always_allow
    with _backend_lock:
        if _backend is not None:
            try:
                _backend.stop()
            except Exception:
                logger.debug("computer_use backend stop() failed during reset",
                             exc_info=True)
        _backend = None
        _session_auto_approve = False
        _always_allow = set()
class _NoopBackend(ComputerUseBackend):  # pragma: no cover
    """Test/CI stub. Records calls; returns trivial results.

    Every call is appended to `self.calls` as `(name, payload)`. Action
    methods return a successful `ActionResult` named after the action.

    `wait` and `set_value` are implemented here as well because the
    dispatcher invokes them on whichever backend is active.
    NOTE(review): the base class is not visible from this file; if it already
    provides concrete `wait` / `set_value`, these overrides simply replace
    them with recording no-ops — confirm that is acceptable.
    """

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []
        self._started = False

    def _record(self, name: str, payload: Dict[str, Any]) -> ActionResult:
        """Log one call and build the trivial success result for it."""
        self.calls.append((name, payload))
        return ActionResult(ok=True, action=name)

    def start(self) -> None:
        self._started = True

    def stop(self) -> None:
        self._started = False

    def is_available(self) -> bool:
        return True

    def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult:
        self.calls.append(("capture", {"mode": mode, "app": app}))
        return CaptureResult(mode=mode, width=1024, height=768, png_b64=None,
                             elements=[], app=app or "", window_title="")

    def click(self, **kw) -> ActionResult:
        return self._record("click", kw)

    def drag(self, **kw) -> ActionResult:
        return self._record("drag", kw)

    def scroll(self, **kw) -> ActionResult:
        return self._record("scroll", kw)

    def type_text(self, text: str) -> ActionResult:
        return self._record("type", {"text": text})

    def key(self, keys: str) -> ActionResult:
        return self._record("key", {"keys": keys})

    def wait(self, seconds: float) -> ActionResult:
        # Recorded only; the stub never actually sleeps.
        return self._record("wait", {"seconds": seconds})

    def set_value(self, value: str, element: Optional[int] = None) -> ActionResult:
        return self._record("set_value", {"value": value, "element": element})

    def list_apps(self) -> List[Dict[str, Any]]:
        self.calls.append(("list_apps", {}))
        return []

    def focus_app(self, app: str, raise_window: bool = False) -> ActionResult:
        # The payload key is "raise" (not "raise_window"), as before.
        return self._record("focus_app", {"app": app, "raise": raise_window})
# ---------------------------------------------------------------------------
# Dispatch
# ---------------------------------------------------------------------------
def handle_computer_use(args: Dict[str, Any], **kwargs) -> Any:
    """Main entry point — dispatched by tools.registry.

    Order of checks: action present, hard safety blocks (typed text and key
    combos), approval gate for destructive actions, backend acquisition,
    then dispatch.

    Returns either a JSON string (text-only) or a dict marked `_multimodal`
    (image + summary) which run_agent.py wraps into the tool message.
    Failures are reported as a JSON string with an `error` key; this
    function does not raise for bad arguments or backend errors.
    """
    # str() keeps a non-string `action` from crashing on .strip().
    action = str(args.get("action") or "").strip().lower()
    if not action:
        return json.dumps({"error": "missing `action`"})

    # Safety: validate actions before approval prompt, so a blocked request
    # is rejected even if the user would have approved it.
    if action == "type":
        text = args.get("text", "")
        if not isinstance(text, str):
            # A null/non-string value would otherwise raise inside the regex scan.
            return json.dumps({"error": "type requires a string `text`"})
        pat = _is_blocked_type(text)
        if pat:
            return json.dumps({
                "error": f"blocked pattern in type text: {pat!r}",
                "hint": "Dangerous shell patterns cannot be typed via computer_use.",
            })

    if action == "key":
        keys = args.get("keys", "")
        if not isinstance(keys, str):
            return json.dumps({"error": "key requires a string `keys`"})
        combo = _canon_key_combo(keys)
        for blocked in _BLOCKED_KEY_COMBOS:
            # Subset test: extra keys pressed alongside a blocked combo
            # do not make it safe.
            if blocked <= combo:
                return json.dumps({
                    "error": f"blocked key combo: {sorted(blocked)}",
                    "hint": "Destructive system shortcuts are hard-blocked.",
                })

    # Approval gate (destructive actions only).
    if action in _DESTRUCTIVE_ACTIONS:
        err = _request_approval(action, args)
        if err is not None:
            return err

    # Dispatch to backend.
    try:
        backend = _get_backend()
    except Exception as e:
        return json.dumps({
            "error": f"computer_use backend unavailable: {e}",
            "hint": "Run `hermes tools` and enable Computer Use to install cua-driver.",
        })

    try:
        return _dispatch(backend, action, args)
    except Exception as e:
        logger.exception("computer_use %s failed", action)
        return json.dumps({"error": f"{action} failed: {e}"})
def _request_approval(action: str, args: Dict[str, Any]) -> Optional[str]:
    """Run the approval gate for one action.

    Returns None when the action may proceed, otherwise a JSON error string
    describing the denial. Verdict handling:
      * "approve_once"    — allow this call only.
      * "approve_session" — allow and remember this action name.
      * "always_approve"  — as above, and skip prompts for all later actions.
      * anything else     — deny.
    A callback that raises is logged and treated as a denial.
    """
    global _session_auto_approve, _always_allow

    if _session_auto_approve or action in _always_allow:
        return None

    hook = _approval_callback
    if hook is None:
        # No CLI approval wired — default allow. Gateway approval is handled
        # one layer out via the normal tool-approval infra.
        return None

    summary = _summarize_action(action, args)
    try:
        verdict = hook(action, args, summary)
    except Exception as exc:
        logger.warning("approval callback failed: %s", exc)
        verdict = "deny"

    if verdict in ("approve_session", "always_approve"):
        _always_allow.add(action)
        if verdict == "always_approve":
            _session_auto_approve = True
        return None
    if verdict == "approve_once":
        return None
    return json.dumps({"error": "denied by user", "action": action})
def _summarize_action(action: str, args: Dict[str, Any]) -> str:
    """Build the one-line description of an action passed to the approval callback.

    Element indices are compared against None rather than tested for
    truthiness, so element #0 is reported correctly. Drag source and target
    are separated by " -> ". Typed text and set_value values are cut to 60
    characters. Unrecognised actions are summarised by their name alone.
    """

    def _endpoint(element_key: str, coord_key: str) -> Any:
        # Prefer the element index (0 is valid); fall back to the coordinate.
        element = args.get(element_key)
        return element if element is not None else args.get(coord_key)

    if action in ("click", "double_click", "right_click", "middle_click"):
        element = args.get("element")
        if element is not None:
            return f"{action} element #{element}"
        coord = args.get("coordinate")
        if coord:
            return f"{action} at {tuple(coord)}"
        return action

    if action == "drag":
        src = _endpoint("from_element", "from_coordinate")
        dst = _endpoint("to_element", "to_coordinate")
        return f"drag {src} -> {dst}"

    if action == "scroll":
        return f"scroll {args.get('direction', '?')} x{args.get('amount', 3)}"

    if action == "type":
        text = args.get("text")
        text = "" if text is None else str(text)
        return f"type {text[:60]!r}" + ("..." if len(text) > 60 else "")

    if action == "key":
        return f"key {args.get('keys', '')!r}"

    if action == "set_value":
        # Show what is being set, so the user is not approving blind.
        element = args.get("element")
        target = f" element #{element}" if element is not None else ""
        value = args.get("value")
        value = "" if value is None else str(value)
        return (f"set_value{target} = {value[:60]!r}"
                + ("..." if len(value) > 60 else ""))

    if action == "focus_app":
        return f"focus {args.get('app', '')!r}" + (" (raise)" if args.get("raise_window") else "")

    return action
def _dispatch(backend: ComputerUseBackend, action: str, args: Dict[str, Any]) -> Any:
    """Translate one validated tool call into a backend call and shape the reply.

    Optional arguments that are absent or explicitly null fall back to
    their defaults (mode "som", 1.0 s wait, 3 scroll ticks, direction
    "down", button "left"). Wait time is clamped to 0..30 seconds, matching
    the limit stated in the tool schema.

    Returns a JSON string or a `_multimodal` dict. An unknown action or a
    missing required argument returns a JSON error string.

    Raises:
        ValueError: for a malformed coordinate argument (see `_coord_xy`).
        Exception: anything raised by the backend; `handle_computer_use`
            catches exceptions from this function and reports them as JSON.
    """
    capture_after = bool(args.get("capture_after"))

    if action == "capture":
        mode = str(args.get("mode") or "som")
        if mode not in ("som", "vision", "ax"):
            return json.dumps({"error": f"bad mode {mode!r}; use som|vision|ax"})
        return _capture_response(backend.capture(mode=mode, app=args.get("app")))

    if action == "wait":
        seconds = float(_arg_or_default(args, "seconds", 1.0))
        # Enforce the documented ceiling here rather than trusting the caller.
        seconds = max(0.0, min(seconds, 30.0))
        return _text_response(backend.wait(seconds))

    if action == "list_apps":
        apps = backend.list_apps()
        return json.dumps({"apps": apps, "count": len(apps)})

    if action == "focus_app":
        app = args.get("app")
        if not app:
            return json.dumps({"error": "focus_app requires `app`"})
        res = backend.focus_app(app, raise_window=bool(args.get("raise_window")))
        return _maybe_follow_capture(backend, res, capture_after)

    if action in ("click", "double_click", "right_click", "middle_click"):
        # right/middle click force the button; click/double_click honour
        # the `button` argument and default to the left button.
        if action == "right_click":
            button = "right"
        elif action == "middle_click":
            button = "middle"
        else:
            button = args.get("button") or "left"
        x, y = _coord_xy(args.get("coordinate"), "coordinate") or (None, None)
        res = backend.click(
            element=args.get("element"),
            x=x, y=y, button=button,
            click_count=2 if action == "double_click" else 1,
            modifiers=args.get("modifiers"),
        )
        return _maybe_follow_capture(backend, res, capture_after)

    if action == "drag":
        res = backend.drag(
            from_element=args.get("from_element"),
            to_element=args.get("to_element"),
            from_xy=_coord_xy(args.get("from_coordinate"), "from_coordinate"),
            to_xy=_coord_xy(args.get("to_coordinate"), "to_coordinate"),
            button=args.get("button") or "left",
            modifiers=args.get("modifiers"),
        )
        return _maybe_follow_capture(backend, res, capture_after)

    if action == "scroll":
        x, y = _coord_xy(args.get("coordinate"), "coordinate") or (None, None)
        res = backend.scroll(
            direction=args.get("direction") or "down",
            amount=int(_arg_or_default(args, "amount", 3)),
            element=args.get("element"),
            x=x, y=y,
            modifiers=args.get("modifiers"),
        )
        return _maybe_follow_capture(backend, res, capture_after)

    if action == "type":
        res = backend.type_text(args.get("text", ""))
        return _maybe_follow_capture(backend, res, capture_after)

    if action == "key":
        res = backend.key(args.get("keys", ""))
        return _maybe_follow_capture(backend, res, capture_after)

    if action == "set_value":
        value = args.get("value")
        if value is None:
            return json.dumps({"error": "set_value requires `value`"})
        res = backend.set_value(value=str(value), element=args.get("element"))
        return _maybe_follow_capture(backend, res, capture_after)

    return json.dumps({"error": f"unknown action {action!r}"})


def _arg_or_default(args: Dict[str, Any], key: str, default: Any) -> Any:
    """Return args[key], or `default` when the key is absent or null."""
    value = args.get(key)
    return default if value is None else value


def _coord_xy(value: Any, name: str) -> Optional[Tuple[Any, Any]]:
    """Turn an optional [x, y] argument into an (x, y) tuple.

    Returns None when the argument is missing, empty, or has a null
    component. Raises ValueError when it is not a two-item list/tuple.
    """
    if not value:
        return None
    if not isinstance(value, (list, tuple)) or len(value) != 2:
        raise ValueError(f"`{name}` must be [x, y], got {value!r}")
    x, y = value
    if x is None or y is None:
        return None
    return (x, y)
# ---------------------------------------------------------------------------
# Response shaping
# ---------------------------------------------------------------------------
def _text_response(res: ActionResult) -> str:
    """Serialise an action result as a JSON string.

    `ok` and `action` are always present; `message` and `meta` are added
    (in that order) only when they are truthy.
    """
    body: Dict[str, Any] = {"ok": res.ok, "action": res.action}
    for field in ("message", "meta"):
        extra = getattr(res, field)
        if extra:
            body[field] = extra
    return json.dumps(body)
def _capture_response(cap: CaptureResult) -> Any:
    """Shape a capture into the tool reply.

    With image data and a mode other than "ax", returns a `_multimodal`
    dict carrying the text summary plus the image as a data URL. Otherwise
    returns a JSON string with the capture fields, the element list and the
    same summary.
    """
    header = f"capture mode={cap.mode} {cap.width}x{cap.height}"
    if cap.app:
        header += f" app={cap.app}"
    if cap.window_title:
        header += f" window={cap.window_title!r}"
    summary = "\n".join([
        header,
        f"{len(cap.elements)} interactable element(s):",
        *_format_elements(cap.elements),
    ])

    has_image = bool(cap.png_b64) and cap.mode != "ax"
    if not has_image:
        # AX-only (or image missing): text path.
        return json.dumps({
            "mode": cap.mode,
            "width": cap.width,
            "height": cap.height,
            "app": cap.app,
            "window_title": cap.window_title,
            "elements": [_element_to_dict(el) for el in cap.elements],
            "summary": summary,
        })

    # Pick the MIME type from the base64 magic prefix so it matches the
    # payload: JPEG data starts with "/9j/"; anything else is labelled PNG.
    mime = "image/jpeg" if cap.png_b64.startswith("/9j/") else "image/png"
    image_part = {
        "type": "image_url",
        "image_url": {"url": f"data:{mime};base64,{cap.png_b64}"},
    }
    return {
        "_multimodal": True,
        "content": [{"type": "text", "text": summary}, image_part],
        "text_summary": summary,
        "meta": {
            "mode": cap.mode,
            "width": cap.width,
            "height": cap.height,
            "elements": len(cap.elements),
            "png_bytes": cap.png_bytes_len,
        },
    }
def _maybe_follow_capture(
    backend: ComputerUseBackend, res: ActionResult, do_capture: bool,
) -> Any:
    """Return the action result, optionally merged with a follow-up capture.

    When `do_capture` is false, or the follow-up capture raises (logged as
    a warning), only the text form of `res` is returned. Otherwise a "som"
    capture is taken and combined with the action outcome:
      * multimodal capture — the summary is prefixed with
        "[action] ok=<bool> - <message>" (message part only when present);
      * text capture — `action`, `ok` and `message` are merged into the
        capture's JSON object.
    """
    if not do_capture:
        return _text_response(res)
    try:
        cap = backend.capture(mode="som")
    except Exception as e:
        logger.warning("follow-up capture failed: %s", e)
        return _text_response(res)

    # Combine action summary with the capture.
    resp = _capture_response(cap)
    if isinstance(resp, dict) and resp.get("_multimodal"):
        prefix = f"[{res.action}] ok={res.ok}"
        if res.message:
            # Separator keeps the message from running into "ok=True".
            prefix += f" - {res.message}"
        resp["content"][0]["text"] = prefix + "\n\n" + resp["content"][0]["text"]
        resp["text_summary"] = prefix + "\n\n" + resp["text_summary"]
        return resp

    # Fallback: action + text capture merged.
    try:
        data = json.loads(resp)
    except (TypeError, json.JSONDecodeError):
        data = {"capture": resp}
    if not isinstance(data, dict):
        # Valid JSON that is not an object cannot take extra keys; wrap it.
        data = {"capture": data}
    data["action"] = res.action
    data["ok"] = res.ok
    if res.message:
        data["message"] = res.message
    return json.dumps(data)
def _format_elements(elements: List[UIElement], max_lines: int = 40) -> List[str]:
    """Render elements as one summary line each, capped at `max_lines`.

    Newlines in a label are replaced by spaces and the label is cut to 60
    characters. When elements are left out, a final line reports how many.
    """
    lines: List[str] = []
    for el in elements[:max_lines]:
        text = el.label.replace("\n", " ")[:60]
        app_tag = f" [{el.app}]" if el.app else ""
        lines.append(f" #{el.index} {el.role} {text!r} @ {el.bounds}{app_tag}")

    hidden = len(elements) - max_lines
    if hidden > 0:
        lines.append(f" ... +{hidden} more (call capture with app= to narrow)")
    return lines
def _element_to_dict(e: UIElement) -> Dict[str, Any]:
    """Convert a UI element into a JSON-serialisable dict.

    Keys, in order: index, role, label, bounds (copied into a list), app.
    """
    as_dict: Dict[str, Any] = dict(index=e.index, role=e.role, label=e.label)
    as_dict["bounds"] = list(e.bounds)
    as_dict["app"] = e.app
    return as_dict
# ---------------------------------------------------------------------------
# Availability check (used by the tool registry check_fn)
# ---------------------------------------------------------------------------
def check_computer_use_requirements() -> bool:
    """Tell the tool registry whether computer_use can run on this host.

    Returns False on anything other than macOS. On macOS the answer is
    whatever `cua_driver_binary_available()` reports; that helper is
    imported lazily so other platforms never load the driver module.
    """
    on_macos = sys.platform == "darwin"
    if on_macos:
        from tools.computer_use.cua_backend import cua_driver_binary_available
        return cua_driver_binary_available()
    return False
def get_computer_use_schema() -> Dict[str, Any]:
    """Return the shared `computer_use` tool schema.

    The schema module is imported on call rather than at module load, and
    its `COMPUTER_USE_SCHEMA` object is returned as-is (not copied).
    """
    from tools.computer_use.schema import COMPUTER_USE_SCHEMA as shared_schema
    return shared_schema