diff --git a/tools/code_execution_tool.py b/tools/code_execution_tool.py index 2dfdc989a..ff5c7f7fe 100644 --- a/tools/code_execution_tool.py +++ b/tools/code_execution_tool.py @@ -5,18 +5,30 @@ Code Execution Tool -- Programmatic Tool Calling (PTC) Lets the LLM write a Python script that calls Hermes tools via RPC, collapsing multi-step tool chains into a single inference turn. -Architecture: - 1. Parent generates a `hermes_tools.py` stub module with RPC functions +Architecture (two transports): + + **Local backend (UDS):** + 1. Parent generates a `hermes_tools.py` stub module with UDS RPC functions 2. Parent opens a Unix domain socket and starts an RPC listener thread 3. Parent spawns a child process that runs the LLM's script - 4. When the script calls a tool function, the call travels over the UDS - back to the parent, which dispatches through handle_function_call - 5. Only the script's stdout is returned to the LLM; intermediate tool - results never enter the context window + 4. Tool calls travel over the UDS back to the parent for dispatch -Platform: Linux / macOS only (Unix domain sockets). Disabled on Windows. + **Remote backends (file-based RPC):** + 1. Parent generates `hermes_tools.py` with file-based RPC stubs + 2. Parent ships both files to the remote environment + 3. Script runs inside the terminal backend (Docker/SSH/Modal/Daytona/etc.) + 4. Tool calls are written as request files; a polling thread on the parent + reads them via execute_oneshot(), dispatches, and writes response files + 5. The script polls for response files and continues + +In both cases, only the script's stdout is returned to the LLM; intermediate +tool results never enter the context window. + +Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Windows. +Remote execution additionally requires Python 3 in the terminal backend. 
""" +import base64 import json import logging import os @@ -114,11 +126,17 @@ _TOOL_STUBS = { } -def generate_hermes_tools_module(enabled_tools: List[str]) -> str: +def generate_hermes_tools_module(enabled_tools: List[str], + transport: str = "uds") -> str: """ Build the source code for the hermes_tools.py stub module. Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs. + + Args: + enabled_tools: Tool names enabled in the current session. + transport: ``"uds"`` for Unix domain socket (local backend) or + ``"file"`` for file-based RPC (remote backends). """ tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools)) @@ -135,13 +153,18 @@ def generate_hermes_tools_module(enabled_tools: List[str]) -> str: ) export_names.append(func_name) - header = '''\ -"""Auto-generated Hermes tools RPC stubs.""" -import json, os, socket, shlex, time + if transport == "file": + header = _FILE_TRANSPORT_HEADER + else: + header = _UDS_TRANSPORT_HEADER -_sock = None + return header + "\n".join(stub_functions) +# ---- Shared helpers section (embedded in both transport headers) ---------- + +_COMMON_HELPERS = '''\ + # --------------------------------------------------------------------------- # Convenience helpers (avoid common scripting pitfalls) # --------------------------------------------------------------------------- @@ -176,6 +199,17 @@ def retry(fn, max_attempts=3, delay=2): time.sleep(delay * (2 ** attempt)) raise last_err +''' + +# ---- UDS transport (local backend) --------------------------------------- + +_UDS_TRANSPORT_HEADER = '''\ +"""Auto-generated Hermes tools RPC stubs.""" +import json, os, socket, shlex, time + +_sock = None +''' + _COMMON_HELPERS + '''\ + def _connect(): global _sock if _sock is None: @@ -208,7 +242,57 @@ def _call(tool_name, args): ''' - return header + "\n".join(stub_functions) +# ---- File-based transport (remote backends) ------------------------------- + +_FILE_TRANSPORT_HEADER = '''\ +"""Auto-generated Hermes tools 
# ---- File-based transport (remote backends) -------------------------------
#
# Source text prepended to the generated ``hermes_tools.py`` when tool calls
# are proxied through request/response files instead of a Unix socket.
#
# Protocol implemented by the generated ``_call``:
#   1. write ``req_<seq>.tmp`` and rename it to ``req_<seq>`` so a reader
#      never observes a half-written request,
#   2. poll for ``res_<seq>`` (50 ms, backing off to 250 ms, 300 s deadline),
#   3. read, delete and JSON-decode the response.
#
# Both files are opened with an explicit UTF-8 encoding: the parent writes
# UTF-8 bytes into the response file, and the interpreter's locale default
# inside the remote environment is not guaranteed to be UTF-8.

_FILE_TRANSPORT_HEADER = '''\
"""Auto-generated Hermes tools RPC stubs (file-based transport)."""
import json, os, shlex, time

_RPC_DIR = os.environ.get("HERMES_RPC_DIR", "/tmp/hermes_rpc")
_seq = 0
''' + _COMMON_HELPERS + '''\

def _call(tool_name, args):
    """Send a tool call request via file-based RPC and wait for response."""
    global _seq
    _seq += 1
    seq_str = f"{_seq:06d}"
    req_file = os.path.join(_RPC_DIR, f"req_{seq_str}")
    res_file = os.path.join(_RPC_DIR, f"res_{seq_str}")

    # Write request atomically (write to .tmp, then rename)
    tmp = req_file + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump({"tool": tool_name, "args": args, "seq": _seq}, f)
    os.rename(tmp, req_file)

    # Wait for response with adaptive polling
    deadline = time.monotonic() + 300  # 5-minute timeout per tool call
    poll_interval = 0.05  # Start at 50ms
    while not os.path.exists(res_file):
        if time.monotonic() > deadline:
            raise RuntimeError(f"RPC timeout: no response for {tool_name} after 300s")
        time.sleep(poll_interval)
        poll_interval = min(poll_interval * 1.2, 0.25)  # Back off to 250ms

    # The parent always writes UTF-8; do not depend on the remote locale.
    with open(res_file, encoding="utf-8") as f:
        raw = f.read()

    # Clean up response file
    try:
        os.unlink(res_file)
    except OSError:
        pass

    result = json.loads(raw)
    if isinstance(result, str):
        try:
            return json.loads(result)
        except (json.JSONDecodeError, TypeError):
            return result
    return result

'''
def _get_or_create_env(task_id: str):
    """Return the terminal environment bound to *task_id*, creating it on demand.

    The environment registry lives in ``tools.terminal_tool``; an entry that
    already exists there is reused, otherwise a new one is built from the
    current environment config plus any per-task overrides and registered.

    Args:
        task_id: Session task ID; a falsy value maps to ``"default"``.

    Returns:
        An ``(env, env_type)`` tuple.
    """
    from tools.terminal_tool import (
        _active_environments, _env_lock, _create_environment,
        _get_env_config, _last_activity, _start_cleanup_thread,
        _creation_locks, _creation_locks_lock, _task_env_overrides,
    )

    env_key = task_id or "default"
    container_backends = ("docker", "singularity", "modal", "daytona")

    def _reuse_active():
        """Return ``(env, env_type)`` for a registered env (refreshing its activity stamp), else None."""
        with _env_lock:
            if env_key not in _active_environments:
                return None
            _last_activity[env_key] = time.time()
            return _active_environments[env_key], _get_env_config()["env_type"]

    # Fast path: nothing to create.
    reused = _reuse_active()
    if reused is not None:
        return reused

    # Slow path: one creation lock per task so concurrent callers for the
    # same task do not build two environments.
    with _creation_locks_lock:
        if env_key not in _creation_locks:
            _creation_locks[env_key] = threading.Lock()
        creation_lock = _creation_locks[env_key]

    with creation_lock:
        # Re-check: another thread may have finished creating it while we
        # were waiting for the per-task lock.
        reused = _reuse_active()
        if reused is not None:
            return reused

        settings = _get_env_config()
        backend = settings["env_type"]
        overrides = _task_env_overrides.get(env_key, {})

        # Container backends read "<backend>_image"; a per-task override wins.
        if backend in container_backends:
            image_key = f"{backend}_image"
            image = overrides.get(image_key) or settings[image_key]
        else:
            image = ""

        cwd = overrides.get("cwd") or settings["cwd"]

        container_config = None
        if backend in container_backends:
            container_config = {
                "container_cpu": settings.get("container_cpu", 1),
                "container_memory": settings.get("container_memory", 5120),
                "container_disk": settings.get("container_disk", 51200),
                "container_persistent": settings.get("container_persistent", True),
                "docker_volumes": settings.get("docker_volumes", []),
            }

        ssh_config = None
        if backend == "ssh":
            ssh_config = {
                "host": settings.get("ssh_host", ""),
                "user": settings.get("ssh_user", ""),
                "port": settings.get("ssh_port", 22),
                "key": settings.get("ssh_key", ""),
                "persistent": settings.get("ssh_persistent", False),
            }

        local_config = None
        if backend == "local":
            local_config = {
                "persistent": settings.get("local_persistent", False),
            }

        short_id = env_key[:8]
        logger.info("Creating new %s environment for execute_code task %s...",
                    backend, short_id)
        created = _create_environment(
            env_type=backend,
            image=image,
            cwd=cwd,
            timeout=settings["timeout"],
            ssh_config=ssh_config,
            container_config=container_config,
            local_config=local_config,
            task_id=env_key,
            host_cwd=settings.get("host_cwd"),
        )

        with _env_lock:
            _active_environments[env_key] = created
            _last_activity[env_key] = time.time()

        _start_cleanup_thread()
        logger.info("%s environment ready for execute_code task %s",
                    backend, short_id)
        return created, backend
def _ship_file_to_remote(env, remote_path: str, content: str) -> None:
    """Write *content* to *remote_path* on the remote environment.

    Uses ``echo … | base64 -d`` rather than stdin piping because some
    backends (Modal) don't reliably deliver stdin_data to chained
    commands. Base64 output is shell-safe ([A-Za-z0-9+/=]) so single
    quotes are fine for the payload; the destination path is quoted with
    ``shlex.quote`` so spaces or shell metacharacters in it cannot alter
    the command.

    Args:
        env: Terminal environment exposing ``execute_oneshot()``.
        remote_path: Destination path inside the remote environment.
        content: Text to write; encoded as UTF-8.

    Raises:
        RuntimeError: If the backend reports a non-zero ``returncode`` for
            the write, so callers fail here instead of later when the
            missing file is used.
    """
    import shlex

    encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
    result = env.execute_oneshot(
        f"echo '{encoded}' | base64 -d > {shlex.quote(remote_path)}",
        cwd="/",
        timeout=30,
    )

    # A result without a "returncode" key is treated as success so a backend
    # that does not report one keeps working as before.
    returncode = result.get("returncode", 0) if isinstance(result, dict) else 0
    if returncode != 0:
        detail = str(result.get("output", "")).strip()[:200]
        raise RuntimeError(
            f"Failed to write {remote_path} on remote environment "
            f"(exit code {returncode}): {detail}"
        )
def _rpc_poll_loop(
    env,
    rpc_dir: str,
    task_id: str,
    tool_call_log: list,
    tool_call_counter: list,
    max_tool_calls: int,
    allowed_tools: frozenset,
    stop_event: threading.Event,
):
    """Poll the remote filesystem for tool call requests and dispatch them.

    Runs in a background thread. Uses ``env.execute_oneshot()`` so it can
    operate concurrently with the script-execution thread that holds
    ``env.execute()`` (important for persistent-shell backends like SSH).

    For every ``req_<id>`` file found in *rpc_dir* the loop reads the JSON
    request, enforces the allow-list and the call limit, dispatches through
    ``handle_function_call`` and writes the result to ``res_<id>``
    (tmp + rename). The response name is derived from the request *file
    name*, which is what the generated stub polls for, so a bogus ``seq``
    value in the payload cannot misroute or break the reply.

    A request is always removed after one attempt, and a malformed request
    is answered with an error payload; otherwise a single bad request would
    be re-read on every poll and the script would block until its RPC
    timeout.

    Args:
        env: Terminal environment exposing ``execute_oneshot()``.
        rpc_dir: Remote directory holding request/response files.
        task_id: Task ID forwarded to ``handle_function_call``.
        tool_call_log: List that receives one summary dict per dispatched call.
        tool_call_counter: Single-element list used as a mutable call counter.
        max_tool_calls: Maximum number of dispatched calls.
        allowed_tools: Tool names the script may call.
        stop_event: Set by the caller to make the loop exit.
    """
    import contextlib
    import shlex

    from model_tools import handle_function_call

    poll_interval = 0.1  # 100 ms

    def _dispatch(tool_name, tool_args, call_start) -> str:
        """Apply allow-list and call-limit checks, run the tool, return its result text."""
        # Enforce allow-list
        if tool_name not in allowed_tools:
            available = ", ".join(sorted(allowed_tools))
            return json.dumps({
                "error": (
                    f"Tool '{tool_name}' is not available in execute_code. "
                    f"Available: {available}"
                )
            })

        # Enforce tool call limit
        if tool_call_counter[0] >= max_tool_calls:
            return json.dumps({
                "error": (
                    f"Tool call limit reached ({max_tool_calls}). "
                    "No more tool calls allowed in this execution."
                )
            })

        # Strip forbidden terminal parameters
        if tool_name == "terminal" and isinstance(tool_args, dict):
            for param in _TERMINAL_BLOCKED_PARAMS:
                tool_args.pop(param, None)

        # Dispatch through the standard tool handler with stdout/stderr
        # silenced. NOTE: the redirect swaps the process-wide sys.stdout and
        # sys.stderr for the duration of the call, as the manual swap did.
        try:
            with open(os.devnull, "w") as devnull, \
                    contextlib.redirect_stdout(devnull), \
                    contextlib.redirect_stderr(devnull):
                tool_result = handle_function_call(
                    tool_name, tool_args, task_id=task_id
                )
        except Exception as exc:
            logger.error("Tool call failed in remote sandbox: %s",
                         exc, exc_info=True)
            tool_result = json.dumps({"error": str(exc)})

        # The response writer needs text; never let an odd return type
        # turn into an unanswered request.
        if not isinstance(tool_result, str):
            tool_result = json.dumps(tool_result, default=str)

        tool_call_counter[0] += 1
        call_duration = time.monotonic() - call_start
        tool_call_log.append({
            "tool": tool_name,
            "args_preview": str(tool_args)[:80],
            "duration": round(call_duration, 2),
        })
        return tool_result

    def _serve_request(req_file: str) -> None:
        """Read one request file, write its response, and always remove the request."""
        call_start = time.monotonic()
        # File names inside rpc_dir are chosen by the remote script, so every
        # path taken from the listing is quoted before reaching the shell.
        quoted_req = shlex.quote(req_file)
        req_dir, _, req_name = req_file.rpartition("/")
        res_file = f"{req_dir}/res_{req_name[len('req_'):]}"

        try:
            read_result = env.execute_oneshot(
                f"cat {quoted_req}",
                cwd="/",
                timeout=10,
            )
            try:
                request = json.loads(read_result.get("output", ""))
            except (json.JSONDecodeError, ValueError, TypeError):
                request = None

            tool_name = request.get("tool", "") if isinstance(request, dict) else None
            if not isinstance(tool_name, str):
                logger.debug("Malformed RPC request in %s", req_file)
                tool_result = json.dumps({"error": "Malformed RPC request."})
            else:
                tool_result = _dispatch(
                    tool_name, request.get("args", {}), call_start
                )

            # Write response atomically (tmp + rename).
            # Use echo piping (not stdin_data) because Modal doesn't
            # reliably deliver stdin to chained commands.
            encoded_result = base64.b64encode(
                tool_result.encode("utf-8")
            ).decode("ascii")
            quoted_tmp = shlex.quote(res_file + ".tmp")
            quoted_res = shlex.quote(res_file)
            env.execute_oneshot(
                f"echo '{encoded_result}' | base64 -d > {quoted_tmp}"
                f" && mv {quoted_tmp} {quoted_res}",
                cwd="/",
                timeout=60,
            )
        finally:
            # Remove the request even when handling failed: leaving it behind
            # would re-dispatch the same (possibly side-effecting) tool call
            # on every subsequent poll.
            env.execute_oneshot(f"rm -f {quoted_req}", cwd="/", timeout=5)

    while not stop_event.is_set():
        try:
            # List pending request files (skip .tmp partials)
            ls_result = env.execute_oneshot(
                f"ls -1 {shlex.quote(rpc_dir)}/req_* 2>/dev/null || true",
                cwd="/",
                timeout=10,
            )
            output = (ls_result.get("output") or "").strip()
            if not output:
                stop_event.wait(poll_interval)
                continue

            req_files = sorted(
                name
                for name in (line.strip() for line in output.split("\n"))
                if name
                and not name.endswith(".tmp")
                and name.rpartition("/")[2].startswith("req_")
                and "/req_" in name
            )

            for req_file in req_files:
                if stop_event.is_set():
                    break
                _serve_request(req_file)

        except Exception as e:
            if not stop_event.is_set():
                logger.debug("RPC poll error: %s", e, exc_info=True)

        if not stop_event.is_set():
            stop_event.wait(poll_interval)
def _execute_remote(
    code: str,
    task_id: Optional[str],
    enabled_tools: Optional[List[str]],
) -> str:
    """Run a script on the remote terminal backend via file-based RPC.

    The script and the generated hermes_tools.py module are shipped to
    the remote environment, and tool calls are proxied through a polling
    thread that communicates via request/response files.

    Args:
        code: Python source of the script to run.
        task_id: Session task ID selecting the terminal environment; a falsy
            value maps to ``"default"``.
        enabled_tools: Tool names enabled for the session. When the
            intersection with ``SANDBOX_ALLOWED_TOOLS`` is empty, the full
            allow-list is used.

    Returns:
        A JSON string with ``status``, ``tool_calls_made`` and
        ``duration_seconds``, plus ``output`` and/or ``error``.
    """
    import shlex

    _cfg = _load_config()
    timeout = _cfg.get("timeout", DEFAULT_TIMEOUT)
    max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS)

    session_tools = set(enabled_tools) if enabled_tools else set()
    sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools)
    if not sandbox_tools:
        sandbox_tools = SANDBOX_ALLOWED_TOOLS

    effective_task_id = task_id or "default"
    env, env_type = _get_or_create_env(effective_task_id)

    # Hex-only id: the sandbox path is safe to interpolate into shell text.
    sandbox_id = uuid.uuid4().hex[:12]
    sandbox_dir = f"/tmp/hermes_exec_{sandbox_id}"

    tool_call_log: list = []
    tool_call_counter = [0]
    exec_start = time.monotonic()
    stop_event = threading.Event()
    rpc_thread = None

    try:
        # Verify Python is available on the remote
        py_check = env.execute_oneshot(
            "command -v python3 >/dev/null 2>&1 && echo OK",
            cwd="/", timeout=15,
        )
        if "OK" not in py_check.get("output", ""):
            return json.dumps({
                "status": "error",
                "error": (
                    f"Python 3 is not available in the {env_type} terminal "
                    "environment. Install Python to use execute_code with "
                    "remote backends."
                ),
                "tool_calls_made": 0,
                "duration_seconds": 0,
            })

        # Create sandbox directory on remote; stop here with a clear message
        # if that fails instead of surfacing a confusing error from python3.
        mkdir_result = env.execute_oneshot(
            f"mkdir -p {sandbox_dir}/rpc", cwd="/", timeout=10,
        )
        if mkdir_result.get("returncode", 0) != 0:
            detail = str(mkdir_result.get("output", "")).strip()[:200]
            return json.dumps({
                "status": "error",
                "error": (
                    f"Could not create sandbox directory {sandbox_dir} in the "
                    f"{env_type} terminal environment: {detail}"
                ),
                "tool_calls_made": 0,
                "duration_seconds": round(time.monotonic() - exec_start, 2),
            }, ensure_ascii=False)

        # Generate and ship files
        tools_src = generate_hermes_tools_module(
            list(sandbox_tools), transport="file",
        )
        _ship_file_to_remote(env, f"{sandbox_dir}/hermes_tools.py", tools_src)
        _ship_file_to_remote(env, f"{sandbox_dir}/script.py", code)

        # Start RPC polling thread
        rpc_thread = threading.Thread(
            target=_rpc_poll_loop,
            args=(
                env, f"{sandbox_dir}/rpc", effective_task_id,
                tool_call_log, tool_call_counter, max_tool_calls,
                sandbox_tools, stop_event,
            ),
            daemon=True,
        )
        rpc_thread.start()

        # Build environment variable prefix for the script
        env_prefix = (
            f"HERMES_RPC_DIR={sandbox_dir}/rpc "
            f"PYTHONDONTWRITEBYTECODE=1"
        )
        tz = os.getenv("HERMES_TIMEZONE", "").strip()
        if tz:
            # The value comes from the host environment; quote it so spaces
            # or shell metacharacters cannot change the command.
            env_prefix += f" TZ={shlex.quote(tz)}"

        # Execute the script on the remote backend
        logger.info("Executing code on %s backend (task %s)...",
                    env_type, effective_task_id[:8])
        script_result = env.execute(
            f"cd {sandbox_dir} && {env_prefix} python3 script.py",
            timeout=timeout,
        )

        stdout_text = script_result.get("output", "")
        exit_code = script_result.get("returncode", -1)
        status = "success"

        # Check for timeout/interrupt from the backend
        if exit_code == 124:
            status = "timeout"
        elif exit_code == 130:
            status = "interrupted"

    except Exception as exc:
        duration = round(time.monotonic() - exec_start, 2)
        logger.error(
            "execute_code remote failed after %ss with %d tool calls: %s: %s",
            duration, tool_call_counter[0], type(exc).__name__, exc,
            exc_info=True,
        )
        return json.dumps({
            "status": "error",
            "error": str(exc),
            "tool_calls_made": tool_call_counter[0],
            "duration_seconds": duration,
        }, ensure_ascii=False)

    finally:
        # Stop the polling thread (runs for early returns and errors too)
        stop_event.set()
        if rpc_thread is not None:
            rpc_thread.join(timeout=5)

        # Clean up remote sandbox dir (best effort)
        try:
            env.execute_oneshot(
                f"rm -rf {sandbox_dir}", cwd="/", timeout=15,
            )
        except Exception:
            logger.debug("Failed to clean up remote sandbox %s", sandbox_dir)

    duration = round(time.monotonic() - exec_start, 2)

    # --- Post-process output ---
    # NOTE(review): truncation runs before redaction, so a secret that
    # straddles the cut may no longer match the redactor. Consider redacting
    # first; confirm against the local execution path before changing.

    # Truncate stdout to cap (40% head, 60% tail)
    if len(stdout_text) > MAX_STDOUT_BYTES:
        head_bytes = int(MAX_STDOUT_BYTES * 0.4)
        tail_bytes = MAX_STDOUT_BYTES - head_bytes
        head = stdout_text[:head_bytes]
        tail = stdout_text[-tail_bytes:]
        omitted = len(stdout_text) - len(head) - len(tail)
        stdout_text = (
            head
            + f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted "
            f"out of {len(stdout_text):,} total] ...\n\n"
            + tail
        )

    # Strip ANSI escape sequences
    from tools.ansi_strip import strip_ansi
    stdout_text = strip_ansi(stdout_text)

    # Redact secrets
    from agent.redact import redact_sensitive_text
    stdout_text = redact_sensitive_text(stdout_text)

    # Build response
    result: Dict[str, Any] = {
        "status": status,
        "output": stdout_text,
        "tool_calls_made": tool_call_counter[0],
        "duration_seconds": duration,
    }

    if status == "timeout":
        result["error"] = f"Script timed out after {timeout}s and was killed."
    elif status == "interrupted":
        result["output"] = (
            stdout_text + "\n[execution interrupted — user sent a new message]"
        )
    elif exit_code != 0:
        result["status"] = "error"
        result["error"] = f"Script exited with code {exit_code}"

    return json.dumps(result, ensure_ascii=False)
def execute_oneshot(self, command: str, cwd: str = "", *,
                    timeout: int | None = None,
                    stdin_data: str | None = None) -> dict:
    """Run *command* without going through any persistent shell.

    Intended to be safe to call while a long-running ``execute()`` is in
    flight. This base implementation simply forwards to ``execute()``;
    backends that keep a persistent shell (SSH, Local) override it to use
    their oneshot path and avoid the shell lock.

    Args:
        command: Shell command to run.
        cwd: Working directory passed through to ``execute()``.
        timeout: Optional timeout passed through to ``execute()``.
        stdin_data: Optional stdin text passed through to ``execute()``.

    Returns:
        Whatever ``execute()`` returns for the command.
    """
    passthrough = {
        "cwd": cwd,
        "timeout": timeout,
        "stdin_data": stdin_data,
    }
    return self.execute(command, **passthrough)
def execute_oneshot(self, command: str, cwd: str = "", *,
                    timeout: int | None = None,
                    stdin_data: str | None = None) -> dict:
    """Run *command* through the oneshot (non-persistent) path, unconditionally.

    Going through ``_execute_oneshot`` bypasses ``_shell_lock``, so the call
    can run concurrently with a long-running command in the persistent
    shell — used by execute_code's file-based RPC polling thread.

    Args:
        command: Shell command to run.
        cwd: Working directory, forwarded positionally.
        timeout: Optional timeout, forwarded by keyword.
        stdin_data: Optional stdin text, forwarded by keyword.

    Returns:
        Whatever ``_execute_oneshot()`` returns for the command.
    """
    run_oneshot = self._execute_oneshot
    outcome = run_oneshot(
        command,
        cwd,
        timeout=timeout,
        stdin_data=stdin_data,
    )
    return outcome