diff --git a/agent/display.py b/agent/display.py index 474595d76c..906ca372ee 100644 --- a/agent/display.py +++ b/agent/display.py @@ -827,6 +827,10 @@ def _detect_tool_failure(tool_name: str, result: str | None) -> tuple[bool, str] return True, " [full]" # Generic heuristic for non-terminal tools + # Multimodal tool results (dicts with _multimodal=True) are not strings — + # treat them as successes since failures would be JSON-encoded strings. + if not isinstance(result, str): + return False, "" lower = result[:500].lower() if '"error"' in lower or '"failed"' in lower or result.startswith("Error"): return True, " [error]" diff --git a/run_agent.py b/run_agent.py index c7355c90d7..01c5ca8bc5 100644 --- a/run_agent.py +++ b/run_agent.py @@ -9479,9 +9479,15 @@ class AIAgent: logger.error("handle_function_call raised for %s: %s", function_name, tool_error, exc_info=True) tool_duration = time.time() - tool_start_time - result_preview = function_result if self.verbose_logging else ( - function_result[:200] if len(function_result) > 200 else function_result - ) + if isinstance(function_result, str): + result_preview = function_result if self.verbose_logging else ( + function_result[:200] if len(function_result) > 200 else function_result + ) + _result_len = len(function_result) + else: + # Multimodal dict result (_multimodal=True) — not sliceable as string + result_preview = function_result + _result_len = len(str(function_result)) # Log tool errors to the persistent error log so [error] tags # in the UI always have a corresponding detailed entry on disk. @@ -9489,7 +9495,7 @@ class AIAgent: if _is_error_result: logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview) else: - logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, len(function_result)) + logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, _result_len) if self.tool_progress_callback: try: @@ -9547,7 +9553,8 @@ class AIAgent: print(f" ✅ Tool {i} completed in {tool_duration:.2f}s") print(self._wrap_verbose("Result: ", function_result)) else: - response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result + _fr_str = function_result if isinstance(function_result, str) else str(function_result) + response_preview = _fr_str[:self.log_prefix_chars] + "..." if len(_fr_str) > self.log_prefix_chars else _fr_str print(f" ✅ Tool {i} completed in {tool_duration:.2f}s - {response_preview}") if self._interrupt_requested and i < len(assistant_message.tool_calls): diff --git a/tools/computer_use/cua_backend.py b/tools/computer_use/cua_backend.py index cdae02f555..52f2b551b9 100644 --- a/tools/computer_use/cua_backend.py +++ b/tools/computer_use/cua_backend.py @@ -23,6 +23,7 @@ import json import logging import os import platform +import re import shutil import subprocess import sys @@ -44,16 +45,25 @@ logger = logging.getLogger(__name__) # Version pinning # --------------------------------------------------------------------------- -# The SkyLight SPIs cua-driver calls are private. We pin a known-good version -# so OS updates don't silently change the surface area our agent depends on. -# Users on newer macOS releases may need to bump this and re-run -# `hermes tools` to take the updated binary. PINNED_CUA_DRIVER_VERSION = os.environ.get("HERMES_CUA_DRIVER_VERSION", "0.5.0") -# Env var override for the cua-driver binary path (mostly for tests / CI). _CUA_DRIVER_CMD = os.environ.get("HERMES_CUA_DRIVER_CMD", "cua-driver") _CUA_DRIVER_ARGS = ["mcp"] # stdio MCP transport +# Regex to parse list_windows text output lines: +# "- AppName (pid 12345) "Title" [window_id: 67890]" +_WINDOW_LINE_RE = re.compile( + r'^-\s+(.+?)\s+\(pid\s+(\d+)\)\s+.*\[window_id:\s+(\d+)\]', + re.MULTILINE, +) + +# Regex to parse element lines from get_window_state AX tree markdown: +# " - [N] AXRole "label"" +_ELEMENT_LINE_RE = re.compile( + r'^\s*-\s+\[(\d+)\]\s+(\w+)(?:\s+"([^"]*)")?', + re.MULTILINE, +) + # --------------------------------------------------------------------------- # Helpers @@ -81,6 +91,61 @@ def cua_driver_install_hint() -> str: ) +def _parse_windows_from_text(text: str) -> List[Dict[str, Any]]: + """Parse window records from list_windows text output.""" + windows = [] + for m in _WINDOW_LINE_RE.finditer(text): + windows.append({ + "app_name": m.group(1).strip(), + "pid": int(m.group(2)), + "window_id": int(m.group(3)), + "off_screen": "[off-screen]" in m.group(0), + }) + return windows + + +def _parse_elements_from_tree(markdown: str) -> List[UIElement]: + """Parse UIElement list from get_window_state AX tree markdown.""" + elements = [] + for m in _ELEMENT_LINE_RE.finditer(markdown): + elements.append(UIElement( + index=int(m.group(1)), + role=m.group(2), + label=m.group(3) or "", + bounds=(0, 0, 0, 0), + )) + return elements + + +def _split_tree_text(full_text: str) -> Tuple[str, str]: + """Split get_window_state text into (summary_line, tree_markdown).""" + lines = full_text.split("\n", 1) + summary = lines[0] + tree = lines[1] if len(lines) > 1 else "" + return summary, tree + + +def _parse_key_combo(keys: str) -> Tuple[Optional[str], List[str]]: + """Parse a key string like 'cmd+s' into (key, modifiers). + + Returns (key, modifiers) where key is the non-modifier key and modifiers + is a list of modifier names (cmd, shift, option, ctrl). + """ + MODIFIER_NAMES = {"cmd", "command", "shift", "option", "alt", "ctrl", "control", "fn"} + KEY_ALIASES = {"command": "cmd", "alt": "option", "control": "ctrl"} + + parts = [p.strip().lower() for p in re.split(r'[+\-]', keys) if p.strip()] + modifiers = [] + key = None + for part in parts: + normalized = KEY_ALIASES.get(part, part) + if normalized in MODIFIER_NAMES: + modifiers.append(normalized) + else: + key = part # last non-modifier wins + return key, modifiers + + # --------------------------------------------------------------------------- # Asyncio bridge — one long-lived loop on a background thread # --------------------------------------------------------------------------- @@ -139,8 +204,8 @@ class _CuaDriverSession: def __init__(self, bridge: _AsyncBridge) -> None: self._bridge = bridge - self._session = None # mcp.ClientSession - self._exit_stack = None # AsyncExitStack for stdio_client + ClientSession + self._session = None + self._exit_stack = None self._lock = threading.Lock() self._started = False @@ -159,7 +224,7 @@ class _CuaDriverSession: params = StdioServerParameters( command=_CUA_DRIVER_CMD, args=_CUA_DRIVER_ARGS, - env={**os.environ}, # cua-driver needs HOME / TMPDIR + env={**os.environ}, ) stack = AsyncExitStack() read, write = await stack.enter_async_context(stdio_client(params)) @@ -172,7 +237,7 @@ class _CuaDriverSession: if self._exit_stack is not None: try: await self._exit_stack.aclose() - except Exception as e: # pragma: no cover + except Exception as e: logger.warning("cua-driver shutdown error: %s", e) self._exit_stack = None self._session = None @@ -194,10 +259,8 @@ class _CuaDriverSession: finally: self._started = False - # ── Tool invocation ────────────────────────────────────────────── async def _call_tool_async(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]: result = await self._session.call_tool(name, args) - # Normalize: mcp returns content parts. We want a dict. return _extract_tool_result(result) def call_tool(self, name: str, args: Dict[str, Any], timeout: float = 30.0) -> Dict[str, Any]: @@ -208,12 +271,22 @@ class _CuaDriverSession: def _extract_tool_result(mcp_result: Any) -> Dict[str, Any]: """Convert an mcp CallToolResult into a plain dict. - cua-driver returns a mix of json-text parts and image parts. We flatten: - {"data": , "images": [b64, ...], "isError": bool} + cua-driver returns a mix of text parts, image parts, and structuredContent. + We flatten into: + { + "data": , + "images": [b64, ...], + "structuredContent": , + "isError": bool, + } + structuredContent is populated from the MCP result's structuredContent field + (MCP spec §2024-11-05+) and takes precedence for structured data like + list_windows window arrays. """ data: Any = None images: List[str] = [] is_error = bool(getattr(mcp_result, "isError", False)) + structured: Optional[Dict] = getattr(mcp_result, "structuredContent", None) or None text_chunks: List[str] = [] for part in getattr(mcp_result, "content", []) or []: ptype = getattr(part, "type", None) @@ -229,7 +302,7 @@ def _extract_tool_result(mcp_result: Any) -> Dict[str, Any]: data = json.loads(joined) if joined.strip().startswith(("{", "[")) else joined except json.JSONDecodeError: data = joined - return {"data": data, "images": images, "isError": is_error} + return {"data": data, "images": images, "structuredContent": structured, "isError": is_error} # --------------------------------------------------------------------------- @@ -242,6 +315,9 @@ class CuaDriverBackend(ComputerUseBackend): def __init__(self) -> None: self._bridge = _AsyncBridge() self._session = _CuaDriverSession(self._bridge) + # Sticky context — updated by capture(), used by action tools. + self._active_pid: Optional[int] = None + self._active_window_id: Optional[int] = None # ── Lifecycle ────────────────────────────────────────────────── def start(self) -> None: @@ -260,20 +336,92 @@ class CuaDriverBackend(ComputerUseBackend): # ── Capture ──────────────────────────────────────────────────── def capture(self, mode: str = "som", app: Optional[str] = None) -> CaptureResult: - args: Dict[str, Any] = {"mode": mode} - if app: - args["app"] = app - out = self._session.call_tool("capture", args) - data = out["data"] if isinstance(out["data"], dict) else {} - width = int(data.get("width", 0)) - height = int(data.get("height", 0)) - elements_raw = data.get("elements", []) or [] - elements = [_parse_element(e) for e in elements_raw if isinstance(e, dict)] + """Capture the frontmost on-screen window (optionally filtered by app name). + Maps hermes `capture(mode, app)` → cua-driver `list_windows` + + `get_window_state` (ax/som) or `screenshot` (vision). + """ + # Step 1: enumerate on-screen windows to find target pid/window_id. + lw_out = self._session.call_tool("list_windows", {"on_screen_only": True}) + + # Prefer structuredContent.windows (MCP 2024-11-05+); fall back to + # text-line parsing for older cua-driver builds. + sc = lw_out.get("structuredContent") or {} + raw_windows = sc.get("windows") if sc else None + if raw_windows: + windows = [ + { + "app_name": w.get("app_name", ""), + "pid": int(w["pid"]), + "window_id": int(w["window_id"]), + "off_screen": not w.get("is_on_screen", True), + "title": w.get("title", ""), + "z_index": w.get("z_index", 0), + } + for w in raw_windows + ] + # Sort by z_index descending (lowest z_index = frontmost on macOS). + windows.sort(key=lambda w: w["z_index"]) + else: + raw_text = lw_out["data"] if isinstance(lw_out["data"], str) else "" + windows = _parse_windows_from_text(raw_text) + + if not windows: + return CaptureResult(mode=mode, width=0, height=0, png_b64=None, + elements=[], app="", window_title="", png_bytes_len=0) + + # Filter by app name (case-insensitive substring) if requested. + if app: + app_lower = app.lower() + filtered = [w for w in windows if app_lower in w["app_name"].lower()] + if filtered: + windows = filtered + + # Pick first on-screen window (sorted by z_index / z-order above). + target = next((w for w in windows if not w["off_screen"]), windows[0]) + self._active_pid = target["pid"] + self._active_window_id = target["window_id"] + app_name = target["app_name"] + + # Step 2: capture. png_b64: Optional[str] = None + elements: List[UIElement] = [] + width = height = 0 + window_title = "" + + if mode == "vision": + # screenshot tool: just the PNG, no AX walk. + sc_out = self._session.call_tool( + "screenshot", + {"window_id": self._active_window_id, "format": "jpeg", "quality": 85}, + ) + if sc_out["images"]: + png_b64 = sc_out["images"][0] + else: + # get_window_state: AX tree + optional screenshot. + gws_out = self._session.call_tool( + "get_window_state", + {"pid": self._active_pid, "window_id": self._active_window_id}, + ) + text = gws_out["data"] if isinstance(gws_out["data"], str) else "" + summary, tree = _split_tree_text(text) + + # Parse element count from summary e.g. "✅ AppName — 42 elements, turn 3..." + m = re.search(r'(\d+)\s+elements?', summary) + if tree and not gws_out["images"]: + # ax mode — no screenshot + elements = _parse_elements_from_tree(tree) + elif gws_out["images"]: + png_b64 = gws_out["images"][0] + elements = _parse_elements_from_tree(tree) + + # Extract window title from the AX tree first AXWindow line. + wt = re.search(r'AXWindow\s+"([^"]+)"', tree) + if wt: + window_title = wt.group(1) + png_bytes_len = 0 - if out["images"]: - png_b64 = out["images"][0] + if png_b64: try: png_bytes_len = len(base64.b64decode(png_b64, validate=False)) except Exception: @@ -285,8 +433,8 @@ class CuaDriverBackend(ComputerUseBackend): height=height, png_b64=png_b64, elements=elements, - app=str(data.get("app", "") or ""), - window_title=str(data.get("window_title", "") or ""), + app=app_name, + window_title=window_title, png_bytes_len=png_bytes_len, ) @@ -301,18 +449,36 @@ class CuaDriverBackend(ComputerUseBackend): click_count: int = 1, modifiers: Optional[List[str]] = None, ) -> ActionResult: - args: Dict[str, Any] = {"button": button, "clickCount": click_count} - if element is not None: - args["element"] = int(element) - elif x is not None and y is not None: - args["x"] = int(x) - args["y"] = int(y) - else: + pid = self._active_pid + if pid is None: return ActionResult(ok=False, action="click", - message="click requires element= or x/y") + message="No active window — call capture() first.") + + # Choose tool based on button and click_count. + if button == "right": + tool = "right_click" + elif click_count == 2: + tool = "double_click" + else: + tool = "click" + + args: Dict[str, Any] = {"pid": pid} + if element is not None: + if self._active_window_id is None: + return ActionResult(ok=False, action=tool, + message="No active window_id for element_index click.") + args["element_index"] = element + args["window_id"] = self._active_window_id + elif x is not None and y is not None: + args["x"] = x + args["y"] = y + else: + return ActionResult(ok=False, action=tool, + message="click requires element= or x/y.") if modifiers: - args["modifiers"] = modifiers - return self._action("click", args) + args["modifier"] = modifiers + + return self._action(tool, args) def drag( self, @@ -324,22 +490,9 @@ class CuaDriverBackend(ComputerUseBackend): button: str = "left", modifiers: Optional[List[str]] = None, ) -> ActionResult: - args: Dict[str, Any] = {"button": button} - if from_element is not None: - args["fromElement"] = int(from_element) - elif from_xy is not None: - args["fromX"], args["fromY"] = int(from_xy[0]), int(from_xy[1]) - else: - return ActionResult(ok=False, action="drag", message="drag requires a source") - if to_element is not None: - args["toElement"] = int(to_element) - elif to_xy is not None: - args["toX"], args["toY"] = int(to_xy[0]), int(to_xy[1]) - else: - return ActionResult(ok=False, action="drag", message="drag requires a destination") - if modifiers: - args["modifiers"] = modifiers - return self._action("drag", args) + # cua-driver does not expose a drag tool. + return ActionResult(ok=False, action="drag", + message="drag is not supported by the cua-driver backend.") def scroll( self, @@ -351,33 +504,132 @@ class CuaDriverBackend(ComputerUseBackend): y: Optional[int] = None, modifiers: Optional[List[str]] = None, ) -> ActionResult: - args: Dict[str, Any] = {"direction": direction, "amount": int(amount)} - if element is not None: - args["element"] = int(element) + pid = self._active_pid + if pid is None: + return ActionResult(ok=False, action="scroll", + message="No active window — call capture() first.") + args: Dict[str, Any] = { + "pid": pid, + "direction": direction, + "amount": max(1, min(50, amount)), + } + if element is not None and self._active_window_id is not None: + args["element_index"] = element + args["window_id"] = self._active_window_id elif x is not None and y is not None: - args["x"] = int(x) - args["y"] = int(y) - if modifiers: - args["modifiers"] = modifiers + args["x"] = x + args["y"] = y return self._action("scroll", args) # ── Keyboard ─────────────────────────────────────────────────── def type_text(self, text: str) -> ActionResult: - return self._action("type", {"text": text}) + pid = self._active_pid + if pid is None: + return ActionResult(ok=False, action="type_text", + message="No active window — call capture() first.") + # Safari WebKit AXTextField does not accept AX attribute writes (type_text), + # so use type_text_chars which synthesises individual key events instead. + # This works universally across all macOS apps in background mode. + return self._action("type_text_chars", {"pid": pid, "text": text}) def key(self, keys: str) -> ActionResult: - return self._action("key", {"keys": keys}) + pid = self._active_pid + if pid is None: + return ActionResult(ok=False, action="key", + message="No active window — call capture() first.") + + key_name, modifiers = _parse_key_combo(keys) + if not key_name: + return ActionResult(ok=False, action="key", + message=f"Could not parse key from '{keys}'.") + + if modifiers: + # hotkey requires at least one modifier + one key. + return self._action("hotkey", {"pid": pid, "keys": modifiers + [key_name]}) + else: + return self._action("press_key", {"pid": pid, "key": key_name}) + + # ── Value setter ──────────────────────────────────────────────── + def set_value(self, value: str, element: Optional[int] = None) -> ActionResult: + """Set a value on an element. Handles AXPopUpButton selects natively.""" + pid = self._active_pid + window_id = self._active_window_id + if pid is None or window_id is None: + return ActionResult(ok=False, action="set_value", + message="No active window — call capture() first.") + if element is None: + return ActionResult(ok=False, action="set_value", + message="set_value requires element= (element index).") + args: Dict[str, Any] = { + "pid": pid, + "window_id": window_id, + "element_index": element, + "value": value, + } + return self._action("set_value", args) # ── Introspection ────────────────────────────────────────────── def list_apps(self) -> List[Dict[str, Any]]: - out = self._session.call_tool("listApps", {}) - data = out["data"] if isinstance(out["data"], (list, dict)) else [] + out = self._session.call_tool("list_apps", {}) + data = out["data"] + if isinstance(data, list): + return data if isinstance(data, dict): - data = data.get("apps", []) - return list(data or []) + return data.get("apps", []) + # list_apps returns plain text — parse app lines. + if isinstance(data, str): + apps = [] + for line in data.splitlines(): + m = re.search(r'(.+?)\s+\(pid\s+(\d+)\)', line) + if m: + apps.append({"name": m.group(1).strip(), "pid": int(m.group(2))}) + return apps + return [] def focus_app(self, app: str, raise_window: bool = False) -> ActionResult: - return self._action("focusApp", {"app": app, "raise": bool(raise_window)}) + """Target an app for subsequent actions without stealing system focus. + + cua-driver background-automation never needs to bring a window to the + front: capture(app=...) already selects the right window via + list_windows. We implement focus_app as a pure window-selector — + enumerate on-screen windows, find the best match for *app*, and store + its pid/window_id so that subsequent click/type calls hit the right + process. + + raise_window=True is intentionally ignored: stealing the user's focus + is exactly what this backend is designed to avoid. + """ + lw_out = self._session.call_tool("list_windows", {"on_screen_only": True}) + sc = lw_out.get("structuredContent") or {} + raw_windows = sc.get("windows") if sc else None + if raw_windows: + windows = [ + { + "app_name": w.get("app_name", ""), + "pid": int(w["pid"]), + "window_id": int(w["window_id"]), + "z_index": w.get("z_index", 0), + } + for w in raw_windows + ] + windows.sort(key=lambda w: w["z_index"]) + else: + raw_text = lw_out["data"] if isinstance(lw_out["data"], str) else "" + windows = _parse_windows_from_text(raw_text) + + app_lower = app.lower() + matched = [w for w in windows if app_lower in w["app_name"].lower()] + target = matched[0] if matched else (windows[0] if windows else None) + if target: + self._active_pid = target["pid"] + self._active_window_id = target["window_id"] + return ActionResult( + ok=True, action="focus_app", + message=f"Targeted {target['app_name']} (pid {self._active_pid}, " + f"window {self._active_window_id}) without raising window.", + ) + return ActionResult(ok=False, action="focus_app", + message=f"No on-screen window found for app '{app}'.") # ── Internal ─────────────────────────────────────────────────── def _action(self, name: str, args: Dict[str, Any]) -> ActionResult: diff --git a/tools/computer_use/schema.py b/tools/computer_use/schema.py index 76248f17a1..d8928d0dc5 100644 --- a/tools/computer_use/schema.py +++ b/tools/computer_use/schema.py @@ -40,6 +40,7 @@ COMPUTER_USE_SCHEMA: Dict[str, Any] = { "scroll", "type", "key", + "set_value", "wait", "list_apps", "focus_app", @@ -47,7 +48,9 @@ COMPUTER_USE_SCHEMA: Dict[str, Any] = { "description": ( "Which action to perform. `capture` is free (no side " "effects). All other actions require approval unless " - "auto-approved." + "auto-approved. Use `set_value` for select/popup elements " + "and sliders — it selects the matching option directly " + "without opening the native menu (no focus steal)." ), }, # ── capture ──────────────────────────────────────────── @@ -132,6 +135,16 @@ COMPUTER_USE_SCHEMA: Dict[str, Any] = { "type": "integer", "description": "Scroll wheel ticks. Default 3.", }, + # ── set_value ────────────────────────────────────────── + "value": { + "type": "string", + "description": ( + "For action='set_value': the value to set on the element. " + "For AXPopUpButton / select dropdowns, pass the option's " + "display label (e.g. 'Blue'). For sliders and other " + "AXValue-settable elements, pass the numeric or string value." + ), + }, # ── type / key / wait ────────────────────────────────── "text": { "type": "string", diff --git a/tools/computer_use/tool.py b/tools/computer_use/tool.py index 0730e09e04..51c7656fc1 100644 --- a/tools/computer_use/tool.py +++ b/tools/computer_use/tool.py @@ -74,7 +74,7 @@ _SAFE_ACTIONS = frozenset({"capture", "wait", "list_apps"}) # Actions that mutate user-visible state. Go through approval. _DESTRUCTIVE_ACTIONS = frozenset({ "click", "double_click", "right_click", "middle_click", - "drag", "scroll", "type", "key", "focus_app", + "drag", "scroll", "type", "key", "set_value", "focus_app", }) # Hard-blocked key combinations. Mirrored from #4562 — these are destructive @@ -387,6 +387,13 @@ def _dispatch(backend: ComputerUseBackend, action: str, args: Dict[str, Any]) -> res = backend.key(args.get("keys", "")) return _maybe_follow_capture(backend, res, capture_after) + if action == "set_value": + value = args.get("value") + if value is None: + return json.dumps({"error": "set_value requires `value`"}) + res = backend.set_value(value=str(value), element=args.get("element")) + return _maybe_follow_capture(backend, res, capture_after) + return json.dumps({"error": f"unknown action {action!r}"}) @@ -416,12 +423,17 @@ def _capture_response(cap: CaptureResult) -> Any: summary = "\n".join(summary_lines) if cap.png_b64 and cap.mode != "ax": + # Detect actual image format from base64 magic bytes so the MIME type + # matches what the data contains (cua-driver may return JPEG or PNG). + # JPEG: base64 starts with /9j/ PNG: starts with iVBOR + _b64_prefix = cap.png_b64[:8] + _mime = "image/jpeg" if _b64_prefix.startswith("/9j/") else "image/png" return { "_multimodal": True, "content": [ {"type": "text", "text": summary}, {"type": "image_url", - "image_url": {"url": f"data:image/png;base64,{cap.png_b64}"}}, + "image_url": {"url": f"data:{_mime};base64,{cap.png_b64}"}}, ], "text_summary": summary, "meta": {"mode": cap.mode, "width": cap.width, "height": cap.height,