hermes-agent/tools/computer_use/tool.py
Rodrigo fbdca64f73 fix(computer-use): skip capture_after when action failed (ok=False)
_maybe_follow_capture() issued a follow-up screenshot unconditionally
when capture_after=True, even when res.ok=False. The model then received
a normal-looking screenshot alongside an error message, and in practice
it often ignored ok=False and proceeded as if the action had succeeded.

Fix: return _text_response(res) early when res.ok is False so the model
receives only the error and can decide how to recover.

Tests added:
- test_capture_after_skipped_when_action_failed: patches click to return
  ok=False and asserts no capture call is issued.
- test_capture_after_fires_when_action_succeeds: ensures the happy path
  still triggers the follow-up capture.
2026-05-22 01:19:01 -07:00

749 lines
29 KiB
Python

"""Entry point for the `computer_use` tool.
Universal (any-model) macOS desktop control via cua-driver's background
computer-use primitive. Replaces #4562's Anthropic-native `computer_20251124`
approach — the schema here is standard OpenAI function-calling so every
tool-capable model can drive it.
Return contract
---------------
For text-only results (wait, key, list_apps, focus_app, failures, etc.):
JSON string.
For captures / actions with `capture_after=True`:
A dict wrapped as the OpenAI-style multi-part tool-message content:
{
"_multimodal": True,
"content": [
{"type": "text", "text": "<human-readable summary + SOM index>"},
{"type": "image_url",
"image_url": {"url": "data:image/png;base64,<b64>"}},
],
"text_summary": "<text used for fallback string content>",
}
run_agent.py's tool-message builder inspects `_multimodal` and emits a
list-shaped `content` for OpenAI-compatible providers. The Anthropic
adapter splices the base64 image into a `tool_result` block (see
`agent/anthropic_adapter.py`). Every provider that supports multi-part
tool content gets the image; text-only providers see the summary only.
"""
from __future__ import annotations
import json
import logging
import os
import re
import sys
import threading
from typing import Any, Dict, List, Optional, Tuple
from tools.computer_use.backend import (
ActionResult,
CaptureResult,
ComputerUseBackend,
UIElement,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Approval & safety
# ---------------------------------------------------------------------------
# Hook used to ask the user about destructive actions. ``None`` means no
# interactive approver has been registered.
_approval_callback = None


def set_approval_callback(cb) -> None:
    """Install (or clear) the approval hook for computer_use actions (used by CLI).

    Follows the same pattern as ``terminal_tool._approval_callback``. The
    hook is invoked as ``cb(action, args, summary)`` and is expected to
    answer with one of ``"approve_once"``, ``"approve_session"``,
    ``"always_approve"`` or ``"deny"``.
    """
    global _approval_callback
    _approval_callback = cb
# Actions that read, not mutate. Always allowed.
_SAFE_ACTIONS = frozenset({"capture", "wait", "list_apps"})
# Actions that mutate user-visible state. Go through approval.
_DESTRUCTIVE_ACTIONS = frozenset({
"click", "double_click", "right_click", "middle_click",
"drag", "scroll", "type", "key", "set_value", "focus_app",
})
# Hard-blocked key combinations. Mirrored from #4562 — these are destructive
# regardless of approval level (e.g. logout kills the session Hermes runs in).
_BLOCKED_KEY_COMBOS = {
frozenset({"cmd", "shift", "backspace"}), # empty trash
frozenset({"cmd", "option", "backspace"}), # force delete
frozenset({"cmd", "ctrl", "q"}), # lock screen
frozenset({"cmd", "shift", "q"}), # log out
frozenset({"cmd", "option", "shift", "q"}), # force log out
}
_KEY_ALIASES = {"command": "cmd", "control": "ctrl", "alt": "option", "": "cmd", "": "option"}
def _canon_key_combo(keys: str) -> frozenset:
parts = [p.strip().lower() for p in re.split(r"\s*\+\s*", keys) if p.strip()]
parts = [_KEY_ALIASES.get(p, p) for p in parts]
return frozenset(parts)
# Dangerous text patterns for the `type` action. Same list as #4562.
_BLOCKED_TYPE_PATTERNS = [
re.compile(r"curl\s+[^|]*\|\s*bash", re.IGNORECASE),
re.compile(r"curl\s+[^|]*\|\s*sh", re.IGNORECASE),
re.compile(r"wget\s+[^|]*\|\s*bash", re.IGNORECASE),
re.compile(r"\bsudo\s+rm\s+-[rf]", re.IGNORECASE),
re.compile(r"\brm\s+-rf\s+/\s*$", re.IGNORECASE),
re.compile(r":\s*\(\)\s*\{\s*:\|:\s*&\s*\}", re.IGNORECASE), # fork bomb
]
def _is_blocked_type(text: str) -> Optional[str]:
for pat in _BLOCKED_TYPE_PATTERNS:
if pat.search(text):
return pat.pattern
return None
# ---------------------------------------------------------------------------
# Backend selection — env-swappable for tests
# ---------------------------------------------------------------------------
# Approval state that lasts for the life of the process/session.
_always_allow: set = set()  # action names the user unlocked for the session
_session_auto_approve = False

# One backend per process, created lazily on first use. The lock guards
# creation and teardown of the cached instance.
_backend: Optional[ComputerUseBackend] = None
_backend_lock = threading.Lock()
def _get_backend() -> ComputerUseBackend:
    """Return the process-wide backend, creating and starting it on first use.

    The implementation is chosen by ``HERMES_COMPUTER_USE_BACKEND``:
    ``cua`` / ``cua-driver`` / empty (default) selects ``CuaDriverBackend``,
    ``noop`` selects the in-process stub.

    The instance is cached only after ``start()`` has succeeded. If start
    raises, nothing is cached, so the next call retries instead of handing
    out a backend that was never started.

    Raises:
        RuntimeError: If the environment variable names an unknown backend.
        Exception: Whatever the backend constructor or ``start()`` raises.
    """
    global _backend
    with _backend_lock:
        if _backend is None:
            backend_name = os.environ.get("HERMES_COMPUTER_USE_BACKEND", "cua").strip().lower()
            if backend_name in {"cua", "cua-driver", ""}:
                # Imported lazily so this module loads even when the
                # cua-driver backend module is unavailable.
                from tools.computer_use.cua_backend import CuaDriverBackend
                candidate: ComputerUseBackend = CuaDriverBackend()
            elif backend_name == "noop":  # pragma: no cover
                candidate = _NoopBackend()
            else:
                raise RuntimeError(f"Unknown HERMES_COMPUTER_USE_BACKEND={backend_name!r}")
            candidate.start()
            # Publish only a successfully started backend.
            _backend = candidate
        return _backend
def reset_backend_for_tests() -> None:  # pragma: no cover
    """Test helper — tear down the cached backend and clear approval state.

    Stops the cached backend if there is one, then resets the cached
    instance, the session auto-approve flag and the per-action allow list
    to their initial values.
    """
    global _backend, _session_auto_approve, _always_allow
    with _backend_lock:
        if _backend is not None:
            try:
                _backend.stop()
            except Exception:
                # Teardown is best-effort: a failing stop() must not break the
                # reset, but leave a trace for whoever debugs a flaky test.
                logger.debug("computer_use: backend stop failed during reset", exc_info=True)
        _backend = None
        _session_auto_approve = False
        _always_allow = set()
class _NoopBackend(ComputerUseBackend):  # pragma: no cover
    """In-memory stand-in for tests and CI.

    Every primitive appends a ``(name, arguments)`` tuple to ``self.calls``
    and returns a trivial successful result; nothing touches the desktop.
    """

    def __init__(self) -> None:
        self._started = False
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def start(self) -> None:
        self._started = True

    def stop(self) -> None:
        self._started = False

    def is_available(self) -> bool:
        return True

    def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult:
        self.calls.append(("capture", {"mode": mode, "app": app}))
        # Fixed-size, image-less capture with no elements.
        return CaptureResult(
            mode=mode,
            width=1024,
            height=768,
            png_b64=None,
            elements=[],
            app=app or "",
            window_title="",
        )

    def click(self, **params) -> ActionResult:
        self.calls.append(("click", params))
        return ActionResult(ok=True, action="click")

    def drag(self, **params) -> ActionResult:
        self.calls.append(("drag", params))
        return ActionResult(ok=True, action="drag")

    def scroll(self, **params) -> ActionResult:
        self.calls.append(("scroll", params))
        return ActionResult(ok=True, action="scroll")

    def type_text(self, text: str) -> ActionResult:
        self.calls.append(("type", {"text": text}))
        return ActionResult(ok=True, action="type")

    def key(self, keys: str) -> ActionResult:
        self.calls.append(("key", {"keys": keys}))
        return ActionResult(ok=True, action="key")

    def list_apps(self) -> List[Dict[str, Any]]:
        self.calls.append(("list_apps", {}))
        return []

    def focus_app(self, app: str, raise_window: bool = False) -> ActionResult:
        self.calls.append(("focus_app", {"app": app, "raise": raise_window}))
        return ActionResult(ok=True, action="focus_app")

    def set_value(self, value: str, element: Optional[int] = None) -> ActionResult:
        self.calls.append(("set_value", {"value": value, "element": element}))
        return ActionResult(ok=True, action="set_value")
# ---------------------------------------------------------------------------
# Dispatch
# ---------------------------------------------------------------------------
def handle_computer_use(args: Dict[str, Any], **kwargs) -> Any:
    """Main entry point — dispatched by tools.registry.

    Runs, in order: argument validation, the hard safety blocks for ``type``
    and ``key``, the approval gate for destructive actions, backend lookup,
    and finally the action itself.

    Args:
        args: Tool-call arguments; ``action`` selects the operation.
        **kwargs: Accepted and ignored.

    Returns:
        Either a JSON string (text-only result or ``{"error": ...}``) or a
        dict marked ``_multimodal`` (image + summary) which run_agent.py
        wraps into the tool message. Failures are reported in the returned
        JSON rather than raised, including malformed ``action`` / ``text`` /
        ``keys`` arguments.
    """
    raw_action = args.get("action")
    if not raw_action:
        return json.dumps({"error": "missing `action`"})
    if not isinstance(raw_action, str):
        return json.dumps({"error": "`action` must be a string"})
    action = raw_action.strip().lower()
    if not action:
        return json.dumps({"error": "missing `action`"})

    # Safety: validate actions before approval prompt. Type checks come first
    # so a null or non-string argument yields an error result instead of an
    # exception raised outside the try/except below.
    if action == "type":
        text = args.get("text", "")
        if not isinstance(text, str):
            return json.dumps({"error": "type requires a string `text`"})
        pat = _is_blocked_type(text)
        if pat:
            return json.dumps({
                "error": f"blocked pattern in type text: {pat!r}",
                "hint": "Dangerous shell patterns cannot be typed via computer_use.",
            })
    if action == "key":
        keys = args.get("keys", "")
        if not isinstance(keys, str):
            return json.dumps({"error": "key requires a string `keys`"})
        combo = _canon_key_combo(keys)
        for blocked in _BLOCKED_KEY_COMBOS:
            # Subset test: extra keys pressed alongside a blocked combo do
            # not make it safe.
            if blocked <= combo:
                return json.dumps({
                    "error": f"blocked key combo: {sorted(blocked)}",
                    "hint": "Destructive system shortcuts are hard-blocked.",
                })

    # Approval gate (destructive actions only).
    if action in _DESTRUCTIVE_ACTIONS:
        err = _request_approval(action, args)
        if err is not None:
            return err

    # Dispatch to backend.
    try:
        backend = _get_backend()
    except Exception as e:
        return json.dumps({
            "error": f"computer_use backend unavailable: {e}",
            "hint": "Run `hermes tools` and enable Computer Use to install cua-driver.",
        })
    try:
        return _dispatch(backend, action, args)
    except Exception as e:
        logger.exception("computer_use %s failed", action)
        return json.dumps({"error": f"{action} failed: {e}"})
def _request_approval(action: str, args: Dict[str, Any]) -> Optional[str]:
    """Ask the registered approver whether *action* may run.

    Returns ``None`` when the action is approved, otherwise a JSON error
    string describing the denial.

    The callback is skipped when the session is auto-approved, when this
    action name was unlocked earlier, or when no callback is registered.
    A callback that raises is treated as a denial.
    """
    global _session_auto_approve, _always_allow

    if _session_auto_approve or action in _always_allow:
        return None

    approver = _approval_callback
    if approver is None:
        # No CLI approval wired — default allow. Gateway approval is handled
        # one layer out via the normal tool-approval infra.
        return None

    description = _summarize_action(action, args)
    try:
        decision = approver(action, args, description)
    except Exception as e:
        logger.warning("approval callback failed: %s", e)
        decision = "deny"

    if decision == "approve_once":
        return None
    if decision in ("approve_session", "always_approve"):
        # Both verdicts unlock this action name; "always_approve" additionally
        # switches the whole session to auto-approve.
        _always_allow.add(action)
        if decision == "always_approve":
            _session_auto_approve = True
        return None
    return json.dumps({"error": "denied by user", "action": action})
def _summarize_action(action: str, args: Dict[str, Any]) -> str:
if action in {"click", "double_click", "right_click", "middle_click"}:
if args.get("element") is not None:
return f"{action} element #{args['element']}"
coord = args.get("coordinate")
if coord:
return f"{action} at {tuple(coord)}"
return action
if action == "drag":
src = args.get("from_element") or args.get("from_coordinate")
dst = args.get("to_element") or args.get("to_coordinate")
return f"drag {src}{dst}"
if action == "scroll":
return f"scroll {args.get('direction', '?')} x{args.get('amount', 3)}"
if action == "type":
text = args.get("text", "")
return f"type {text[:60]!r}" + ("..." if len(text) > 60 else "")
if action == "key":
return f"key {args.get('keys', '')!r}"
if action == "focus_app":
return f"focus {args.get('app', '')!r}" + (" (raise)" if args.get("raise_window") else "")
return action
def _dispatch(backend: ComputerUseBackend, action: str, args: Dict[str, Any]) -> Any:
capture_after = bool(args.get("capture_after"))
if action == "capture":
mode = str(args.get("mode", "som"))
if mode not in {"som", "vision", "ax"}:
return json.dumps({"error": f"bad mode {mode!r}; use som|vision|ax"})
cap = backend.capture(mode=mode, app=args.get("app"))
return _capture_response(cap, max_elements=_coerce_max_elements(args.get("max_elements")))
if action == "wait":
seconds = float(args.get("seconds", 1.0))
res = backend.wait(seconds)
return _text_response(res)
if action == "list_apps":
apps = backend.list_apps()
return json.dumps({"apps": apps, "count": len(apps)})
if action == "focus_app":
app = args.get("app")
if not app:
return json.dumps({"error": "focus_app requires `app`"})
res = backend.focus_app(app, raise_window=bool(args.get("raise_window")))
return _maybe_follow_capture(backend, res, capture_after)
if action in {"click", "double_click", "right_click", "middle_click"}:
button = args.get("button")
click_count = 1
if action == "double_click":
click_count = 2
elif action == "right_click":
button = "right"
elif action == "middle_click":
button = "middle"
else:
button = button or "left"
element = args.get("element")
coord = args.get("coordinate") or (None, None)
x, y = (coord[0], coord[1]) if coord and coord[0] is not None else (None, None)
res = backend.click(
element=element if element is not None else None,
x=x, y=y, button=button or "left", click_count=click_count,
modifiers=args.get("modifiers"),
)
return _maybe_follow_capture(backend, res, capture_after)
if action == "drag":
has_elements = args.get("from_element") is not None and args.get("to_element") is not None
has_coords = args.get("from_coordinate") and args.get("to_coordinate")
if not has_elements and not has_coords:
return json.dumps({
"error": "drag requires from_coordinate/to_coordinate or from_element/to_element",
})
res = backend.drag(
from_element=args.get("from_element"),
to_element=args.get("to_element"),
from_xy=tuple(args["from_coordinate"]) if args.get("from_coordinate") else None,
to_xy=tuple(args["to_coordinate"]) if args.get("to_coordinate") else None,
button=args.get("button", "left"),
modifiers=args.get("modifiers"),
)
return _maybe_follow_capture(backend, res, capture_after)
if action == "scroll":
coord = args.get("coordinate") or (None, None)
res = backend.scroll(
direction=args.get("direction", "down"),
amount=int(args.get("amount", 3)),
element=args.get("element"),
x=coord[0] if coord and coord[0] is not None else None,
y=coord[1] if coord and coord[1] is not None else None,
modifiers=args.get("modifiers"),
)
return _maybe_follow_capture(backend, res, capture_after)
if action == "type":
res = backend.type_text(args.get("text", ""))
return _maybe_follow_capture(backend, res, capture_after)
if action == "key":
res = backend.key(args.get("keys", ""))
return _maybe_follow_capture(backend, res, capture_after)
if action == "set_value":
value = args.get("value")
if value is None:
return json.dumps({"error": "set_value requires `value`"})
res = backend.set_value(value=str(value), element=args.get("element"))
return _maybe_follow_capture(backend, res, capture_after)
return json.dumps({"error": f"unknown action {action!r}"})
# ---------------------------------------------------------------------------
# Response shaping
# ---------------------------------------------------------------------------
def _text_response(res: ActionResult) -> str:
payload: Dict[str, Any] = {"ok": res.ok, "action": res.action}
if res.message:
payload["message"] = res.message
if res.meta:
payload["meta"] = res.meta
return json.dumps(payload)
# Default cap for the AX `elements` array returned by capture. Dense UIs
# (Electron apps, Obsidian, JetBrains IDEs) can publish 500+ AX nodes, which
# can exhaust session context after a single capture. The model-facing
# `max_elements` argument lets callers raise this when they need the full tree.
_DEFAULT_MAX_ELEMENTS = 100
# Hard upper bound on caller-supplied `max_elements`. Without this, a tool
# call passing a very large integer would silently disable the safeguard and
# reintroduce the original unbounded behavior.
_MAX_ALLOWED_MAX_ELEMENTS = 1000
def _coerce_max_elements(value: Any) -> int:
"""Validate the caller-supplied ``max_elements``.
Falls back to :data:`_DEFAULT_MAX_ELEMENTS` for missing / non-integer /
sub-1 inputs so the cap can never be silently disabled by a malformed
tool-call argument. Clamps oversized values to
:data:`_MAX_ALLOWED_MAX_ELEMENTS` so a caller cannot bypass the
safeguard by passing a very large integer.
"""
if value is None:
return _DEFAULT_MAX_ELEMENTS
try:
n = int(value)
except (TypeError, ValueError):
return _DEFAULT_MAX_ELEMENTS
if n < 1:
return _DEFAULT_MAX_ELEMENTS
if n > _MAX_ALLOWED_MAX_ELEMENTS:
return _MAX_ALLOWED_MAX_ELEMENTS
return n
def _capture_response(cap: CaptureResult, max_elements: int = _DEFAULT_MAX_ELEMENTS) -> Any:
    """Shape a capture into the tool result handed back to the model.

    Three outcomes are possible:

    * screenshot present, not ``ax`` mode, and aux-vision routing applies and
      succeeds: a JSON string carrying the element list plus a text
      description of the screenshot;
    * screenshot present, not ``ax`` mode, otherwise: the ``_multimodal``
      envelope (summary text + image data URL);
    * ``ax`` mode or no screenshot: a JSON string carrying the element list.

    Args:
        cap: Capture returned by the backend.
        max_elements: Upper bound on the elements serialised into any
            text/JSON result, including the aux-vision one.

    Returns:
        A JSON string or a ``_multimodal`` dict, as described above.
    """
    total_elements = len(cap.elements)
    visible_elements = cap.elements[:max_elements]
    truncated_elements = max(0, total_elements - len(visible_elements))

    # Index only what's actually surfaced in the response — otherwise the
    # human-readable summary references element indices the model cannot
    # find in the JSON `elements` array (e.g. max_elements=10 vs the default
    # 40-line index window).
    header = (
        f"capture mode={cap.mode} {cap.width}x{cap.height}"
        + (f" app={cap.app}" if cap.app else "")
        + (f" window={cap.window_title!r}" if cap.window_title else "")
    )
    summary_lines = [header, f"{total_elements} interactable element(s):"]
    summary_lines.extend(_format_elements(visible_elements))

    # `summary` goes with the screenshot envelope, which carries no elements
    # array, so a truncation note would be inaccurate there. `text_summary`
    # goes with the two JSON results that do carry the (capped) array.
    summary = "\n".join(summary_lines)
    text_summary = summary
    if truncated_elements:
        text_summary = "\n".join(summary_lines + [
            f" (response truncated to {len(visible_elements)} of {total_elements} elements; "
            f"raise max_elements or pass app= to narrow)"
        ])

    if cap.png_b64 and cap.mode != "ax":
        # Decide whether to hand the screenshot to the auxiliary.vision
        # pipeline (text-only result) or keep the multimodal envelope (main
        # model handles vision natively). Issue #24015: previously the
        # multimodal envelope was returned unconditionally, so non-vision
        # main models tripped HTTP 404 / 400 at the provider boundary even
        # when auxiliary.vision was explicitly configured to handle this.
        if _should_route_through_aux_vision():
            routed = _route_capture_through_aux_vision(
                cap, text_summary, max_elements=max_elements,
            )
            if routed is not None:
                return routed
            # Aux routing was requested but failed (no vision client, aux
            # call raised, etc.). Fall through to the multimodal envelope —
            # better to surface a tool-result error from the main model
            # than to silently drop the screenshot entirely.

        # Detect actual image format from base64 magic bytes so the MIME type
        # matches what the data contains (cua-driver may return JPEG or PNG).
        # JPEG: base64 starts with /9j/   PNG: starts with iVBOR
        _mime = "image/jpeg" if cap.png_b64[:8].startswith("/9j/") else "image/png"
        return {
            "_multimodal": True,
            "content": [
                {"type": "text", "text": summary},
                {"type": "image_url",
                 "image_url": {"url": f"data:{_mime};base64,{cap.png_b64}"}},
            ],
            "text_summary": summary,
            "meta": {"mode": cap.mode, "width": cap.width, "height": cap.height,
                     "elements": total_elements, "png_bytes": cap.png_bytes_len},
        }

    # AX-only (or image-missing fallback): the text path carries the
    # `elements` array, so the truncation note applies here.
    payload: Dict[str, Any] = {
        "mode": cap.mode,
        "width": cap.width,
        "height": cap.height,
        "app": cap.app,
        "window_title": cap.window_title,
        "elements": [_element_to_dict(e) for e in visible_elements],
        "total_elements": total_elements,
        "summary": text_summary,
    }
    if truncated_elements:
        payload["truncated_elements"] = truncated_elements
    return json.dumps(payload)


# ---------------------------------------------------------------------------
# auxiliary.vision routing for captured screenshots (#24015)
# ---------------------------------------------------------------------------
def _should_route_through_aux_vision() -> bool:
    """Return True when ``_capture_response`` should hand the PNG to aux vision.

    Reads the active main provider/model and the loaded config and asks the
    routing helper. Any failure (config import, runtime override missing,
    etc.) returns False so the existing multimodal envelope continues to be
    returned — fail open on the routing decision so a broken config can
    never silently drop the screenshot for vision-capable main models.
    """
    try:
        from agent.auxiliary_client import _read_main_model, _read_main_provider
        from hermes_cli.config import load_config
        from tools.computer_use.vision_routing import (
            should_route_capture_to_aux_vision,
        )
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("computer_use: aux-vision routing import failed: %s", exc)
        return False

    try:
        provider = _read_main_provider()
        model = _read_main_model()
        cfg = load_config()
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("computer_use: aux-vision routing config read failed: %s", exc)
        return False

    try:
        return bool(should_route_capture_to_aux_vision(provider, model, cfg))
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("computer_use: aux-vision routing decision failed: %s", exc)
        return False


def _route_capture_through_aux_vision(
    cap: CaptureResult,
    summary: str,
    max_elements: Optional[int] = None,
) -> Optional[str]:
    """Pre-analyse the captured image via ``vision_analyze`` and return a text result.

    The captured base64 image is written to a temporary file under the
    directory returned by ``get_hermes_dir("cache/vision", ...)`` and handed
    to ``vision_analyze_tool`` with a generic describe prompt. The resulting
    description is returned alongside the AX/SOM summary and the element
    list, so the main model receives a single text payload. The temporary
    file is removed afterwards.

    Args:
        cap: Capture whose ``png_b64`` holds the screenshot.
        summary: Human-readable AX/SOM summary; also embedded in the prompt.
        max_elements: Upper bound on the serialised ``elements`` array.
            ``None`` applies ``_DEFAULT_MAX_ELEMENTS``, so this text result
            is never unbounded.

    Returns:
        A JSON-encoded text response on success.
        ``None`` on failure (caller falls back to the multimodal envelope).
    """
    if not cap.png_b64:
        return None
    try:
        import base64 as _base64
        import uuid as _uuid

        from hermes_constants import get_hermes_dir
        from model_tools import _run_async
        from tools.vision_tools import vision_analyze_tool
    except Exception as exc:  # pragma: no cover - defensive
        logger.debug("computer_use: aux-vision import failed: %s", exc)
        return None

    temp_image_path = None
    try:
        try:
            raw = _base64.b64decode(cap.png_b64, validate=False)
        except Exception as exc:
            logger.debug("computer_use: failed to decode capture base64: %s", exc)
            return None

        # Pick an extension that matches the on-disk bytes so vision_analyze's
        # MIME sniffing returns the right content-type.
        ext = ".jpg" if cap.png_b64[:8].startswith("/9j/") else ".png"
        cache_dir = get_hermes_dir("cache/vision", "temp_vision_images")
        temp_image_path = cache_dir / f"computer_use_{_uuid.uuid4().hex}{ext}"
        temp_image_path.write_bytes(raw)

        prompt = (
            "Describe what is visible in this macOS application screenshot in "
            "concise but specific terms. Mention the app name and window "
            "title if visible, the overall layout, any labelled buttons, "
            "menus or text fields, and any prominent text content the user "
            "would need to know about. Do not invent details that are not "
            "actually visible.\n\n"
            f"AX/SOM index for cross-reference:\n{summary}"
        )
        result_json = _run_async(
            vision_analyze_tool(str(temp_image_path), prompt)
        )
    except Exception as exc:
        logger.warning(
            "computer_use: auxiliary.vision pre-analysis failed (%s); "
            "falling back to native multimodal envelope",
            exc,
        )
        return None
    finally:
        if temp_image_path is not None:
            try:
                os.unlink(str(temp_image_path))
            except OSError:
                # Best-effort cleanup: the file may never have been written.
                pass

    # The tool is expected to return JSON with an "analysis" field; a string
    # that is not valid JSON is used verbatim as the description.
    analysis_text = ""
    if isinstance(result_json, str):
        try:
            parsed = json.loads(result_json)
            if isinstance(parsed, dict):
                analysis_text = str(parsed.get("analysis") or "").strip()
        except (TypeError, json.JSONDecodeError):
            analysis_text = result_json.strip()
    if not analysis_text:
        return None

    # Apply the same element cap as the other text results; dense UIs can
    # publish hundreds of nodes and exhaust the session context.
    limit = _DEFAULT_MAX_ELEMENTS if max_elements is None else max_elements
    total_elements = len(cap.elements)
    visible_elements = cap.elements[:limit]
    truncated_elements = max(0, total_elements - len(visible_elements))

    payload: Dict[str, Any] = {
        "mode": cap.mode,
        "width": cap.width,
        "height": cap.height,
        "app": cap.app,
        "window_title": cap.window_title,
        "elements": [_element_to_dict(e) for e in visible_elements],
        "total_elements": total_elements,
        "summary": summary,
        "vision_analysis": analysis_text,
        "vision_analysis_routed_via": "auxiliary.vision",
    }
    if truncated_elements:
        payload["truncated_elements"] = truncated_elements
    return json.dumps(payload)
def _maybe_follow_capture(
    backend: ComputerUseBackend, res: ActionResult, do_capture: bool,
) -> Any:
    """Return the action result, optionally combined with a follow-up capture.

    Args:
        backend: Backend used to take the follow-up capture.
        res: Result of the action that just ran.
        do_capture: Whether the caller asked for ``capture_after``.

    Returns:
        The plain text response when no capture was requested, when the
        action failed, or when taking/shaping the follow-up capture raised.
        Otherwise the capture response with the action outcome merged in: a
        ``_multimodal`` dict whose text is prefixed with the outcome, or a
        JSON string with ``action`` / ``ok`` / ``message`` added.
    """
    # Skip the follow-up capture when the action itself failed: showing a
    # normal-looking screenshot after a failure misleads the model into
    # thinking the action succeeded. Return the error text instead.
    if not do_capture or not res.ok:
        return _text_response(res)

    try:
        # Preserve the app context established by the preceding
        # capture/focus_app so that capture_after=True re-captures the same
        # app rather than the frontmost window (which may have changed if the
        # action caused a focus shift).
        last_app = getattr(backend, "_last_app", None)
        cap = backend.capture(mode="som", app=last_app)
        # Shaping stays inside the try: the action already succeeded, so a
        # capture-side error must degrade to the text result rather than be
        # reported upstream as a failed action.
        resp = _capture_response(cap)
    except Exception as e:
        logger.warning("follow-up capture failed: %s", e)
        return _text_response(res)

    # Combine action summary with the capture.
    if isinstance(resp, dict) and resp.get("_multimodal"):
        prefix = f"[{res.action}] ok={res.ok}"
        if res.message:
            # Separator keeps the message from running into "ok=True".
            prefix += f" — {res.message}"
        resp["content"][0]["text"] = prefix + "\n\n" + resp["content"][0]["text"]
        resp["text_summary"] = prefix + "\n\n" + resp["text_summary"]
        return resp

    # Fallback: action + text capture merged.
    try:
        data = json.loads(resp)
    except (TypeError, json.JSONDecodeError):
        data = {"capture": resp}
    if not isinstance(data, dict):
        data = {"capture": data}
    data["action"] = res.action
    data["ok"] = res.ok
    if res.message:
        data["message"] = res.message
    return json.dumps(data)
def _format_elements(elements: List[UIElement], max_lines: int = 40) -> List[str]:
out: List[str] = []
for e in elements[:max_lines]:
label = e.label.replace("\n", " ")[:60]
out.append(f" #{e.index} {e.role} {label!r} @ {e.bounds}"
+ (f" [{e.app}]" if e.app else ""))
if len(elements) > max_lines:
out.append(f" ... +{len(elements) - max_lines} more (call capture with app= to narrow)")
return out
def _element_to_dict(e: UIElement) -> Dict[str, Any]:
return {
"index": e.index,
"role": e.role,
"label": e.label,
"bounds": list(e.bounds),
"app": e.app,
}
# ---------------------------------------------------------------------------
# Availability check (used by the tool registry check_fn)
# ---------------------------------------------------------------------------
def check_computer_use_requirements() -> bool:
    """Report whether computer_use can run on this host.

    Requires macOS (``sys.platform == "darwin"``) and a positive answer from
    ``cua_driver_binary_available()``. The backend module is imported only
    on macOS, so other platforms never load it.
    """
    if sys.platform == "darwin":
        from tools.computer_use.cua_backend import cua_driver_binary_available
        return cua_driver_binary_available()
    return False
def get_computer_use_schema() -> Dict[str, Any]:
    """Return ``COMPUTER_USE_SCHEMA`` from ``tools.computer_use.schema``.

    The schema module is imported on demand, at call time.
    """
    from tools.computer_use.schema import COMPUTER_USE_SCHEMA

    return COMPUTER_USE_SCHEMA