From c8b30e9efa8a41299ec18867bea15d00b8e8511b Mon Sep 17 00:00:00 2001 From: Shannon Sands Date: Tue, 10 Feb 2026 07:23:08 +0000 Subject: [PATCH] Updated terminal_tool with SlotPoolEnvironment --- atropos/tools/__init__.py | 21 +- atropos/tools/build_registry.py | 64 ----- atropos/tools/hermes_external_tools.py | 90 ------- atropos/tools/sandbox_stubs.py | 99 -------- atropos/tools/toolset_resolver.py | 88 ------- memory-bank/activeContext.md | 42 +++- memory-bank/progress.md | 16 +- tools/terminal_tool.py | 320 ++++++++++++++++++++++++- 8 files changed, 373 insertions(+), 367 deletions(-) delete mode 100644 atropos/tools/build_registry.py delete mode 100644 atropos/tools/hermes_external_tools.py delete mode 100644 atropos/tools/sandbox_stubs.py delete mode 100644 atropos/tools/toolset_resolver.py diff --git a/atropos/tools/__init__.py b/atropos/tools/__init__.py index 4e6f1101bc..4e2f818bde 100644 --- a/atropos/tools/__init__.py +++ b/atropos/tools/__init__.py @@ -1,12 +1,22 @@ """ Tool abstractions for atropos-agent. -Provides base Tool class and common tool implementations. +Provides base Tool class, ToolCall/ToolResult types, and specialized tools. + +Kept modules: +- base.py: ToolSchema, ToolCall, ToolResult, Tool ABC, ToolRegistry +- tool_executor.py: Batched execution queue with slot routing +- terminal_stateful_tool.py: Persistent terminal sessions +- tmux_tool.py: Tmux-based streaming terminal + +Removed (replaced by hermes-agent equivalents): +- build_registry.py → model_tools.py + toolsets.py +- sandbox_stubs.py → atropos/backends/ execute() methods +- hermes_external_tools.py → environments/agent_loop.py handle_function_call() +- toolset_resolver.py → toolsets.py """ from .base import Tool, ToolCall, ToolRegistry, ToolResult, ToolSchema -from .build_registry import build_tool_registry -from .sandbox_stubs import BashTool, ReadFileTool, TerminalTool, WriteFileTool from .terminal_stateful_tool import TerminalStatefulTool from .tmux_tool import TmuxTool @@ -16,11 +26,6 @@ __all__ = [ "ToolRegistry", "ToolResult", "ToolSchema", - "BashTool", - "ReadFileTool", - "WriteFileTool", - "TerminalTool", "TerminalStatefulTool", "TmuxTool", - "build_tool_registry", ] diff --git a/atropos/tools/build_registry.py b/atropos/tools/build_registry.py deleted file mode 100644 index c26553be72..0000000000 --- a/atropos/tools/build_registry.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -Unified tool registry builder for Hermes-Agent Atropos integration. - -This composes: -- sandbox tool stubs (terminal/bash/read_file/write_file + stateful terminal/tmux) -- Hermes external tools (web/vision/image/moa/skills/browser), executed via ToolServer - -ToolExecutor only needs the schema + `external` routing bit; ToolServer executes -the external tools via Hermes' existing implementations. -""" - -from __future__ import annotations - -from typing import List, Optional - -from .base import ToolRegistry -from .hermes_external_tools import build_external_tools -from .sandbox_stubs import BashTool, ReadFileTool, TerminalTool, WriteFileTool -from .terminal_stateful_tool import TerminalStatefulTool -from .tmux_tool import TmuxTool -from .toolset_resolver import resolve_multiple_toolsets - - -def build_tool_registry( - *, - enabled_toolsets: Optional[List[str]] = None, - disabled_toolsets: Optional[List[str]] = None, - tool_server_url: Optional[str] = None, -) -> ToolRegistry: - """ - Build a ToolRegistry for AgentEnv / ToolExecutor / ToolServer. - - If `tool_server_url` is not provided, external tools will be omitted so we do - not advertise tools that cannot execute. - """ - enabled_toolsets = enabled_toolsets or ["default"] - - # Resolve tool names using Hermes toolsets plus Atropos additions. - selected = set(resolve_multiple_toolsets(enabled_toolsets)) - if disabled_toolsets: - selected -= set(resolve_multiple_toolsets(disabled_toolsets)) - - reg = ToolRegistry() - - # Always register sandbox tools if selected. - sandbox_by_name = { - "terminal": TerminalTool(), - "bash": BashTool(), - "read_file": ReadFileTool(), - "write_file": WriteFileTool(), - "terminal_stateful": TerminalStatefulTool(), - "tmux": TmuxTool(), - } - for name, tool in sandbox_by_name.items(): - if name in selected: - reg.register(tool) - - # External tools: only include when ToolServer is configured. - if tool_server_url: - for tool in build_external_tools(selected_tool_names=selected): - if tool.name in selected: - reg.register(tool) - - return reg diff --git a/atropos/tools/hermes_external_tools.py b/atropos/tools/hermes_external_tools.py deleted file mode 100644 index 3172913863..0000000000 --- a/atropos/tools/hermes_external_tools.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Hermes external tool adapter for Atropos ToolServer. - -These tools reuse Hermes-Agent's existing tool runner (`model_tools.handle_function_call`) -so we don't duplicate external tool implementations. - -Important: -- These are marked `external=True` and should be executed ONLY by ToolServer. -- We run `handle_function_call` in a worker thread because the Hermes implementation - uses `asyncio.run()` internally for some async tools (web_extract, vision, MoA, etc). -""" - -from __future__ import annotations - -import asyncio -import json -from typing import Any, Dict, List, Optional - -import model_tools - -from .base import Tool, ToolResult, ToolSchema - - -def _schema_from_openai_tool_dict(tool: Dict[str, Any], *, external: bool) -> ToolSchema: - fn = tool.get("function") or {} - name = str(fn.get("name") or "") - description = str(fn.get("description") or "") - params = fn.get("parameters") or {} - properties = params.get("properties") or {} - required = params.get("required") or [] - if not isinstance(required, list): - required = [] - return ToolSchema( - name=name, - description=description, - parameters=dict(properties), - required=[str(x) for x in required if isinstance(x, (str, int))], - external=external, - ) - - -class HermesExternalTool(Tool): - def __init__(self, schema: ToolSchema): - self._schema = schema - - @property - def schema(self) -> ToolSchema: - return self._schema - - async def execute(self, task_id: Optional[str] = None, **kwargs: Any) -> ToolResult: - # `model_tools.handle_function_call` returns a JSON string (success or error). - # Run in a thread because some Hermes tool handlers call `asyncio.run()`. - raw = await asyncio.to_thread(model_tools.handle_function_call, self.name, kwargs, task_id) - - try: - parsed = json.loads(raw) - except Exception: - # Keep as plain string. - return ToolResult(success=True, output=str(raw)) - - if isinstance(parsed, dict) and parsed.get("error"): - return ToolResult(success=False, error=str(parsed.get("error")), output="") - - return ToolResult(success=True, output=json.dumps(parsed, ensure_ascii=False)) - - -def build_external_tools( - *, - selected_tool_names: Optional[set[str]] = None, -) -> List[HermesExternalTool]: - """ - Build external tool wrappers from Hermes tool declarations. - - Filters out sandbox-oriented tools (e.g. `terminal`) since those should run - inside the sandbox via ToolExecutor. - """ - # IMPORTANT: Hermes' `model_tools.get_tool_definitions()` only understands Hermes toolsets. - # Atropos envs add extra toolsets (filesystem/sandbox/stateful). To avoid noisy "Unknown toolset" - # prints and accidental filtering, we fetch ALL Hermes tool definitions here and filter by name. - tools = model_tools.get_tool_definitions(enabled_toolsets=None, disabled_toolsets=None, quiet_mode=True) - - wrappers: List[HermesExternalTool] = [] - for t in tools: - schema = _schema_from_openai_tool_dict(t, external=True) - if schema.name in {"terminal"}: - continue - if selected_tool_names is not None and schema.name not in selected_tool_names: - continue - wrappers.append(HermesExternalTool(schema)) - return wrappers diff --git a/atropos/tools/sandbox_stubs.py b/atropos/tools/sandbox_stubs.py deleted file mode 100644 index 1407dec308..0000000000 --- a/atropos/tools/sandbox_stubs.py +++ /dev/null @@ -1,99 +0,0 @@ -""" -Sandbox tool stubs for Atropos ToolExecutor. - -These tools are executed inside the sandbox containers via: -ToolExecutor -> SlotPool -> sandbox_server.py - -They intentionally do NOT execute anything on the host process. If they are -called directly (outside ToolExecutor), they return a clear error. -""" - -from __future__ import annotations - -from typing import Optional - -from .base import Tool, ToolResult, ToolSchema - - -class TerminalTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="terminal", - description=( - "Execute a command inside the sandbox slot workspace and return stdout/stderr. " - "Filesystem persists within a trajectory slot. Background processes are not supported " - "in stateless mode. Commands run under POSIX /bin/sh and each tool call runs in a fresh " - "shell (no persisted env vars). Avoid bash-only syntax like `source`; prefer `. .venv/bin/activate` " - "or invoke `.venv/bin/python ...` directly." - ), - parameters={ - "command": {"type": "string", "description": "The command to execute"}, - "timeout": { - "type": "integer", - "description": "Command timeout in seconds (optional).", - "minimum": 1, - }, - "background": { - "type": "boolean", - "description": "Not supported in sandbox terminal (always false).", - "default": False, - }, - }, - required=["command"], - external=False, - ) - - async def execute(self, **_kwargs) -> ToolResult: - return ToolResult( - success=False, - error="terminal must be executed via ToolExecutor inside the sandbox", - ) - - -class BashTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="bash", - description="Execute a bash command inside the sandbox slot workspace.", - parameters={"command": {"type": "string", "description": "The bash command to execute"}}, - required=["command"], - external=False, - ) - - async def execute(self, **_kwargs) -> ToolResult: - return ToolResult(success=False, error="bash must be executed via ToolExecutor inside the sandbox") - - -class ReadFileTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="read_file", - description="Read a file from the sandbox slot workspace.", - parameters={"path": {"type": "string", "description": "Path to the file"}}, - required=["path"], - external=False, - ) - - async def execute(self, **_kwargs) -> ToolResult: - return ToolResult(success=False, error="read_file must be executed via ToolExecutor inside the sandbox") - - -class WriteFileTool(Tool): - @property - def schema(self) -> ToolSchema: - return ToolSchema( - name="write_file", - description="Write a file into the sandbox slot workspace.", - parameters={ - "path": {"type": "string", "description": "Path to the file"}, - "content": {"type": "string", "description": "File content"}, - }, - required=["path", "content"], - external=False, - ) - - async def execute(self, **_kwargs) -> ToolResult: - return ToolResult(success=False, error="write_file must be executed via ToolExecutor inside the sandbox") diff --git a/atropos/tools/toolset_resolver.py b/atropos/tools/toolset_resolver.py deleted file mode 100644 index 9a450cc2ca..0000000000 --- a/atropos/tools/toolset_resolver.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -Toolset resolution for Hermes-Agent Atropos integration. - -We primarily reuse Hermes-Agent toolsets (`toolsets.py`), but Atropos training/envs -need a few extra sandbox-oriented toolsets that Hermes doesn't expose by default -(e.g. filesystem + stateful terminal). -""" - -from __future__ import annotations - -from typing import Any, Dict, List, Optional, Set - -import toolsets as hermes_toolsets - - -ATROPOS_TOOLSETS: Dict[str, Dict[str, Any]] = { - "filesystem": { - "description": "Read/write files in the sandbox workspace.", - "tools": ["read_file", "write_file"], - "includes": [], - }, - "terminal_stateful": { - "description": "Stateful terminal execution (tmux/TUI support) inside the sandbox.", - "tools": ["terminal_stateful", "tmux"], - "includes": [], - }, - "sandbox": { - "description": "Sandbox tools (terminal + filesystem).", - "tools": [], - "includes": ["terminal", "filesystem"], - }, - "default": { - "description": "Default toolset for Atropos AgentEnv tasks.", - "tools": [], - "includes": ["sandbox"], - }, - "full": { - "description": "All Hermes tools plus Atropos sandbox additions.", - "tools": [], - "includes": ["all", "filesystem", "sandbox", "terminal_stateful"], - }, -} - - -def validate_toolset(name: str) -> bool: - if name in {"all", "*"}: - return True - return hermes_toolsets.validate_toolset(name) or name in ATROPOS_TOOLSETS - - -def resolve_toolset(name: str, visited: Optional[Set[str]] = None) -> List[str]: - if visited is None: - visited = set() - - if name in {"all", "*"}: - # Union Hermes + Atropos toolsets. - all_tools: Set[str] = set() - for tname in hermes_toolsets.get_toolset_names(): - all_tools.update(resolve_toolset(tname, visited=set())) - for tname, spec in ATROPOS_TOOLSETS.items(): - # Avoid recursion: some Atropos toolsets (e.g. "full") include "all". - if tname == "full" or "all" in (spec.get("includes") or []): - continue - all_tools.update(resolve_toolset(tname, visited=set())) - return sorted(all_tools) - - if name in ATROPOS_TOOLSETS: - if name in visited: - return [] - visited.add(name) - spec = ATROPOS_TOOLSETS[name] - tools: Set[str] = set(spec.get("tools", [])) - for inc in spec.get("includes", []): - tools.update(resolve_toolset(inc, visited=set(visited))) - return sorted(tools) - - # Fall back to Hermes toolsets. - # IMPORTANT: do not pre-add `name` to `visited` here; Hermes' resolver uses - # `visited` for its own cycle detection and will treat the presence of `name` - # as a circular dependency. - return sorted(hermes_toolsets.resolve_toolset(name, visited=set(visited))) - - -def resolve_multiple_toolsets(names: List[str]) -> List[str]: - tools: Set[str] = set() - for name in names: - tools.update(resolve_toolset(name, visited=set())) - return sorted(tools) diff --git a/memory-bank/activeContext.md b/memory-bank/activeContext.md index f4ffb138c9..70c7b868d7 100644 --- a/memory-bank/activeContext.md +++ b/memory-bank/activeContext.md @@ -63,23 +63,39 @@ python environments/swe_smith_oracle_env.py process \ - Full token tracking with logprobs via Phase 2 ManagedServer - Key finding: Llama-3-8B template silently drops `tools=` param, Qwen 3 has full Hermes format support -### What Still Needs to Be Done +### Current Task: Integrate Slot Pool Backend into tools/terminal_tool.py -#### 1. Replace hermes-agent tools backend with sandbox backend globally -Per Teknium's feedback: `tools/terminal_tool.py`, `tools/file_tools.py` etc. should be able to use -the Modal/Nomad sandbox backend not just in atropos envs but also in `batch_runner.py` for scaled -data generation. This unifies the tool execution path across CLI, batch, and RL environments. +#### Step 1: Add `_SlotPoolEnvironment` to `tools/terminal_tool.py` +- New class alongside existing `_LocalEnvironment`, `_DockerEnvironment`, etc. +- Routes through `atropos/backends/` (ModalToolBackend or NomadToolBackend) +- N:M slot multiplexing: 5-10 sandboxes × 10 slots each = 50-100 concurrent +- Singleton `_SlotPoolManager` (like `_ModalPoolManager`) manages backend lifecycle +- `execute()` acquires slot → `backend.execute_batch([(slot, "bash", ...)])` → returns `{"output": ..., "returncode": ...}` +- `cleanup()` releases slot back to pool -#### 2. Clean up redundant code -- Remove `atropos/agent/` (replaced by `environments/agent_loop.py`) -- Remove `atropos/envs/agent_env.py` (replaced by `environments/hermes_base_env.py`) -- Remove `atropos/tools/` (use `model_tools.py` + `tools/` directly) +#### Step 2: Wire into `_create_environment()` +- `TERMINAL_ENV=slot_pool` → `_SlotPoolEnvironment(...)` +- Sub-config: `TERMINAL_SLOT_BACKEND=modal` or `TERMINAL_SLOT_BACKEND=nomad` +- Reuse existing `TERMINAL_MODAL_*` and Nomad env vars for configuration -#### 3. Test with Tinker trainer (blocked on billing) -Full RL training loop: Tinker API → atropos rollout API → environment → trainer +#### Step 3: Remove redundant `atropos/tools/` files +- DELETE: `hermes_external_tools.py`, `build_registry.py`, `sandbox_stubs.py`, `toolset_resolver.py` +- KEEP: `base.py` (ToolCall/ToolResult types), `tool_executor.py` (batched queue), `terminal_stateful_tool.py`, `tmux_tool.py` -#### 4. Add more environments -Teknium mentioned needing "endless-terminals" and "terminalbench 2" envs +#### Step 4: Clean up `atropos/envs/` and `atropos/agent/` (defer) +- Remove `atropos/envs/agent_env.py` → replaced by `environments/hermes_base_env.py` +- Remove `atropos/agent/atropos_agent.py` → replaced by `environments/agent_loop.py` + +#### Later +- Test with Tinker trainer (blocked on billing) +- Add more environments (endless-terminals, terminalbench 2) + +### Key Architecture Insight +Two separate sandbox integration points: +1. **`tools/terminal_tool.py` with `TERMINAL_ENV=slot_pool`** — for hermes CLI, batch_runner, any code using `handle_function_call("terminal", ...)`. Uses `_SlotPoolEnvironment` which wraps `atropos/backends/`. +2. **`environments/hermes_base_env.py` with `tool_pool_mode=modal/nomad`** — for RL environments. Uses `_collect_trajectory_sandbox()` which directly acquires slots and creates `sandbox_tool_handler`. + +Both use the same underlying `atropos/backends/` (ModalToolBackend, NomadToolBackend) with the same slot pool. ### Architecture Summary diff --git a/memory-bank/progress.md b/memory-bank/progress.md index 9ea63478d3..e41c33567e 100644 --- a/memory-bank/progress.md +++ b/memory-bank/progress.md @@ -21,11 +21,19 @@ Test results: - [x] Test Phase 1 (OpenAI server type) with Nous API — WORKS - [x] Test Phase 2 (ManagedServer) with RunPod SGLang — WORKS - [x] Port SWE env to `HermesAgentBaseEnv` with multiplexed sandboxing -- [ ] End-to-end test with Modal sandbox (needs live Modal) -- [ ] Remove redundant `atropos/agent/` and `atropos/envs/agent_env.py` -- [ ] Clean up redundant `atropos/tools/` +- [x] End-to-end test: Qwen 3 8B + Modal sandbox + tool calls in sandbox + pytest verification +- [x] Add `_SlotPoolEnvironment` to `tools/terminal_tool.py` (TERMINAL_ENV=slot_pool) +- [x] Remove redundant `atropos/tools/` files (4 of 8) +- [ ] Remove redundant `atropos/agent/` and `atropos/envs/agent_env.py` (deferred) - [ ] Test end-to-end with Tinker trainer (blocked on billing) -- [ ] Test with actual tool calls (model producing tool_calls, not just text) + +### ✅ End-to-End SWE + Modal Sandbox Verified (Feb 10, 2026) +- Qwen 3 8B on RunPod SGLang (endpoint `0tx0ruuuo4f10c`) +- Phase 2 ManagedServer with hermes tool call parser +- 5 terminal commands executed in Modal sandbox: ls, git status, git log, cat parse.py, cat tests/ +- In-sandbox verification: install deps + pytest → score 0.0 (model inspected but didn't fix) +- Full token tracking with logprobs via /generate endpoint +- Key finding: Llama-3-8B template drops tools= silently; Qwen 3 has full Hermes tool format ## Completed Features diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index c2c0217df1..950c7205ab 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1545,6 +1545,311 @@ class _ModalSandboxEnvironment: pass +# ============================================================================= +# Slot Pool Environment — routes through atropos/backends/ for multiplexed +# sandbox execution. Supports Modal, Nomad (Docker + Singularity/Apptainer). +# +# Usage: TERMINAL_ENV=slot_pool TERMINAL_SLOT_BACKEND=modal +# ============================================================================= + +class _SlotPoolAsyncWorker: + """Background thread with its own event loop for running async backend ops.""" + + def __init__(self): + self._loop = None + self._thread = None + + def start(self): + import asyncio as _aio + self._loop = _aio.new_event_loop() + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + + def _run(self): + import asyncio as _aio + _aio.set_event_loop(self._loop) + self._loop.run_forever() + + def run(self, coro, timeout=300): + """Run an async coroutine synchronously on the worker thread.""" + import asyncio as _aio + if self._loop is None or self._thread is None: + raise RuntimeError("SlotPoolAsyncWorker not started") + future = _aio.run_coroutine_threadsafe(coro, self._loop) + return future.result(timeout=timeout) + + def stop(self): + if self._loop: + self._loop.call_soon_threadsafe(self._loop.stop) + if self._thread: + self._thread.join(timeout=5) + + +class _SlotPoolManager: + """ + Singleton manager for the slot-pool sandbox backend. + + Wraps atropos/backends/ (ModalToolBackend or NomadToolBackend) and provides + synchronous acquire/execute/release operations via a background async worker. + + Config via environment variables: + TERMINAL_SLOT_BACKEND = modal | nomad (default: modal) + # Modal settings (reuses TERMINAL_MODAL_* vars): + TERMINAL_MODAL_IMAGE = python:3.11 + TERMINAL_MODAL_SLOTS = 10 + TERMINAL_MODAL_MIN = 1 + TERMINAL_MODAL_MAX = 5 + # Nomad settings: + TERMINAL_NOMAD_ADDRESS = http://localhost:4646 + TERMINAL_NOMAD_DRIVER = docker | singularity + TERMINAL_NOMAD_IMAGE = atropos-sandbox:local + """ + + _instance: Optional["_SlotPoolManager"] = None + _lock = threading.Lock() + + @classmethod + def get_instance(cls) -> "_SlotPoolManager": + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = cls() + cls._instance._start() + return cls._instance + + @classmethod + def reset_instance(cls): + with cls._lock: + if cls._instance is not None: + cls._instance._stop() + cls._instance = None + + def __init__(self): + self._backend = None + self._worker = _SlotPoolAsyncWorker() + self._slots: Dict[str, Any] = {} # task_id → Slot + self._slot_lock = threading.Lock() + self._started = False + + def _start(self): + """Initialize the backend and async worker.""" + self._worker.start() + + backend_type = os.getenv("TERMINAL_SLOT_BACKEND", "modal").strip().lower() + print(f"[SlotPool] Starting {backend_type} backend...") + + if backend_type == "modal": + self._backend = self._create_modal_backend() + elif backend_type == "nomad": + self._backend = self._create_nomad_backend() + else: + raise ValueError( + f"Unknown TERMINAL_SLOT_BACKEND: {backend_type}. Use 'modal' or 'nomad'." + ) + + self._worker.run(self._backend.start(), timeout=120) + self._started = True + print(f"[SlotPool] {backend_type} backend started") + + def _create_modal_backend(self): + from atropos.backends.modal_backend import ModalSandboxConfig, ModalToolBackend + + config = ModalSandboxConfig( + name="default", + app_name=os.getenv("TERMINAL_SLOT_APP_NAME", "hermes-slot-pool"), + image=os.getenv("TERMINAL_MODAL_IMAGE") or os.getenv("TERMINAL_DOCKER_IMAGE", "python:3.11"), + gpu=os.getenv("TERMINAL_MODAL_GPU") or None, + cpu=float(os.getenv("TERMINAL_MODAL_CPU", "1.0")), + memory=int(os.getenv("TERMINAL_MODAL_MEMORY", "2048")), + slots_per_sandbox=int(os.getenv("TERMINAL_MODAL_SLOTS", "10")), + min_sandboxes=int(os.getenv("TERMINAL_MODAL_MIN", "1")), + max_sandboxes=int(os.getenv("TERMINAL_MODAL_MAX", "5")), + idle_timeout=int(os.getenv("TERMINAL_MODAL_IDLE_TIMEOUT", "120")), + max_lifetime=int(os.getenv("TERMINAL_MODAL_MAX_LIFETIME", "3600")), + acquire_timeout_s=float(os.getenv("TERMINAL_MODAL_ACQUIRE_TIMEOUT", "60.0")), + execution_timeout_s=float(os.getenv("TERMINAL_MODAL_EXEC_TIMEOUT", "300.0")), + workspace_base=os.getenv("TERMINAL_MODAL_WORKSPACE", "/data"), + ) + return ModalToolBackend(config) + + def _create_nomad_backend(self): + from atropos.backends.nomad_backend import NomadBackendConfig, NomadToolBackend + + config = NomadBackendConfig( + nomad_address=os.getenv("TERMINAL_NOMAD_ADDRESS", "http://localhost:4646"), + job_id=os.getenv("TERMINAL_NOMAD_JOB_ID", "hermes-slot-pool"), + image=os.getenv("TERMINAL_NOMAD_IMAGE") or os.getenv("TERMINAL_DOCKER_IMAGE", "atropos-sandbox:local"), + driver=os.getenv("TERMINAL_NOMAD_DRIVER", "docker"), + slots_per_container=int(os.getenv("TERMINAL_NOMAD_SLOTS", "10")), + min_containers=int(os.getenv("TERMINAL_NOMAD_MIN", "1")), + max_containers=int(os.getenv("TERMINAL_NOMAD_MAX", "10")), + ) + return NomadToolBackend(config) + + def _stop(self): + """Shut down the backend and worker.""" + if self._started and self._backend: + try: + # Release all held slots + with self._slot_lock: + for task_id, slot in list(self._slots.items()): + try: + self._worker.run( + self._backend.release(slot, reset_workspace=True), + timeout=10, + ) + except Exception: + pass + self._slots.clear() + + self._worker.run(self._backend.stop(purge=False), timeout=30) + except Exception as e: + print(f"[SlotPool] Warning: shutdown error: {e}") + finally: + self._started = False + + self._worker.stop() + print("[SlotPool] Backend stopped") + + def acquire(self, task_id: str, timeout: float = 60.0): + """Acquire a slot for a task_id. Returns the Slot object.""" + with self._slot_lock: + if task_id in self._slots: + return self._slots[task_id] + + slot = self._worker.run( + self._backend.acquire(task_id), timeout=timeout + ) + + with self._slot_lock: + self._slots[task_id] = slot + + return slot + + def execute(self, task_id: str, command: str, cwd: str = "", timeout: float = 300.0) -> dict: + """Execute a command in the task's slot. Returns {"output": ..., "returncode": ...}.""" + with self._slot_lock: + slot = self._slots.get(task_id) + if slot is None: + return {"output": "Error: no slot acquired for this task", "returncode": 1} + + # Build command with cwd prefix if needed + full_command = f"cd {cwd} && {command}" if cwd else command + + results = self._worker.run( + self._backend.execute_batch( + [(slot, "bash", {"command": full_command})], + timeout_s=timeout, + ), + timeout=timeout + 30, # Extra margin for network + ) + + r = results[0] + output = r.output if r.success else ( + f"{r.output}\n{r.error}" if r.output else r.error + ) + returncode = r.metadata.get("returncode", 0 if r.success else 1) + return {"output": output, "returncode": returncode} + + def release(self, task_id: str, reset_workspace: bool = True): + """Release a task's slot back to the pool.""" + with self._slot_lock: + slot = self._slots.pop(task_id, None) + if slot is None: + return + + try: + self._worker.run( + self._backend.release(slot, reset_workspace=reset_workspace), + timeout=30, + ) + except Exception as e: + print(f"[SlotPool] Warning: release failed for {task_id}: {e}") + + def get_status(self) -> Dict[str, Any]: + """Get pool status.""" + if not self._started or not self._backend: + return {"status": "not started"} + return self._backend.get_status() + + +class _SlotPoolEnvironment: + """ + Slot-pool based execution environment. + + Routes terminal commands through atropos/backends/ (Modal, Nomad/Docker, + Nomad/Singularity) with N:M slot multiplexing. Multiple tasks share a + smaller number of sandboxes via slot assignment. + + Usage: + TERMINAL_ENV=slot_pool + TERMINAL_SLOT_BACKEND=modal # or nomad + TERMINAL_MODAL_IMAGE=python:3.11 + TERMINAL_MODAL_SLOTS=10 + """ + + def __init__( + self, + cwd: str = "/data", + timeout: int = 300, + task_id: str = "", + ): + self.cwd = cwd + self.timeout = timeout + self.task_id = task_id or str(uuid.uuid4()) + self._released = False + + # Acquire a slot from the pool + manager = _SlotPoolManager.get_instance() + manager.acquire(self.task_id, timeout=60.0) + + def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict: + """Execute a command in the slot's workspace.""" + exec_command = _transform_sudo_command(command) + work_dir = cwd or self.cwd + + try: + return _SlotPoolManager.get_instance().execute( + self.task_id, + exec_command, + cwd=work_dir, + timeout=float(timeout or self.timeout), + ) + except Exception as e: + error_msg = str(e) + if "timeout" in error_msg.lower(): + return {"output": f"Command timed out after {timeout or self.timeout}s", "returncode": 124} + return {"output": f"SlotPool execution error: {error_msg}", "returncode": 1} + + def cleanup(self): + """Release slot back to the pool (workspace reset for reuse).""" + if not self._released: + self._released = True + _SlotPoolManager.get_instance().release(self.task_id, reset_workspace=True) + + def stop(self): + """Same as cleanup for slot pool.""" + self.cleanup() + + def __del__(self): + try: + self.cleanup() + except: + pass + + +def _shutdown_slot_pool(): + """Shutdown the slot pool manager (called at process exit).""" + try: + _SlotPoolManager.reset_instance() + except Exception: + pass + +# Register slot pool shutdown alongside modal pool shutdown +import atexit as _atexit_slot +_atexit_slot.register(_shutdown_slot_pool) + + # Tool description for LLM TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment. @@ -1664,8 +1969,21 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_c timeout=timeout ) + elif env_type == "slot_pool": + # Multiplexed sandbox pool via atropos/backends/ (Modal, Nomad/Docker, Nomad/Singularity) + # N:M slot multiplexing for high-throughput parallel execution + workspace = os.getenv("TERMINAL_MODAL_WORKSPACE", "/data") + return _SlotPoolEnvironment( + cwd=cwd or workspace, + timeout=timeout, + task_id=task_id if 'task_id' in dir() else "", + ) + else: - raise ValueError(f"Unknown environment type: {env_type}. Use 'local', 'docker', 'singularity', 'modal', or 'ssh'") + raise ValueError( + f"Unknown environment type: {env_type}. " + "Use 'local', 'docker', 'singularity', 'modal', 'ssh', or 'slot_pool'" + ) def _cleanup_inactive_envs(lifetime_seconds: int = 300):