diff --git a/.env.example b/.env.example index 77d13e8fd9..99156b356a 100644 --- a/.env.example +++ b/.env.example @@ -144,12 +144,87 @@ TERMINAL_LIFETIME_SECONDS=300 # SUDO_PASSWORD=your_password_here # ============================================================================= -# MODAL CLOUD BACKEND (Optional - for TERMINAL_ENV=modal) +# MODAL CLOUD BACKEND (for TERMINAL_ENV=modal) # ============================================================================= -# Modal uses CLI authentication, not environment variables. -# Run: pip install modal && modal setup -# This will authenticate via browser and store credentials locally. -# No API key needed in .env - Modal handles auth automatically. +# Modal provides cloud sandboxes with per-second billing and auto-scaling. +# This implementation uses a warm pool of sandboxes for cost efficiency. +# +# SETUP: +# pip install modal && modal setup +# (Authenticates via browser, stores credentials locally) +# +# FEATURES: +# - Auto-scaling warm sandbox pool (no cold start after first use) +# - Named sandbox recovery (reconnects after restart) +# - Profile-based heterogeneous environments (CPU, GPU, different images) +# - Server-side idle_timeout protection against orphaned sandboxes + +# Modal app name (groups all sandboxes, used for recovery) +TERMINAL_MODAL_APP_NAME=hermes-sandbox + +# Default profile when none specified +TERMINAL_MODAL_DEFAULT_PROFILE=default + +# Profile config file (optional - YAML format, see modal_profiles.yaml) +# TERMINAL_MODAL_PROFILES_FILE=modal_profiles.yaml + +# --- Default Profile Settings (used if no YAML file) --- +# These apply when no profile is specified or for the "default" profile +TERMINAL_MODAL_IMAGE=python:3.11 +TERMINAL_MODAL_MIN_POOL=1 +TERMINAL_MODAL_MAX_POOL=5 +TERMINAL_MODAL_IDLE_TIMEOUT=120 +TERMINAL_MODAL_MAX_LIFETIME=3600 +TERMINAL_MODAL_SCALE_DOWN_IDLE=180 + +# --- Custom Profile Example: pytorch-gpu --- +# Uncomment to enable a GPU profile for ML tasks +# Usage: terminal_tool("python train.py", profile="pytorch-gpu") +# +# TERMINAL_MODAL_PROFILE_pytorch_gpu_IMAGE=pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime +# TERMINAL_MODAL_PROFILE_pytorch_gpu_GPU=T4 +# TERMINAL_MODAL_PROFILE_pytorch_gpu_MEMORY=16384 +# TERMINAL_MODAL_PROFILE_pytorch_gpu_MIN_POOL=0 +# TERMINAL_MODAL_PROFILE_pytorch_gpu_MAX_POOL=2 +# TERMINAL_MODAL_PROFILE_pytorch_gpu_IDLE_TIMEOUT=60 + +# --- Custom Profile Example: node --- +# Uncomment to enable a Node.js profile +# Usage: terminal_tool("npm test", profile="node") +# +# TERMINAL_MODAL_PROFILE_node_IMAGE=node:18 +# TERMINAL_MODAL_PROFILE_node_MIN_POOL=0 +# TERMINAL_MODAL_PROFILE_node_MAX_POOL=3 + +# ============================================================================= +# MODAL SECRETS (Secure credential injection) +# ============================================================================= +# Modal Secrets allow you to securely pass API keys, passwords, and other +# sensitive data to your sandboxes without exposing them in code or logs. +# +# SETUP SECRETS: +# 1. Via Dashboard: https://modal.com/secrets +# 2. Via CLI: modal secret create my-secret KEY1=value1 KEY2=value2 +# 3. 
Via CLI with env: modal secret create my-secret API_KEY="$API_KEY" +# +# LIST SECRETS: +# modal secret list +# +# DELETE SECRETS: +# modal secret delete my-secret + +# Global secrets applied to ALL profiles (comma-separated secret names) +# These secrets must be created on Modal dashboard or via CLI first +# TERMINAL_MODAL_SECRETS=my-api-keys,database-creds + +# Per-profile secrets (comma-separated secret names) +# TERMINAL_MODAL_PROFILE_pytorch_gpu_SECRETS=huggingface-token,wandb-key + +# Per-profile environment variables (semicolon-separated KEY=VALUE pairs) +# TERMINAL_MODAL_PROFILE_default_ENV_VARS=DEBUG=1;LOG_LEVEL=info + +# Load local .env file into sandbox (useful for development) +# TERMINAL_MODAL_PROFILE_default_USE_DOTENV=true # ============================================================================= # BROWSER TOOL CONFIGURATION (agent-browser + Browserbase) diff --git a/atropos/backends/__init__.py b/atropos/backends/__init__.py index f3b911959b..1988633637 100644 --- a/atropos/backends/__init__.py +++ b/atropos/backends/__init__.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import Any from .base import ToolBackend -from .modal_backend import ModalBackendConfig, ModalToolBackend +from .modal_backend import ModalSandboxConfig, ModalToolBackend from .nomad_backend import NomadBackendConfig, NomadToolBackend @@ -12,7 +12,7 @@ def create_tool_backend(cfg: Any) -> ToolBackend: if mode == "nomad": return NomadToolBackend(NomadBackendConfig.from_agent_env_config(cfg)) if mode == "modal": - return ModalToolBackend(ModalBackendConfig.from_agent_env_config(cfg)) + return ModalToolBackend(ModalSandboxConfig.from_agent_env_config(cfg)) raise ValueError(f"Unknown tool_pool_mode: {mode}") @@ -21,7 +21,7 @@ __all__ = [ "create_tool_backend", "NomadBackendConfig", "NomadToolBackend", - "ModalBackendConfig", + "ModalSandboxConfig", "ModalToolBackend", ] diff --git a/atropos/backends/modal_backend.py b/atropos/backends/modal_backend.py index 3affe08e35..32c25caecd 100644 --- a/atropos/backends/modal_backend.py +++ b/atropos/backends/modal_backend.py @@ -1,73 +1,1176 @@ -""" -Modal tool backend (stub). - -We intentionally ship a placeholder implementation so AgentEnv can expose a -backend switch without forcing Modal as a hard dependency for Hermes-Agent. - -When org access is available, this backend will be implemented by running a -long-lived Modal worker (or pool) that owns N slots and exposes `execute_batch`. -""" - from __future__ import annotations -from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple +import asyncio +import os +import uuid +from dataclasses import dataclass, field, replace +from typing import Any, Dict, List, Optional, Tuple, ClassVar from ..slots.executor import ExecutionResult -from ..slots.slot import Slot +from ..slots.slot import Slot, SlotState from .base import ToolBackend +import yaml -@dataclass(frozen=True) -class ModalBackendConfig: - # Placeholders for future implementation. - app_name: str = "atropos-sandbox" - function_name: str = "sandbox_server" - volume_name: Optional[str] = None - volume_mount_path: str = "/data" - +@dataclass +class ModalSandboxConfig: + """ + Unified configuration for Modal sandbox pools. + + This single class handles both profile definitions and runtime configuration. + Use `with_app_name()` to set the full app_name for a specific deployment. 
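+    For example, ModalSandboxConfig(name="gpu").with_app_name("train") yields
+    app_name "train-gpu".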
+ + Example profiles: + - "default": Basic Python environment, CPU only + - "pytorch-gpu": PyTorch with T4 GPU for training + - "high-memory": 64GB RAM for large model inference + + Usage: + # Single profile + config = ModalSandboxConfig(name="default") + config = config.with_app_name("my-training") + backend = ModalToolBackend(config) + + # Multi-profile + profiles = ModalSandboxConfig.load_profiles() + backend = ModalToolBackend.with_profiles(profiles=profiles) + """ + # Identity + name: str = "default" + app_name: Optional[str] = None # Full app name (computed via with_app_name) + + # Container image + image: str = "python:3.11" + + # Resource allocation + gpu: Optional[str] = None # None, "T4", "A10G", "A100", "H100" + cpu: float = 1.0 + memory: int = 2048 # MB + + # Pool sizing (slot-based multiplexing) + slots_per_sandbox: int = 10 + min_sandboxes: int = 1 + max_sandboxes: int = 5 + + # Timeouts + idle_timeout: int = 120 # Modal server-side auto-cleanup + max_lifetime: int = 3600 # Max sandbox lifetime + acquire_timeout_s: float = 60.0 # Timeout waiting for slot + execution_timeout_s: float = 30.0 # Default command timeout + + # Credentials + secrets: List[str] = field(default_factory=list) # Modal Secret names + env_vars: Dict[str, str] = field(default_factory=dict) + + # Working directory + workspace_base: str = "/data" + + def with_app_name(self, base_app: str) -> "ModalSandboxConfig": + """Return a copy with computed app_name: {base_app}-{name}.""" + return replace(self, app_name=f"{base_app}-{self.name}") + + def get_app_name(self, fallback: str = "atropos-sandbox") -> str: + """Get app_name, using fallback if not set.""" + return self.app_name or f"{fallback}-{self.name}" + + # ------------------------------------------------------------------------- + # Loading methods + # ------------------------------------------------------------------------- + @classmethod - def from_agent_env_config(cls, cfg: Any) -> "ModalBackendConfig": + def from_dict(cls, name: str, data: Dict[str, Any]) -> "ModalSandboxConfig": + """Create config from dictionary (e.g., from YAML).""" return cls( - app_name=str(getattr(cfg, "modal_app_name", cls.app_name)), - function_name=str(getattr(cfg, "modal_function_name", cls.function_name)), - volume_name=(getattr(cfg, "modal_volume_name", None) or None), - volume_mount_path=str(getattr(cfg, "modal_volume_mount_path", cls.volume_mount_path)), + name=name, + app_name=data.get("app_name"), + image=data.get("image", "python:3.11"), + gpu=data.get("gpu"), + cpu=float(data.get("cpu", 1.0)), + memory=int(data.get("memory", 2048)), + slots_per_sandbox=int(data.get("slots_per_sandbox", 10)), + min_sandboxes=int(data.get("min_sandboxes", 1)), + max_sandboxes=int(data.get("max_sandboxes", 5)), + idle_timeout=int(data.get("idle_timeout", 120)), + max_lifetime=int(data.get("max_lifetime", 3600)), + acquire_timeout_s=float(data.get("acquire_timeout_s", 60.0)), + execution_timeout_s=float(data.get("execution_timeout_s", 30.0)), + secrets=list(data.get("secrets", [])), + env_vars=dict(data.get("env_vars", {})), + workspace_base=data.get("workspace_base", "/data"), ) + + @classmethod + def from_env(cls, profile_name: str = "default") -> "ModalSandboxConfig": + """Create config from environment variables.""" + prefix = f"ATROPOS_MODAL_PROFILE_{profile_name.upper().replace('-', '_')}_" + + def get_env(key: str, default: Any) -> str: + return os.environ.get(f"{prefix}{key}", os.environ.get(f"ATROPOS_MODAL_{key}", str(default))) + + secrets = [] + secrets_str = get_env("SECRETS", 
"") + if secrets_str: + secrets = [s.strip() for s in secrets_str.split(",") if s.strip()] + + env_vars = {} + env_vars_str = get_env("ENV_VARS", "") + if env_vars_str: + for pair in env_vars_str.split(";"): + if "=" in pair: + k, v = pair.split("=", 1) + env_vars[k.strip()] = v.strip() + + return cls( + name=profile_name, + image=get_env("IMAGE", "python:3.11"), + gpu=get_env("GPU", "") or None, + cpu=float(get_env("CPU", 1.0)), + memory=int(get_env("MEMORY", 2048)), + slots_per_sandbox=int(get_env("SLOTS_PER_SANDBOX", 10)), + min_sandboxes=int(get_env("MIN_SANDBOXES", 1)), + max_sandboxes=int(get_env("MAX_SANDBOXES", 5)), + idle_timeout=int(get_env("IDLE_TIMEOUT", 120)), + max_lifetime=int(get_env("MAX_LIFETIME", 3600)), + acquire_timeout_s=float(get_env("ACQUIRE_TIMEOUT", 60.0)), + execution_timeout_s=float(get_env("EXECUTION_TIMEOUT", 30.0)), + secrets=secrets, + env_vars=env_vars, + workspace_base=get_env("WORKSPACE_BASE", "/data"), + ) + + @classmethod + def from_agent_env_config(cls, cfg: Any) -> "ModalSandboxConfig": + """Create config from AgentEnv configuration object.""" + secrets = [] + secrets_str = str(getattr(cfg, "modal_secrets", "")) + if secrets_str: + secrets = [s.strip() for s in secrets_str.split(",") if s.strip()] + + env_vars = {} + env_vars_str = str(getattr(cfg, "modal_env_vars", "")) + if env_vars_str: + for pair in env_vars_str.split(";"): + if "=" in pair: + k, v = pair.split("=", 1) + env_vars[k.strip()] = v.strip() + + return cls( + name="default", + app_name=str(getattr(cfg, "modal_app_name", None)) or None, + image=str(getattr(cfg, "modal_image", "python:3.11")), + gpu=getattr(cfg, "modal_gpu", None) or None, + cpu=float(getattr(cfg, "modal_cpu", 1.0)), + memory=int(getattr(cfg, "modal_memory", 2048)), + slots_per_sandbox=int(getattr(cfg, "modal_slots_per_sandbox", 10)), + min_sandboxes=int(getattr(cfg, "modal_min_sandboxes", 1)), + max_sandboxes=int(getattr(cfg, "modal_max_sandboxes", 5)), + idle_timeout=int(getattr(cfg, "modal_idle_timeout", 120)), + max_lifetime=int(getattr(cfg, "modal_max_lifetime", 3600)), + acquire_timeout_s=float(getattr(cfg, "modal_acquire_timeout", 60.0)), + execution_timeout_s=float(getattr(cfg, "modal_execution_timeout", 30.0)), + secrets=secrets, + env_vars=env_vars, + workspace_base=str(getattr(cfg, "modal_workspace_base", "/data")), + ) + + @classmethod + def load_profiles(cls, config_file: Optional[str] = None) -> Dict[str, "ModalSandboxConfig"]: + """ + Load profiles from YAML file or environment variables. + + Priority: + 1. Specified config file + 2. ATROPOS_MODAL_PROFILES_FILE env var + 3. 
Default profiles from env vars + """ + profiles: Dict[str, ModalSandboxConfig] = {} + + # Try loading from YAML + config_path = config_file or os.environ.get("ATROPOS_MODAL_PROFILES_FILE") + if config_path and os.path.exists(config_path): + try: + with open(config_path, "r") as f: + data = yaml.safe_load(f) + + if data and "profiles" in data: + for name, profile_data in data["profiles"].items(): + profiles[name] = cls.from_dict(name, profile_data) + + print(f"[Modal] Loaded {len(profiles)} profile(s) from {config_path}") + return profiles + except Exception as e: + print(f"[Modal] Warning: Could not load profiles from {config_path}: {e}") + + # Load from environment variables + profile_names_str = os.environ.get("ATROPOS_MODAL_PROFILES", "default") + profile_names = [p.strip() for p in profile_names_str.split(",") if p.strip()] + + for name in profile_names: + profiles[name] = cls.from_env(name) + + # Always ensure "default" profile exists + if "default" not in profiles: + profiles["default"] = cls.from_env("default") + + return profiles + + +class _ModalSandboxWithSlots: + """ + A Modal sandbox hosting multiple slots (isolated workspaces). + + Each slot has its own workspace directory for filesystem isolation. + Multiple trajectories can run in the same sandbox via different slots. + """ + + def __init__( + self, + sandbox: Any, # modal.Sandbox + sandbox_id: str, + config: ModalSandboxConfig, + ): + self.sandbox = sandbox + self.sandbox_id = sandbox_id + self.config = config + self.slots: Dict[str, Slot] = {} + self._lock = asyncio.Lock() + + # Create slots + for i in range(config.slots_per_sandbox): + slot_id = f"{sandbox_id}_slot_{i}" + workspace_dir = f"{config.workspace_base}/{slot_id}" + self.slots[slot_id] = Slot( + slot_id=slot_id, + alloc_id=sandbox_id, + container_addr=f"modal://{sandbox_id}", # Virtual address + workspace_dir=workspace_dir, + state=SlotState.AVAILABLE, + ) + + async def initialize_workspaces(self): + """Create workspace directories for all slots.""" + try: + # Create all workspace directories in parallel + commands = [f"mkdir -p {slot.workspace_dir}" for slot in self.slots.values()] + combined_cmd = " && ".join(commands) + + process = self.sandbox.exec("bash", "-c", combined_cmd, timeout=30) + process.wait() + + except Exception as e: + print(f"[Modal] Warning: Could not initialize workspaces: {e}") + + async def acquire_slot(self, trajectory_id: Optional[str] = None) -> Optional[Slot]: + """Acquire an available slot.""" + async with self._lock: + for slot in self.slots.values(): + if slot.is_available: + slot.acquire(trajectory_id) + return slot + return None + + async def release_slot(self, slot: Slot, reset_workspace: bool = False): + """Release a slot back to available.""" + async with self._lock: + if slot.slot_id in self.slots: + if reset_workspace: + await self._reset_workspace(slot) + slot.release() + + async def _reset_workspace(self, slot: Slot): + """Reset a slot's workspace (delete all files).""" + try: + cmd = f"rm -rf {slot.workspace_dir}/* {slot.workspace_dir}/.[!.]* 2>/dev/null || true" + process = self.sandbox.exec("bash", "-c", cmd, timeout=30) + process.wait() + except Exception as e: + print(f"[Modal] Warning: Could not reset workspace {slot.slot_id}: {e}") + + async def execute( + self, + slot: Slot, + tool_name: str, + args: Dict[str, Any], + timeout: float = 30.0, + ) -> ExecutionResult: + """Execute a tool in a slot's workspace.""" + execution_id = str(uuid.uuid4()) + + try: + # Mark slot as executing + if slot.state == 
SlotState.ACQUIRED: + slot.start_execution(execution_id) + + # Build command based on tool type + if tool_name == "bash": + command = args.get("command", "") + elif tool_name == "read_file": + path = args.get("path", "") + full_path = f"{slot.workspace_dir}/{path}" if not path.startswith("/") else path + command = f"cat {full_path}" + elif tool_name == "write_file": + path = args.get("path", "") + content = args.get("content", "") + full_path = f"{slot.workspace_dir}/{path}" if not path.startswith("/") else path + # Escape content for shell + escaped_content = content.replace("'", "'\\''") + command = f"mkdir -p $(dirname {full_path}) && printf '%s' '{escaped_content}' > {full_path}" + else: + return ExecutionResult( + success=False, + error=f"Unknown tool: {tool_name}", + execution_id=execution_id, + slot_id=slot.slot_id, + ) + + # Execute in workspace directory + full_command = f"cd {slot.workspace_dir} && {command}" + + process = self.sandbox.exec( + "bash", "-c", full_command, + timeout=int(timeout), + ) + + stdout = process.stdout.read() + stderr = process.stderr.read() + process.wait() + + output = stdout + if stderr: + output = f"{stdout}\n{stderr}" if stdout else stderr + + return ExecutionResult( + success=process.returncode == 0, + output=output, + error="" if process.returncode == 0 else f"Exit code: {process.returncode}", + execution_id=execution_id, + slot_id=slot.slot_id, + metadata={"returncode": process.returncode}, + ) + + except Exception as e: + error_msg = str(e) + if "timeout" in error_msg.lower(): + error_msg = f"Command timed out after {timeout}s" + + return ExecutionResult( + success=False, + error=error_msg, + execution_id=execution_id, + slot_id=slot.slot_id, + ) + finally: + if slot.state == SlotState.EXECUTING: + slot.end_execution() + + def available_slots(self) -> int: + """Count available slots.""" + return sum(1 for slot in self.slots.values() if slot.is_available) + + def is_healthy(self) -> bool: + """Check if sandbox is still running.""" + try: + return self.sandbox.poll() is None + except Exception: + return False + + def terminate(self): + """Terminate this sandbox.""" + try: + self.sandbox.terminate() + except Exception: + pass + + +class _ModalSandboxPool: + """ + Pool of Modal sandboxes with slot-based multiplexing. + + Manages multiple sandboxes, each hosting multiple slots. + Provides acquire/release semantics for slots. + Auto-scales sandboxes based on demand. + """ + + def __init__(self, config: ModalSandboxConfig): + self.config = config + self._sandboxes: Dict[str, _ModalSandboxWithSlots] = {} + self._lock = asyncio.Lock() + self._app = None + self._image = None + self._started = False + self._next_sandbox_idx = 0 + + async def start(self): + """Initialize Modal app and create minimum sandboxes.""" + if self._started: + return + + try: + import modal + + app_name = self.config.get_app_name() + self._app = modal.App.lookup(app_name, create_if_missing=True) + self._image = modal.Image.from_registry(self.config.image) + + # Create minimum sandboxes + for _ in range(self.config.min_sandboxes): + await self._create_sandbox() + + self._started = True + print(f"[Modal] Pool started with {len(self._sandboxes)} sandbox(es), " + f"{self.config.slots_per_sandbox} slots each") + + except ImportError: + raise ImportError("Modal package not installed. 
Run: pip install modal") + + async def stop(self, purge: bool = False): + """Stop all sandboxes.""" + async with self._lock: + for sandbox_wrapper in self._sandboxes.values(): + sandbox_wrapper.terminate() + + if purge: + self._sandboxes.clear() + + self._started = False + print(f"[Modal] Pool stopped") + + async def _create_sandbox(self) -> _ModalSandboxWithSlots: + """Create a new sandbox with slots.""" + import modal + + sandbox_id = f"sandbox_{self._next_sandbox_idx}" + self._next_sandbox_idx += 1 + app_name = self.config.get_app_name() + + # Build secrets list + secrets_list = [] + for secret_name in self.config.secrets: + try: + secrets_list.append(modal.Secret.from_name(secret_name)) + except Exception as e: + print(f"[Modal] Warning: Could not load secret '{secret_name}': {e}") + + # Add env_vars as a programmatic secret + if self.config.env_vars: + secrets_list.append(modal.Secret.from_dict(self.config.env_vars)) + + # Build create kwargs + create_kwargs = { + "app": self._app, + "name": f"{app_name}-{sandbox_id}", + "image": self._image, + "timeout": self.config.max_lifetime, + "idle_timeout": self.config.idle_timeout, + "workdir": self.config.workspace_base, + } + + if self.config.cpu != 1.0: + create_kwargs["cpu"] = self.config.cpu + if self.config.memory != 2048: + create_kwargs["memory"] = self.config.memory + if self.config.gpu: + create_kwargs["gpu"] = self.config.gpu + if secrets_list: + create_kwargs["secrets"] = secrets_list + + # Try to recover existing sandbox or create new + try: + sandbox = modal.Sandbox.from_name( + app_name, + f"{app_name}-{sandbox_id}" + ) + if sandbox.poll() is None: + print(f"[Modal] Recovered existing sandbox: {sandbox_id}") + else: + sandbox = modal.Sandbox.create(**create_kwargs) + print(f"[Modal] Created new sandbox: {sandbox_id}") + except modal.exception.NotFoundError: + sandbox = modal.Sandbox.create(**create_kwargs) + print(f"[Modal] Created new sandbox: {sandbox_id}") + + wrapper = _ModalSandboxWithSlots(sandbox, sandbox_id, self.config) + await wrapper.initialize_workspaces() + + self._sandboxes[sandbox_id] = wrapper + return wrapper + + async def acquire(self, trajectory_id: Optional[str] = None) -> Slot: + """Acquire a slot for a trajectory.""" + deadline = asyncio.get_event_loop().time() + self.config.acquire_timeout_s + + while True: + async with self._lock: + # Try to find an available slot in existing sandboxes + for sandbox_wrapper in self._sandboxes.values(): + if sandbox_wrapper.is_healthy(): + slot = await sandbox_wrapper.acquire_slot(trajectory_id) + if slot: + return slot + + # No slots available - try to scale up + if len(self._sandboxes) < self.config.max_sandboxes: + try: + new_sandbox = await self._create_sandbox() + slot = await new_sandbox.acquire_slot(trajectory_id) + if slot: + return slot + except Exception as e: + print(f"[Modal] Failed to create sandbox: {e}") + + # Check timeout + if asyncio.get_event_loop().time() > deadline: + raise TimeoutError( + f"No slot available within {self.config.acquire_timeout_s}s " + f"(sandboxes: {len(self._sandboxes)}/{self.config.max_sandboxes})" + ) + + await asyncio.sleep(0.5) + + async def release(self, slot: Slot, reset_workspace: bool = False): + """Release a slot back to its sandbox.""" + sandbox_id = slot.alloc_id + + async with self._lock: + if sandbox_id in self._sandboxes: + await self._sandboxes[sandbox_id].release_slot(slot, reset_workspace) + + async def execute( + self, + slot: Slot, + tool_name: str, + args: Dict[str, Any], + timeout: Optional[float] = None, + ) -> 
ExecutionResult: + """Execute a tool in a slot.""" + sandbox_id = slot.alloc_id + exec_timeout = timeout or self.config.execution_timeout_s + + if sandbox_id not in self._sandboxes: + return ExecutionResult( + success=False, + error=f"Sandbox {sandbox_id} not found", + slot_id=slot.slot_id, + ) + + return await self._sandboxes[sandbox_id].execute( + slot, tool_name, args, exec_timeout + ) + + async def execute_batch( + self, + requests: List[Tuple[Slot, str, Dict[str, Any]]], + timeout: Optional[float] = None, + ) -> List[ExecutionResult]: + """ + Execute multiple tools in parallel across slots. + + This is the key optimization - batched execution maximizes + container utilization while agents wait for LLM responses. + """ + if not requests: + return [] + + exec_timeout = timeout or self.config.execution_timeout_s + + # Execute all requests in parallel + tasks = [ + self.execute(slot, tool_name, args, exec_timeout) + for slot, tool_name, args in requests + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Convert exceptions to ExecutionResults + final_results = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + slot, tool_name, _ = requests[i] + final_results.append(ExecutionResult( + success=False, + error=str(result), + slot_id=slot.slot_id, + )) + else: + final_results.append(result) + + return final_results + + def get_status(self) -> Dict[str, Any]: + """Get pool status.""" + total_slots = 0 + available_slots = 0 + healthy_sandboxes = 0 + + for sandbox_wrapper in self._sandboxes.values(): + if sandbox_wrapper.is_healthy(): + healthy_sandboxes += 1 + total_slots += len(sandbox_wrapper.slots) + available_slots += sandbox_wrapper.available_slots() + + return { + "sandboxes": len(self._sandboxes), + "healthy_sandboxes": healthy_sandboxes, + "max_sandboxes": self.config.max_sandboxes, + "total_slots": total_slots, + "available_slots": available_slots, + "slots_per_sandbox": self.config.slots_per_sandbox, + } + + +class _ModalMultiProfileManager: + """ + Manages multiple sandbox pools across different profiles. + + This enables heterogeneous resource allocation - different trajectory + types can request different profiles (GPU vs CPU, etc.) + + Architecture: + Manager + ├── "default" profile → _ModalSandboxPool (CPU, 2GB) + ├── "pytorch-gpu" profile → _ModalSandboxPool (T4, 16GB) + └── "high-memory" profile → _ModalSandboxPool (CPU, 64GB) + """ + + def __init__( + self, + app_name: str = "atropos-sandbox", + profiles: Optional[Dict[str, ModalSandboxConfig]] = None, + default_profile: str = "default", + ): + self.app_name = app_name + self.default_profile = default_profile + self._profiles = profiles or ModalSandboxConfig.load_profiles() + self._pools: Dict[str, _ModalSandboxPool] = {} + self._slot_profile_map: Dict[str, str] = {} # slot_id -> profile_name + self._lock = asyncio.Lock() + self._started = False + + async def start(self, profiles_to_start: Optional[List[str]] = None): + """ + Start sandbox pools for specified profiles. + + Args: + profiles_to_start: Profile names to start. If None, starts default only. 
+ """ + if self._started: + return + + profiles = profiles_to_start or [self.default_profile] + + for profile_name in profiles: + if profile_name not in self._profiles: + print(f"[Modal] Warning: Profile '{profile_name}' not found, skipping") + continue + + await self._ensure_pool(profile_name) + + self._started = True + print(f"[Modal] Multi-profile manager started with {len(self._pools)} pool(s)") + + async def stop(self, purge: bool = False): + """Stop all pools.""" + async with self._lock: + for pool in self._pools.values(): + await pool.stop(purge=purge) + + if purge: + self._pools.clear() + self._slot_profile_map.clear() + + self._started = False + print(f"[Modal] Multi-profile manager stopped") + + async def _ensure_pool(self, profile_name: str) -> _ModalSandboxPool: + """Ensure a pool exists for the given profile, create if needed.""" + if profile_name not in self._pools: + if profile_name not in self._profiles: + raise ValueError(f"Unknown profile: {profile_name}") + + config = self._profiles[profile_name].with_app_name(self.app_name) + + pool = _ModalSandboxPool(config) + await pool.start() + + self._pools[profile_name] = pool + print(f"[Modal] Started pool for profile '{profile_name}'") + + return self._pools[profile_name] + + async def acquire( + self, + trajectory_id: Optional[str] = None, + profile: Optional[str] = None, + ) -> Slot: + """ + Acquire a slot from the specified profile's pool. + + Args: + trajectory_id: ID of the trajectory requesting the slot + profile: Profile name to use. If None, uses default profile. + + Returns: + Acquired Slot + """ + profile_name = profile or self.default_profile + + async with self._lock: + pool = await self._ensure_pool(profile_name) + + slot = await pool.acquire(trajectory_id) + + # Track which profile this slot belongs to + self._slot_profile_map[slot.slot_id] = profile_name + + return slot + + async def release(self, slot: Slot, reset_workspace: bool = False): + """Release a slot back to its profile's pool.""" + profile_name = self._slot_profile_map.get(slot.slot_id, self.default_profile) + + if profile_name in self._pools: + await self._pools[profile_name].release(slot, reset_workspace) + + # Clean up mapping + self._slot_profile_map.pop(slot.slot_id, None) + + async def execute( + self, + slot: Slot, + tool_name: str, + args: Dict[str, Any], + timeout: Optional[float] = None, + ) -> ExecutionResult: + """Execute a tool in a slot.""" + profile_name = self._slot_profile_map.get(slot.slot_id, self.default_profile) + + if profile_name not in self._pools: + return ExecutionResult( + success=False, + error=f"Pool for profile '{profile_name}' not found", + slot_id=slot.slot_id, + ) + + return await self._pools[profile_name].execute(slot, tool_name, args, timeout) + + async def execute_batch( + self, + requests: List[Tuple[Slot, str, Dict[str, Any]]], + timeout: Optional[float] = None, + ) -> List[ExecutionResult]: + """Execute batch across potentially different profile pools.""" + if not requests: + return [] + + # Group requests by profile + by_profile: Dict[str, List[Tuple[int, Slot, str, Dict[str, Any]]]] = {} + + for idx, (slot, tool_name, args) in enumerate(requests): + profile_name = self._slot_profile_map.get(slot.slot_id, self.default_profile) + if profile_name not in by_profile: + by_profile[profile_name] = [] + by_profile[profile_name].append((idx, slot, tool_name, args)) + + # Execute each profile's batch in parallel + async def execute_profile_batch( + profile_name: str, + profile_requests: List[Tuple[int, Slot, str, 
Dict[str, Any]]] + ) -> List[Tuple[int, ExecutionResult]]: + if profile_name not in self._pools: + return [ + (idx, ExecutionResult( + success=False, + error=f"Pool for profile '{profile_name}' not found", + slot_id=slot.slot_id, + )) + for idx, slot, _, _ in profile_requests + ] + + pool = self._pools[profile_name] + batch_requests = [(slot, tool_name, args) for _, slot, tool_name, args in profile_requests] + results = await pool.execute_batch(batch_requests, timeout=timeout) + + return [(profile_requests[i][0], result) for i, result in enumerate(results)] + + # Run all profile batches in parallel + tasks = [ + execute_profile_batch(profile_name, profile_requests) + for profile_name, profile_requests in by_profile.items() + ] + + batch_results = await asyncio.gather(*tasks, return_exceptions=True) + + # Collect results in original order + results: List[Optional[ExecutionResult]] = [None] * len(requests) + + for batch_result in batch_results: + if isinstance(batch_result, Exception): + continue + for idx, result in batch_result: + results[idx] = result + + # Fill in any missing results + for idx, result in enumerate(results): + if result is None: + slot, _, _ = requests[idx] + results[idx] = ExecutionResult( + success=False, + error="Batch execution failed", + slot_id=slot.slot_id, + ) + + return results # type: ignore + + def get_status(self) -> Dict[str, Any]: + """Get status of all pools.""" + status = { + "profiles": list(self._profiles.keys()), + "active_pools": list(self._pools.keys()), + "default_profile": self.default_profile, + "pools": {}, + } + + for profile_name, pool in self._pools.items(): + status["pools"][profile_name] = pool.get_status() + + return status + + def list_profiles(self) -> Dict[str, Dict[str, Any]]: + """List all available profiles and their configurations.""" + return { + name: { + "image": profile.image, + "gpu": profile.gpu, + "cpu": profile.cpu, + "memory": profile.memory, + "slots_per_sandbox": profile.slots_per_sandbox, + "max_sandboxes": profile.max_sandboxes, + "active": name in self._pools, + } + for name, profile in self._profiles.items() + } class ModalToolBackend(ToolBackend): - def __init__(self, config: ModalBackendConfig): - self.config = config - + """ + Modal-based tool backend with slot-based multiplexing and multi-profile support. + + This backend provides scalable execution for RL training: + - Multiple trajectories share Modal sandboxes via slots + - Batched parallel execution across slots + - Auto-scaling sandbox pool per profile + - Named sandbox recovery after restart + - Multi-profile support for heterogeneous resources + + Usage (single profile): + config = ModalSandboxConfig( + name="default", + slots_per_sandbox=10, + max_sandboxes=5, + ).with_app_name("my-training") + + backend = ModalToolBackend(config) + await backend.start() + slot = await backend.acquire("trajectory_1") + ... 
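+        # e.g. run one batched command, then release the slot
+        results = await backend.execute_batch([(slot, "bash", {"command": "ls"})])
+        await backend.release(slot)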
+ await backend.stop() + + Usage (multi-profile): + backend = ModalToolBackend.with_profiles( + app_name="my-training", + profiles={ + "default": ModalSandboxConfig(name="default"), + "pytorch-gpu": ModalSandboxConfig( + name="pytorch-gpu", + image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime", + gpu="T4", + memory=16384, + ), + } + ) + + await backend.start(profiles_to_start=["default", "pytorch-gpu"]) + + # CPU task + slot1 = await backend.acquire("traj_1", profile="default") + + # GPU task + slot2 = await backend.acquire("traj_2", profile="pytorch-gpu") + + # Execute in parallel across different profiles + results = await backend.execute_batch([ + (slot1, "bash", {"command": "python preprocess.py"}), + (slot2, "bash", {"command": "python train.py"}), + ]) + + await backend.release(slot1) + await backend.release(slot2) + await backend.stop() + """ + + def __init__( + self, + config: Optional[ModalSandboxConfig] = None, + *, + multi_profile_manager: Optional[_ModalMultiProfileManager] = None, + ): + """ + Initialize backend with either single config or multi-profile manager. + + Args: + config: Sandbox configuration (ModalSandboxConfig) + multi_profile_manager: Manager for multiple profiles + """ + self._multi_profile = multi_profile_manager is not None + + if self._multi_profile: + self._manager = multi_profile_manager + self.config = None + self.pool = None + else: + self.config = config or ModalSandboxConfig() + self.pool = _ModalSandboxPool(self.config) + self._manager = None + + @classmethod + def with_profiles( + cls, + app_name: str = "atropos-sandbox", + profiles: Optional[Dict[str, ModalSandboxConfig]] = None, + default_profile: str = "default", + profiles_file: Optional[str] = None, + ) -> "ModalToolBackend": + """ + Create backend with multi-profile support. + + Args: + app_name: Modal app name prefix + profiles: Dict of profile name -> ModalSandboxConfig. If None, loads from file/env. + default_profile: Default profile name + profiles_file: Path to YAML profiles file + + Returns: + ModalToolBackend with multi-profile manager + """ + if profiles is None: + profiles = ModalSandboxConfig.load_profiles(profiles_file) + + manager = _ModalMultiProfileManager( + app_name=app_name, + profiles=profiles, + default_profile=default_profile, + ) + + return cls(multi_profile_manager=manager) + @property def default_timeout_s(self) -> Optional[float]: - return None - - def _unavailable(self) -> RuntimeError: - return RuntimeError( - "Modal tool backend is not implemented yet. " - "Keep `--env.tool_pool_mode nomad` for now." - ) - - async def start(self) -> None: - raise self._unavailable() - - async def stop(self, *, purge: bool = False) -> None: # noqa: ARG002 - # If start() isn't implemented, stop() is also unavailable. - raise self._unavailable() - - async def acquire(self, trajectory_id: Optional[str] = None) -> Slot: # noqa: ARG002 - raise self._unavailable() - - async def release(self, slot: Slot, *, reset_workspace: bool = False) -> None: # noqa: ARG002 - raise self._unavailable() - + if self._multi_profile: + # Return default profile's timeout + return 30.0 # Default fallback + return self.config.execution_timeout_s + + async def start(self, profiles_to_start: Optional[List[str]] = None) -> None: + """ + Start the Modal pool(s). + + Args: + profiles_to_start: For multi-profile, which profiles to start. + If None, starts default profile only. 
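+
+        Example (mirrors the multi-profile usage in the class docstring):
+            await backend.start(profiles_to_start=["default", "pytorch-gpu"])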
+ """ + if self._multi_profile: + await self._manager.start(profiles_to_start) + else: + await self.pool.start() + + async def stop(self, *, purge: bool = False) -> None: + """Stop the Modal pool(s).""" + if self._multi_profile: + await self._manager.stop(purge=purge) + else: + await self.pool.stop(purge=purge) + + async def acquire( + self, + trajectory_id: Optional[str] = None, + profile: Optional[str] = None, + ) -> Slot: + """ + Acquire a slot for a trajectory. + + Args: + trajectory_id: ID of the trajectory + profile: Profile name (multi-profile mode only). If None, uses default. + + Returns: + Acquired Slot + """ + if self._multi_profile: + return await self._manager.acquire(trajectory_id, profile) + else: + return await self.pool.acquire(trajectory_id) + + async def release(self, slot: Slot, *, reset_workspace: bool = False) -> None: + """Release a slot back to the pool.""" + if self._multi_profile: + await self._manager.release(slot, reset_workspace) + else: + await self.pool.release(slot, reset_workspace=reset_workspace) + async def execute_batch( self, requests: List[Tuple[Slot, str, Dict[str, Any]]], *, - timeout_s: Optional[float] = None, # noqa: ARG002 + timeout_s: Optional[float] = None, ) -> List[ExecutionResult]: - raise self._unavailable() - + """Execute a batch of tools in parallel across slots.""" + if self._multi_profile: + return await self._manager.execute_batch(requests, timeout=timeout_s) + else: + return await self.pool.execute_batch(requests, timeout=timeout_s) + + # ------------------------------------------------------------------------- + # Artifact helpers + # ------------------------------------------------------------------------- + + async def _execute_in_slot( + self, + slot: Slot, + command: str, + timeout: Optional[float] = None, + ) -> ExecutionResult: + """Helper to execute a command in a slot.""" + if self._multi_profile: + return await self._manager.execute(slot, "bash", {"command": command}, timeout) + return await self.pool.execute(slot, "bash", {"command": command}, timeout) + + async def read_artifact( + self, + slot: Slot, + path: str, + *, + encoding: str = "text", + max_bytes: Optional[int] = None, + include_sha256: bool = False, + timeout_s: Optional[float] = None, + ) -> Dict[str, Any]: + """Read a file from a slot's workspace.""" + full_path = f"{slot.workspace_dir}/{path}" if not path.startswith("/") else path + + # Build command based on options + if encoding == "base64": + cmd = f"base64 {full_path}" + else: + cmd = f"cat {full_path}" + + if max_bytes: + cmd = f"head -c {max_bytes} {full_path}" + if encoding == "base64": + cmd = f"head -c {max_bytes} {full_path} | base64" + + result = await self._execute_in_slot(slot, cmd, timeout_s) + + response: Dict[str, Any] = { + "success": result.success, + "content": result.output if result.success else "", + "error": result.error, + } + + if include_sha256 and result.success: + sha_result = await self._execute_in_slot( + slot, f"sha256sum {full_path} | cut -d' ' -f1", timeout_s + ) + if sha_result.success: + response["sha256"] = sha_result.output.strip() + + return response + + async def list_artifacts( + self, + slot: Slot, + path: str = ".", + *, + recursive: bool = False, + max_entries: Optional[int] = None, + timeout_s: Optional[float] = None, + ) -> Dict[str, Any]: + """List files in a slot's workspace.""" + full_path = f"{slot.workspace_dir}/{path}" if not path.startswith("/") else path + + if recursive: + cmd = f"find {full_path} -type f" + else: + cmd = f"ls -1 {full_path}" + + if 
max_entries: + cmd = f"{cmd} | head -n {max_entries}" + + result = await self._execute_in_slot(slot, cmd, timeout_s) + + entries = [] + if result.success and result.output.strip(): + entries = result.output.strip().split("\n") + + return { + "success": result.success, + "entries": entries, + "error": result.error, + } + + async def archive_artifacts( + self, + slot: Slot, + path: str = ".", + *, + archive_format: str = "tar.gz", + max_bytes: Optional[int] = None, + max_entries: Optional[int] = None, + timeout_s: Optional[float] = None, + ) -> Dict[str, Any]: + """Create an archive of files in a slot's workspace.""" + full_path = f"{slot.workspace_dir}/{path}" if not path.startswith("/") else path + + if archive_format == "tar.gz": + cmd = f"tar -czf - -C {full_path} . | base64" + elif archive_format == "tar": + cmd = f"tar -cf - -C {full_path} . | base64" + elif archive_format == "zip": + cmd = f"cd {full_path} && zip -r - . | base64" + else: + return {"success": False, "error": f"Unknown format: {archive_format}"} + + result = await self._execute_in_slot(slot, cmd, timeout_s) + + return { + "success": result.success, + "archive_base64": result.output if result.success else "", + "format": archive_format, + "error": result.error, + } + + def get_status(self) -> Dict[str, Any]: + """Get backend status.""" + if self._multi_profile: + return self._manager.get_status() + return self.pool.get_status() + + def list_profiles(self) -> Dict[str, Dict[str, Any]]: + """ + List available profiles (multi-profile mode only). + + Returns: + Dict mapping profile names to their configs + """ + if self._multi_profile: + return self._manager.list_profiles() + return { + "default": { + "image": self.config.image, + "gpu": self.config.gpu, + "cpu": self.config.cpu, + "memory": self.config.memory, + "slots_per_sandbox": self.config.slots_per_sandbox, + "max_sandboxes": self.config.max_sandboxes, + "active": True, + } + } diff --git a/docs/MODAL_BACKEND.md b/docs/MODAL_BACKEND.md new file mode 100644 index 0000000000..0e5c4f3ef2 --- /dev/null +++ b/docs/MODAL_BACKEND.md @@ -0,0 +1,236 @@ +# Modal Backend + +Hermes Agent uses [Modal](https://modal.com) for scalable, isolated cloud execution environments. There are two Modal integrations: + +1. **Terminal Tool** (`tools/terminal_tool.py`) - For CLI/agent command execution +2. **Atropos Backend** (`atropos/backends/modal_backend.py`) - For batch RL training workloads + + + +--- + +## Terminal Tool (CLI/Agent) + +The terminal tool provides a simple interface for executing commands in Modal sandboxes. 
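+
+A minimal end-to-end sketch (assumes Modal is already authenticated via
+`modal setup`; as exercised by the integration tests, the tool returns a JSON
+string with `output`, `exit_code`, and `error` keys — see Configuration and
+Usage below for details):
+
+```python
+import json
+import os
+
+os.environ["TERMINAL_ENV"] = "modal"  # route terminal_tool to the Modal backend
+
+from tools.terminal_tool import terminal_tool, cleanup_vm
+
+# Calls that share a task_id reuse the same sandbox, so files persist between them.
+result = json.loads(terminal_tool("python -V", task_id="demo"))
+print(result["exit_code"], result["output"])
+
+cleanup_vm("demo")  # terminate the sandbox when finished
+```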
+ +### Configuration + +Set environment variables: + +```bash +export TERMINAL_ENV=modal +export TERMINAL_MODAL_IMAGE=python:3.11 +export TERMINAL_MODAL_APP_NAME=hermes-sandbox +``` + +Or use a YAML config file (`modal_profiles.yaml`): + +```yaml +profiles: + default: + image: python:3.11 + cpu: 1.0 + memory: 2048 + min_pool: 1 + max_pool: 5 + idle_timeout: 120 + + gpu: + image: pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime + gpu: T4 + memory: 16384 + min_pool: 0 + max_pool: 2 +``` + +### Features + +| Feature | Description | +|---------|-------------| +| **Sandbox Pool** | Pre-warmed sandboxes for low latency | +| **Auto-scaling** | Grows/shrinks pool based on demand | +| **Idle Timeout** | Sandboxes auto-terminate when unused | +| **Profile Selection** | Different configs for different workloads | +| **Credential Injection** | `modal.Secret` integration | + +### Usage + +```python +from tools.terminal_tool import terminal_tool + +# Simple command +output = terminal_tool("echo hello", task_id="my-task") + +# With profile selection +output = terminal_tool("python train.py", task_id="training", profile="gpu") + +# Cleanup when done +from tools.terminal_tool import cleanup_vm +cleanup_vm("my-task") +``` + +### Architecture + +``` +_ModalPoolManager (singleton) + ├── "default" pool → [sandbox-0, sandbox-1, ...] + └── "gpu" pool → [sandbox-0, ...] + +Each pool: + - Maintains min_pool warm sandboxes + - Scales up to max_pool on demand + - Background thread scales down idle sandboxes +``` + +--- + +## Atropos Backend (RL Training) + +The Atropos backend is designed for high-throughput batch execution during reinforcement learning training. + +### Key Concept: Slot-based Multiplexing + +Instead of one sandbox per trajectory, multiple trajectories share sandboxes via **slots**: + +``` +Sandbox (1 container) + ├── Slot 0 → Trajectory A (workspace: /data/slot_0) + ├── Slot 1 → Trajectory B (workspace: /data/slot_1) + └── Slot 2 → Trajectory C (workspace: /data/slot_2) +``` + +**Benefits**: +- Fewer containers = lower cost +- Shared warm-up time +- Better GPU utilization + +### Configuration + +```python +from atropos.backends.modal_backend import ModalSandboxConfig, ModalToolBackend + +config = ModalSandboxConfig( + name="default", + image="python:3.11", + cpu=1.0, + memory=2048, + slots_per_sandbox=10, # 10 trajectories per container + min_sandboxes=1, + max_sandboxes=5, +) + +backend = ModalToolBackend(config.with_app_name("my-training")) +``` + +### Multi-Profile Support + +Different trajectory types can request different resources: + +```python +backend = ModalToolBackend.with_profiles( + app_name="rl-training", + profiles={ + "default": ModalSandboxConfig( + name="default", + cpu=1.0, + memory=2048, + ), + "pytorch-gpu": ModalSandboxConfig( + name="pytorch-gpu", + image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime", + gpu="T4", + memory=16384, + ), + } +) + +# CPU task +slot1 = await backend.acquire("traj-1", profile="default") + +# GPU task +slot2 = await backend.acquire("traj-2", profile="pytorch-gpu") +``` + +### Batched Execution + +The key optimization - execute many commands in parallel: + +```python +# Acquire slots for multiple trajectories +slots = [await backend.acquire(f"traj-{i}") for i in range(50)] + +# Execute batch across all slots in parallel +results = await backend.execute_batch([ + (slot, "bash", {"command": "python step.py"}) + for slot in slots +]) + +# Release slots +for slot in slots: + await backend.release(slot) +``` + +### Architecture + +``` +ModalToolBackend + └── 
_ModalMultiProfileManager + ├── "default" → _ModalSandboxPool + │ ├── Sandbox 0 (slots 0-9) + │ └── Sandbox 1 (slots 0-9) + │ + └── "pytorch-gpu" → _ModalSandboxPool + └── Sandbox 0 (slots 0-9) +``` + +--- + +## Credentials + +Inject secrets securely using Modal's secret management: + +```bash +# Create secret in Modal dashboard or CLI +modal secret create my-api-key API_KEY=sk-xxx +``` + +```python +# Reference in config +config = ModalSandboxConfig( + secrets=["my-api-key"], # Modal secret names + env_vars={"DEBUG": "1"}, # Additional env vars +) +``` + +--- + +## Cost Optimization Tips + +1. **Use slots**: Set `slots_per_sandbox=10+` to share containers +2. **Set idle_timeout**: Auto-terminate unused sandboxes (default: 120s) +3. **Use min_sandboxes=0**: For bursty workloads, scale from zero +4. **Match resources**: Don't over-provision CPU/memory +5. **Use profiles**: GPU only when needed + +--- + +## Troubleshooting + +### "Modal package not installed" +```bash +pip install modal +modal token new # Authenticate +``` + +### "Sandbox creation failed" +- Check Modal dashboard for quota limits +- Verify image exists and is accessible +- Check secret names are correct + +### Shutdown errors +These are harmless warnings during Python interpreter shutdown: +``` +[Modal] Error terminating ...: cannot schedule new futures after interpreter shutdown +``` + +The sandboxes will auto-terminate via Modal's idle_timeout anyway. diff --git a/hermes_agent.egg-info/SOURCES.txt b/hermes_agent.egg-info/SOURCES.txt index 7abce632ff..0dca8437b9 100644 --- a/hermes_agent.egg-info/SOURCES.txt +++ b/hermes_agent.egg-info/SOURCES.txt @@ -51,6 +51,8 @@ hermes_agent.egg-info/requires.txt hermes_agent.egg-info/top_level.txt tests/test_batch_runner.py tests/test_checkpoint_resumption.py +tests/test_modal_integration.py +tests/test_modal_stress.py tests/test_modal_terminal.py tests/test_nous_api_limits.py tests/test_nous_api_pattern.py diff --git a/modal_profiles.yaml.example b/modal_profiles.yaml.example new file mode 100644 index 0000000000..11d233494d --- /dev/null +++ b/modal_profiles.yaml.example @@ -0,0 +1,134 @@ +# Modal Sandbox Profiles Configuration +# ===================================== +# This file defines different sandbox profiles for heterogeneous workloads. +# Copy to modal_profiles.yaml and customize as needed. +# +# Usage: +# terminal_tool("python train.py", profile="pytorch-gpu") +# terminal_tool("npm test", profile="node") +# +# Each profile can specify: +# - image: Docker image to use +# - gpu: GPU type (null, "T4", "A10G", "A100", "H100") +# - cpu: CPU cores (float) +# - memory: Memory in MB +# - min_pool: Minimum warm sandboxes (cost vs latency tradeoff) +# - max_pool: Maximum sandboxes (hard cost cap) +# - idle_timeout: Server-side auto-cleanup in seconds +# - max_lifetime: Maximum sandbox lifetime in seconds +# - scale_down_idle: Client-side scale-down threshold in seconds +# - workdir: Working directory inside container +# - secrets: List of Modal Secret names to inject (created via dashboard/CLI) +# - env_vars: Dict of environment variables to pass directly +# - use_dotenv: If true, loads local .env file into sandbox +# +# SECRETS SETUP: +# Create secrets via Modal dashboard or CLI: +# modal secret create huggingface-token HF_TOKEN=hf_xxx +# modal secret create openai-key OPENAI_API_KEY=sk-xxx +# Then reference by name in profile's secrets list. 
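+#
+# VERIFYING A SECRET (hedged sketch): Modal exposes each secret's keys as
+# environment variables inside the sandbox, so a profile that lists the secret
+# can check for it, e.g.:
+#   terminal_tool("python -c \"import os; print('HF_TOKEN' in os.environ)\"",
+#                 profile="pytorch-gpu")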
+ +# Default profile used when no profile specified +default_profile: default + +profiles: + # Default Python environment - good for most tasks + default: + image: python:3.11 + gpu: null + cpu: 1.0 + memory: 2048 + min_pool: 1 # Keep 1 warm for fast response + max_pool: 5 + idle_timeout: 120 # Modal terminates if idle 2 min + max_lifetime: 3600 # Max 1 hour + scale_down_idle: 180 + workdir: /workspace + secrets: [] # Add secret names here: ["my-api-keys"] + env_vars: {} # Add env vars here: {DEBUG: "1"} + use_dotenv: false # Set to true to load local .env + + # PyTorch with GPU for ML training/inference + pytorch-gpu: + image: pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime + gpu: T4 # Options: T4, A10G, A100, H100 + cpu: 4.0 + memory: 16384 # 16GB + min_pool: 0 # Don't keep GPU sandboxes warm (expensive!) + max_pool: 2 + idle_timeout: 60 # Shorter idle timeout for GPU (cost) + max_lifetime: 1800 # 30 min max for GPU tasks + scale_down_idle: 60 + workdir: /workspace + # ML-specific secrets + secrets: + - huggingface-token # HF_TOKEN env var + - wandb-key # WANDB_API_KEY env var + env_vars: + CUDA_VISIBLE_DEVICES: "0" + PYTORCH_CUDA_ALLOC_CONF: "expandable_segments:True" + + # High-end GPU for large models + pytorch-a100: + image: pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime + gpu: A100 + cpu: 8.0 + memory: 65536 # 64GB + min_pool: 0 + max_pool: 1 # Only 1 at a time (very expensive) + idle_timeout: 30 + max_lifetime: 3600 + scale_down_idle: 30 + workdir: /workspace + + # Node.js for JavaScript/TypeScript tasks + node: + image: node:18 + gpu: null + cpu: 1.0 + memory: 2048 + min_pool: 0 # Create on-demand + max_pool: 3 + idle_timeout: 120 + max_lifetime: 3600 + scale_down_idle: 180 + workdir: /workspace + + # High memory for data processing + high-memory: + image: python:3.11 + gpu: null + cpu: 4.0 + memory: 32768 # 32GB + min_pool: 0 + max_pool: 2 + idle_timeout: 120 + max_lifetime: 3600 + scale_down_idle: 180 + workdir: /workspace + + # Rust development environment + rust: + image: rust:1.75 + gpu: null + cpu: 2.0 + memory: 4096 + min_pool: 0 + max_pool: 2 + idle_timeout: 120 + max_lifetime: 3600 + scale_down_idle: 180 + workdir: /workspace + + # Go development environment + golang: + image: golang:1.21 + gpu: null + cpu: 2.0 + memory: 4096 + min_pool: 0 + max_pool: 2 + idle_timeout: 120 + max_lifetime: 3600 + scale_down_idle: 180 + workdir: /workspace diff --git a/tests/test_modal_integration.py b/tests/test_modal_integration.py new file mode 100644 index 0000000000..8da582d5a9 --- /dev/null +++ b/tests/test_modal_integration.py @@ -0,0 +1,1082 @@ +#!/usr/bin/env python3 +""" +Comprehensive Modal Integration Test Suite + +Tests both: +1. terminal_tool.py Modal backend (CLI/agent use case) +2. 
atropos/backends/modal_backend.py (RL training use case) + +Run with: + # All tests (requires Modal account) + python tests/test_modal_integration.py + + # Dry run (no Modal, tests config/logic only) + python tests/test_modal_integration.py --dry-run + + # Specific test category + python tests/test_modal_integration.py --category terminal + python tests/test_modal_integration.py --category atropos + python tests/test_modal_integration.py --category profiles +""" + +import asyncio +import json +import os +import sys +import tempfile +import time +from pathlib import Path +from typing import Dict, Any, List, Optional +from dataclasses import dataclass + +# Add parent to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +# ============================================================================= +# Atropos Import Helper +# ============================================================================= + +def try_import_atropos_backend(): + """ + Try to import atropos backend directly, bypassing the atroposlib check. + Returns (ModalToolBackend, ModalSandboxConfig, Slot, SlotState) or raises ImportError. + """ + try: + # Try direct import first (works if atroposlib is installed) + from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig + from atropos.slots.slot import Slot, SlotState + return ModalToolBackend, ModalSandboxConfig, Slot, SlotState + except (ImportError, ModuleNotFoundError): + # Try importing the module directly without going through atropos/__init__.py + import importlib.util + + backend_path = Path(__file__).parent.parent / "atropos" / "backends" / "modal_backend.py" + slot_path = Path(__file__).parent.parent / "atropos" / "slots" / "slot.py" + executor_path = Path(__file__).parent.parent / "atropos" / "slots" / "executor.py" + base_path = Path(__file__).parent.parent / "atropos" / "backends" / "base.py" + + if not backend_path.exists(): + raise ImportError(f"modal_backend.py not found at {backend_path}") + + # Load slot module first + spec = importlib.util.spec_from_file_location("atropos_slots_slot", slot_path) + slot_module = importlib.util.module_from_spec(spec) + sys.modules["atropos.slots.slot"] = slot_module + spec.loader.exec_module(slot_module) + + # Load executor module + spec = importlib.util.spec_from_file_location("atropos_slots_executor", executor_path) + executor_module = importlib.util.module_from_spec(spec) + sys.modules["atropos.slots.executor"] = executor_module + spec.loader.exec_module(executor_module) + + # Load base module + spec = importlib.util.spec_from_file_location("atropos_backends_base", base_path) + base_module = importlib.util.module_from_spec(spec) + sys.modules["atropos.backends.base"] = base_module + spec.loader.exec_module(base_module) + + # Now load modal_backend + spec = importlib.util.spec_from_file_location("atropos_backends_modal_backend", backend_path) + backend_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(backend_module) + + return ( + backend_module.ModalToolBackend, + backend_module.ModalSandboxConfig, + slot_module.Slot, + slot_module.SlotState, + ) + + +# ============================================================================= +# Test Configuration +# ============================================================================= + +@dataclass +class TestConfig: + dry_run: bool = False + verbose: bool = True + category: Optional[str] = None # None = all, or "terminal", "atropos", "profiles" + + +# 
============================================================================= +# Test Results Tracking +# ============================================================================= + +class TestResults: + def __init__(self): + self.passed: List[str] = [] + self.failed: List[tuple] = [] # (name, error) + self.skipped: List[tuple] = [] # (name, reason) + + def record_pass(self, name: str): + self.passed.append(name) + print(f" ✅ {name}") + + def record_fail(self, name: str, error: str): + self.failed.append((name, error)) + print(f" ❌ {name}: {error}") + + def record_skip(self, name: str, reason: str): + self.skipped.append((name, reason)) + print(f" ⏭️ {name}: {reason}") + + def summary(self): + total = len(self.passed) + len(self.failed) + len(self.skipped) + print(f"\n{'='*60}") + print(f"TEST RESULTS: {len(self.passed)}/{total} passed") + print(f" Passed: {len(self.passed)}") + print(f" Failed: {len(self.failed)}") + print(f" Skipped: {len(self.skipped)}") + + if self.failed: + print(f"\nFailed tests:") + for name, error in self.failed: + print(f" - {name}: {error}") + + return len(self.failed) == 0 + + +results = TestResults() + + +# ============================================================================= +# CATEGORY 1: Profile Configuration Tests +# ============================================================================= + +def test_profile_loading_from_env(): + """Test ModalProfile.from_env() loads environment variables correctly.""" + from tools.terminal_tool import ModalProfile + + # Set test environment variables + # Note: The prefix is TERMINAL_MODAL_PROFILE_{profile_name}_ where profile_name is used as-is + os.environ["TERMINAL_MODAL_PROFILE_testenv_IMAGE"] = "python:3.12" + os.environ["TERMINAL_MODAL_PROFILE_testenv_GPU"] = "A100" + os.environ["TERMINAL_MODAL_PROFILE_testenv_CPU"] = "4.0" + os.environ["TERMINAL_MODAL_PROFILE_testenv_MEMORY"] = "32768" + os.environ["TERMINAL_MODAL_PROFILE_testenv_SECRETS"] = "secret1,secret2" + os.environ["TERMINAL_MODAL_PROFILE_testenv_ENV_VARS"] = "KEY1=val1;KEY2=val2" + + try: + profile = ModalProfile.from_env("testenv") + + assert profile.name == "testenv", f"Expected name 'testenv', got '{profile.name}'" + assert profile.image == "python:3.12", f"Expected image 'python:3.12', got '{profile.image}'" + assert profile.gpu == "A100", f"Expected GPU 'A100', got '{profile.gpu}'" + assert profile.cpu == 4.0, f"Expected CPU 4.0, got {profile.cpu}" + assert profile.memory == 32768, f"Expected memory 32768, got {profile.memory}" + assert profile.secrets == ["secret1", "secret2"], f"Secrets mismatch: {profile.secrets}" + assert profile.env_vars == {"KEY1": "val1", "KEY2": "val2"}, f"Env vars mismatch: {profile.env_vars}" + + results.record_pass("test_profile_loading_from_env") + except Exception as e: + results.record_fail("test_profile_loading_from_env", str(e)) + finally: + # Cleanup + for key in list(os.environ.keys()): + if key.startswith("TERMINAL_MODAL_PROFILE_testenv_"): + del os.environ[key] + + +def test_profile_loading_from_yaml(): + """Test ModalProfile.load_profiles() from YAML file.""" + from tools.terminal_tool import ModalProfile, YAML_AVAILABLE + + if not YAML_AVAILABLE: + results.record_skip("test_profile_loading_from_yaml", "PyYAML not installed") + return + + yaml_content = """ +profiles: + test-yaml: + image: pytorch/pytorch:2.0 + gpu: T4 + cpu: 2.0 + memory: 8192 + min_pool: 1 + max_pool: 3 + secrets: + - hf-token + env_vars: + CUDA_VISIBLE_DEVICES: "0" + test-yaml-2: + image: node:20 + cpu: 1.0 +""" + + with 
tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write(yaml_content) + yaml_path = f.name + + try: + profiles = ModalProfile.load_profiles(yaml_path) + + assert "test-yaml" in profiles, f"Profile 'test-yaml' not found in {list(profiles.keys())}" + assert "test-yaml-2" in profiles, f"Profile 'test-yaml-2' not found" + + p1 = profiles["test-yaml"] + assert p1.image == "pytorch/pytorch:2.0" + assert p1.gpu == "T4" + assert p1.cpu == 2.0 + assert p1.memory == 8192 + assert p1.secrets == ["hf-token"] + assert p1.env_vars == {"CUDA_VISIBLE_DEVICES": "0"} + + results.record_pass("test_profile_loading_from_yaml") + except Exception as e: + results.record_fail("test_profile_loading_from_yaml", str(e)) + finally: + os.unlink(yaml_path) + + +def test_profile_defaults(): + """Test ModalProfile uses correct defaults.""" + from tools.terminal_tool import ModalProfile + + try: + profile = ModalProfile(name="minimal") + + assert profile.image == "python:3.11" + assert profile.gpu is None + assert profile.cpu == 1.0 + assert profile.memory == 2048 + assert profile.min_pool == 1 + assert profile.max_pool == 5 + assert profile.idle_timeout == 120 + assert profile.secrets == [] + assert profile.env_vars == {} + + results.record_pass("test_profile_defaults") + except Exception as e: + results.record_fail("test_profile_defaults", str(e)) + + +def test_atropos_config_with_app_name(): + """Test ModalSandboxConfig.with_app_name() method.""" + try: + # Try direct import first + try: + from atropos.backends.modal_backend import ModalSandboxConfig + except (ImportError, ModuleNotFoundError): + # Try importing module directly without atropos/__init__.py + ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend() + + config = ModalSandboxConfig( + name="test-convert", + image="python:3.10", + gpu="A10G", + cpu=2.0, + memory=4096, + secrets=["secret1"], + env_vars={"FOO": "bar"}, + ) + + config_with_app = config.with_app_name("my-app") + + assert config_with_app.app_name == "my-app-test-convert" + assert config_with_app.image == "python:3.10" + assert config_with_app.gpu == "A10G" + assert config_with_app.cpu == 2.0 + assert config_with_app.memory == 4096 + assert config_with_app.secrets == ["secret1"] + assert config_with_app.env_vars == {"FOO": "bar"} + + results.record_pass("test_atropos_config_with_app_name") + except ImportError as e: + results.record_skip("test_atropos_config_with_app_name", f"Requires atroposlib: pip install -e '.[atropos]'") + except Exception as e: + results.record_fail("test_atropos_config_with_app_name", str(e)) + + +# ============================================================================= +# CATEGORY 2: Terminal Tool Modal Tests +# ============================================================================= + +def test_terminal_modal_pool_manager_singleton(): + """Test _ModalPoolManager is a proper singleton.""" + from tools.terminal_tool import _ModalPoolManager + + try: + # Reset singleton for test + _ModalPoolManager._instance = None + + manager1 = _ModalPoolManager.get_instance() + manager2 = _ModalPoolManager.get_instance() + + assert manager1 is manager2, "Pool manager should be singleton" + + results.record_pass("test_terminal_modal_pool_manager_singleton") + except Exception as e: + results.record_fail("test_terminal_modal_pool_manager_singleton", str(e)) + + +def test_terminal_create_environment_modal(): + """Test _create_environment creates Modal environment correctly.""" + from tools.terminal_tool import _create_environment + + 
try: + env = _create_environment( + env_type="modal", + image="python:3.11", + cwd="/workspace", + timeout=60, + task_id="test-task-123", + profile="default", + ) + + # Check it's the right type + assert env.__class__.__name__ == "_ModalSandboxEnvironment" + assert env.profile == "default" + assert env.task_id == "test-task-123" + + results.record_pass("test_terminal_create_environment_modal") + except Exception as e: + results.record_fail("test_terminal_create_environment_modal", str(e)) + + +def test_terminal_tool_profile_parameter(config: TestConfig): + """Test terminal_tool() accepts profile parameter.""" + if config.dry_run: + results.record_skip("test_terminal_tool_profile_parameter", "Dry run mode") + return + + from tools.terminal_tool import terminal_tool, cleanup_vm + + # Save original env + original_env = os.environ.get("TERMINAL_ENV") + + try: + os.environ["TERMINAL_ENV"] = "modal" + task_id = f"test-profile-param-{int(time.time())}" + + # This should work without error (profile passed through) + result = terminal_tool( + "echo 'Hello from Modal'", + task_id=task_id, + profile="default", + ) + + result_data = json.loads(result) + # terminal_tool returns {"output", "exit_code", "error"} not {"success"} + assert result_data.get("exit_code") == 0, f"Command failed: {result_data}" + assert "Hello from Modal" in result_data.get("output", "") + + cleanup_vm(task_id) + results.record_pass("test_terminal_tool_profile_parameter") + except Exception as e: + results.record_fail("test_terminal_tool_profile_parameter", str(e)) + finally: + if original_env: + os.environ["TERMINAL_ENV"] = original_env + elif "TERMINAL_ENV" in os.environ: + del os.environ["TERMINAL_ENV"] + + +def test_terminal_modal_execute_simple(config: TestConfig): + """Test basic command execution in Modal sandbox.""" + if config.dry_run: + results.record_skip("test_terminal_modal_execute_simple", "Dry run mode") + return + + from tools.terminal_tool import terminal_tool, cleanup_vm + + original_env = os.environ.get("TERMINAL_ENV") + + try: + os.environ["TERMINAL_ENV"] = "modal" + task_id = f"test-simple-{int(time.time())}" + + # Test echo + result = json.loads(terminal_tool("echo 'test123'", task_id=task_id)) + assert result["exit_code"] == 0, f"Echo failed: {result}" + assert "test123" in result["output"] + + # Test pwd + result = json.loads(terminal_tool("pwd", task_id=task_id)) + assert result["exit_code"] == 0, f"pwd failed: {result}" + + # Test file creation and reading + result = json.loads(terminal_tool("echo 'content' > test.txt && cat test.txt", task_id=task_id)) + assert result["exit_code"] == 0, f"File ops failed: {result}" + assert "content" in result["output"] + + cleanup_vm(task_id) + results.record_pass("test_terminal_modal_execute_simple") + except Exception as e: + results.record_fail("test_terminal_modal_execute_simple", str(e)) + finally: + if original_env: + os.environ["TERMINAL_ENV"] = original_env + elif "TERMINAL_ENV" in os.environ: + del os.environ["TERMINAL_ENV"] + + +def test_terminal_modal_persistence(config: TestConfig): + """Test state persists within same task_id.""" + if config.dry_run: + results.record_skip("test_terminal_modal_persistence", "Dry run mode") + return + + from tools.terminal_tool import terminal_tool, cleanup_vm + + original_env = os.environ.get("TERMINAL_ENV") + + try: + os.environ["TERMINAL_ENV"] = "modal" + task_id = f"test-persist-{int(time.time())}" + + # Create a file + result1 = json.loads(terminal_tool("echo 'persistent data' > /workspace/persist.txt", 
task_id=task_id))
+        assert result1["exit_code"] == 0, f"Create file failed: {result1}"
+
+        # Read it in a separate call (same task_id)
+        result2 = json.loads(terminal_tool("cat /workspace/persist.txt", task_id=task_id))
+        assert result2["exit_code"] == 0, f"Read file failed: {result2}"
+        assert "persistent data" in result2["output"]
+
+        cleanup_vm(task_id)
+        results.record_pass("test_terminal_modal_persistence")
+    except Exception as e:
+        results.record_fail("test_terminal_modal_persistence", str(e))
+    finally:
+        if original_env:
+            os.environ["TERMINAL_ENV"] = original_env
+        elif "TERMINAL_ENV" in os.environ:
+            del os.environ["TERMINAL_ENV"]
+
+
+def test_terminal_modal_isolation(config: TestConfig):
+    """Test different task_ids are isolated."""
+    if config.dry_run:
+        results.record_skip("test_terminal_modal_isolation", "Dry run mode")
+        return
+
+    from tools.terminal_tool import terminal_tool, cleanup_vm
+
+    original_env = os.environ.get("TERMINAL_ENV")
+
+    try:
+        os.environ["TERMINAL_ENV"] = "modal"
+        task_id_1 = f"test-iso-1-{int(time.time())}"
+        task_id_2 = f"test-iso-2-{int(time.time())}"
+
+        # Create a file in task 1
+        result1 = json.loads(terminal_tool("echo 'task1' > /workspace/iso.txt", task_id=task_id_1))
+        assert result1["exit_code"] == 0, f"Task 1 create failed: {result1}"
+
+        # Write different content at the same path in task 2
+        result2 = json.loads(terminal_tool("echo 'task2' > /workspace/iso.txt", task_id=task_id_2))
+        assert result2["exit_code"] == 0, f"Task 2 create failed: {result2}"
+
+        # Verify task 1 still has its own content
+        result3 = json.loads(terminal_tool("cat /workspace/iso.txt", task_id=task_id_1))
+        assert result3["exit_code"] == 0, f"Task 1 read failed: {result3}"
+        assert "task1" in result3["output"], f"Task 1 content corrupted: {result3['output']}"
+
+        # Verify task 2 has its content
+        result4 = json.loads(terminal_tool("cat /workspace/iso.txt", task_id=task_id_2))
+        assert result4["exit_code"] == 0, f"Task 2 read failed: {result4}"
+        assert "task2" in result4["output"], f"Task 2 content corrupted: {result4['output']}"
+
+        cleanup_vm(task_id_1)
+        cleanup_vm(task_id_2)
+        results.record_pass("test_terminal_modal_isolation")
+    except Exception as e:
+        results.record_fail("test_terminal_modal_isolation", str(e))
+    finally:
+        if original_env:
+            os.environ["TERMINAL_ENV"] = original_env
+        elif "TERMINAL_ENV" in os.environ:
+            del os.environ["TERMINAL_ENV"]
+
+
+# =============================================================================
+# CATEGORY 3: Atropos Modal Backend Tests
+# =============================================================================
+
+async def test_atropos_backend_lifecycle(config: TestConfig):
+    """Test ModalToolBackend start/stop lifecycle."""
+    if config.dry_run:
+        results.record_skip("test_atropos_backend_lifecycle", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        config_obj = ModalSandboxConfig(
+            app_name="test-lifecycle",
+            min_sandboxes=1,
+            max_sandboxes=2,
+            slots_per_sandbox=3,
+        )
+
+        backend = ModalToolBackend(config_obj)
+
+        # Start
+        await backend.start()
+
+        try:
+            status = backend.get_status()
+            assert status["sandboxes"] >= 1, f"Expected at least 1 sandbox, got {status}"
+            assert status["slots_per_sandbox"] == 3
+        finally:
+            # Stop (purge, so failed assertions don't leak sandboxes)
+            await backend.stop(purge=True)
+
+        results.record_pass("test_atropos_backend_lifecycle")
+    except ImportError:
+        results.record_skip("test_atropos_backend_lifecycle", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_backend_lifecycle", str(e))
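+
+
+# For reference, the lifecycle the tests below exercise looks roughly like this
+# (a sketch, not part of the suite; assumes a configured Modal account):
+#
+#   config = ModalSandboxConfig(app_name="demo", min_sandboxes=1, max_sandboxes=1)
+#   backend = ModalToolBackend(config)
+#   await backend.start()
+#   try:
+#       slot = await backend.acquire("my-trajectory")
+#       outcome = await backend.execute_batch([(slot, "bash", {"command": "uname -a"})])
+#       await backend.release(slot)
+#   finally:
+#       await backend.stop(purge=True)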
+
+
+async def test_atropos_slot_acquire_release(config: TestConfig):
+    """Test slot acquisition and release."""
+    if config.dry_run:
+        results.record_skip("test_atropos_slot_acquire_release", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        config_obj = ModalSandboxConfig(
+            app_name="test-slots",
+            min_sandboxes=1,
+            max_sandboxes=2,
+            slots_per_sandbox=5,
+        )
+
+        backend = ModalToolBackend(config_obj)
+        await backend.start()
+
+        try:
+            # Acquire a slot
+            slot = await backend.acquire("trajectory-1")
+
+            assert slot is not None
+            assert slot.trajectory_id == "trajectory-1"
+            assert "/data/" in slot.workspace_dir
+
+            # Check that the status shows the slot in use
+            status = backend.get_status()
+            assert status["available_slots"] < status["total_slots"]
+
+            # Release the slot
+            await backend.release(slot)
+
+            # Check the slot is available again
+            status = backend.get_status()
+            # Note: might need a small delay for the status to update
+
+            results.record_pass("test_atropos_slot_acquire_release")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_slot_acquire_release", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_slot_acquire_release", str(e))
+
+
+async def test_atropos_execute_in_slot(config: TestConfig):
+    """Test command execution in an acquired slot."""
+    if config.dry_run:
+        results.record_skip("test_atropos_execute_in_slot", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        config_obj = ModalSandboxConfig(
+            app_name="test-execute",
+            min_sandboxes=1,
+            max_sandboxes=1,
+            slots_per_sandbox=3,
+        )
+
+        backend = ModalToolBackend(config_obj)
+        await backend.start()
+
+        try:
+            slot = await backend.acquire("test-exec")
+
+            # Execute a bash command
+            results_list = await backend.execute_batch([
+                (slot, "bash", {"command": "echo 'hello world'"})
+            ])
+
+            assert len(results_list) == 1
+            result = results_list[0]
+            assert result.success, f"Command failed: {result.error}"
+            assert "hello world" in result.output
+
+            await backend.release(slot)
+            results.record_pass("test_atropos_execute_in_slot")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_execute_in_slot", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_execute_in_slot", str(e))
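+
+
+# execute_batch takes (slot, tool_name, args) tuples and returns one
+# ExecutionResult per request. A sketch of a larger batch, consistent with the
+# calls above (only the "bash" tool is exercised in this suite):
+#
+#   requests = [
+#       (slot_a, "bash", {"command": "pytest -q"}),
+#       (slot_b, "bash", {"command": "python -c 'print(42)'"}),
+#   ]
+#   outcomes = await backend.execute_batch(requests)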
+
+
+async def test_atropos_batched_execution(config: TestConfig):
+    """Test batched parallel execution across multiple slots."""
+    if config.dry_run:
+        results.record_skip("test_atropos_batched_execution", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        config_obj = ModalSandboxConfig(
+            app_name="test-batch",
+            min_sandboxes=1,
+            max_sandboxes=2,
+            slots_per_sandbox=5,
+        )
+
+        backend = ModalToolBackend(config_obj)
+        await backend.start()
+
+        try:
+            # Acquire multiple slots
+            slots = []
+            for i in range(3):
+                slot = await backend.acquire(f"batch-{i}")
+                slots.append(slot)
+
+            # Execute a batch of commands
+            start_time = time.time()
+            results_list = await backend.execute_batch([
+                (slots[0], "bash", {"command": "sleep 1 && echo 'slot0'"}),
+                (slots[1], "bash", {"command": "sleep 1 && echo 'slot1'"}),
+                (slots[2], "bash", {"command": "sleep 1 && echo 'slot2'"}),
+            ])
+            elapsed = time.time() - start_time
+
+            # All should succeed
+            assert len(results_list) == 3
+            for i, result in enumerate(results_list):
+                assert result.success, f"Slot {i} failed: {result.error}"
+                assert f"slot{i}" in result.output
+
+            # Should be parallel - with Modal overhead, allow up to 5s for 3x 1-second sleeps
+            # (if sequential, the sleeps alone would take > 3s)
+            assert elapsed < 5.0, f"Batch execution took {elapsed}s, expected < 5.0s (parallel)"
+
+            for slot in slots:
+                await backend.release(slot)
+
+            results.record_pass("test_atropos_batched_execution")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_batched_execution", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_batched_execution", str(e))
+
+
+async def test_atropos_slot_workspace_isolation(config: TestConfig):
+    """Test workspace isolation between slots."""
+    if config.dry_run:
+        results.record_skip("test_atropos_slot_workspace_isolation", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        config_obj = ModalSandboxConfig(
+            app_name="test-isolation",
+            min_sandboxes=1,
+            max_sandboxes=1,
+            slots_per_sandbox=3,
+        )
+
+        backend = ModalToolBackend(config_obj)
+        await backend.start()
+
+        try:
+            slot1 = await backend.acquire("iso-1")
+            slot2 = await backend.acquire("iso-2")
+
+            # Write different content to each slot
+            await backend.execute_batch([
+                (slot1, "bash", {"command": "echo 'content1' > test.txt"}),
+                (slot2, "bash", {"command": "echo 'content2' > test.txt"}),
+            ])
+
+            # Read back and verify isolation
+            results_list = await backend.execute_batch([
+                (slot1, "bash", {"command": "cat test.txt"}),
+                (slot2, "bash", {"command": "cat test.txt"}),
+            ])
+
+            assert "content1" in results_list[0].output, f"Slot 1 content wrong: {results_list[0].output}"
+            assert "content2" in results_list[1].output, f"Slot 2 content wrong: {results_list[1].output}"
+
+            await backend.release(slot1)
+            await backend.release(slot2)
+
+            results.record_pass("test_atropos_slot_workspace_isolation")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_slot_workspace_isolation", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_slot_workspace_isolation", str(e))
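+
+
+# Slots can also be acquired concurrently; a sketch using asyncio.gather,
+# equivalent to the sequential acquire loop above:
+#
+#   slots = await asyncio.gather(*(backend.acquire(f"batch-{i}") for i in range(3)))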
+
+
+async def test_atropos_workspace_reset(config: TestConfig):
+    """Test workspace reset on slot release."""
+    if config.dry_run:
+        results.record_skip("test_atropos_workspace_reset", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        config_obj = ModalSandboxConfig(
+            app_name="test-reset",
+            min_sandboxes=1,
+            max_sandboxes=1,
+            slots_per_sandbox=2,
+        )
+
+        backend = ModalToolBackend(config_obj)
+        await backend.start()
+
+        try:
+            # Acquire, create a file, then release with reset
+            slot = await backend.acquire("reset-test")
+            slot_id = slot.slot_id
+
+            await backend.execute_batch([
+                (slot, "bash", {"command": "echo 'should be deleted' > test.txt"}),
+            ])
+
+            await backend.release(slot, reset_workspace=True)
+
+            # Re-acquire (might get the same slot)
+            slot2 = await backend.acquire("reset-test-2")
+
+            # Check the file doesn't exist (unless we got a different slot)
+            result = await backend.execute_batch([
+                (slot2, "bash", {"command": "cat test.txt 2>/dev/null || echo 'file not found'"}),
+            ])
+
+            # Either the file is not found OR we got a different slot
+            output = result[0].output
+            if slot2.slot_id == slot_id:
+                assert "file not found" in output or not result[0].success, f"File should be deleted: {output}"
+
+            await backend.release(slot2)
+            results.record_pass("test_atropos_workspace_reset")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_workspace_reset", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_workspace_reset", str(e))
+
+
+async def test_atropos_multi_profile(config: TestConfig):
+    """Test multi-profile support with different resources."""
+    if config.dry_run:
+        results.record_skip("test_atropos_multi_profile", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        # Create a backend with multiple profiles
+        backend = ModalToolBackend.with_profiles(
+            app_name="test-multiprofile",
+            profiles={
+                "default": ModalSandboxConfig(
+                    name="default",
+                    image="python:3.11",
+                    cpu=1.0,
+                    memory=2048,
+                    min_sandboxes=1,
+                    max_sandboxes=2,
+                    slots_per_sandbox=3,
+                ),
+                "compute": ModalSandboxConfig(
+                    name="compute",
+                    image="python:3.11",
+                    cpu=2.0,
+                    memory=4096,
+                    min_sandboxes=0,  # Start on demand
+                    max_sandboxes=1,
+                    slots_per_sandbox=2,
+                ),
+            },
+            default_profile="default",
+        )
+
+        await backend.start(profiles_to_start=["default"])
+
+        try:
+            # List profiles
+            profiles = backend.list_profiles()
+            assert "default" in profiles
+            assert "compute" in profiles
+            assert profiles["default"]["active"], "default profile should be active"
+            assert not profiles["compute"]["active"], "compute profile not started yet"
+
+            # Acquire from the default profile
+            slot1 = await backend.acquire("traj-1", profile="default")
+            assert slot1 is not None
+
+            # Acquire from the compute profile (should start it on demand)
+            slot2 = await backend.acquire("traj-2", profile="compute")
+            assert slot2 is not None
+
+            # Execute on both
+            results_list = await backend.execute_batch([
+                (slot1, "bash", {"command": "python --version"}),
+                (slot2, "bash", {"command": "python --version"}),
+            ])
+
+            assert results_list[0].success
+            assert results_list[1].success
+
+            await backend.release(slot1)
+            await backend.release(slot2)
+
+            # Check that the status shows both profiles
+            status = backend.get_status()
+            assert "default" in status["pools"]
+            assert "compute" in status["pools"]
+
+            results.record_pass("test_atropos_multi_profile")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_multi_profile", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_multi_profile", str(e))
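+
+
+# The assertions above pin down the observable shapes: list_profiles() maps
+# profile names to dicts with at least an "active" flag, and get_status()
+# nests per-profile pool stats under "pools". A polling sketch (field names
+# beyond those asserted are not guaranteed):
+#
+#   for name, pool in backend.get_status()["pools"].items():
+#       print(f"{name}: {pool}")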
+
+
+async def test_atropos_cross_profile_batch(config: TestConfig):
+    """Test batched execution across different profiles."""
+    if config.dry_run:
+        results.record_skip("test_atropos_cross_profile_batch", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        backend = ModalToolBackend.with_profiles(
+            app_name="test-crossprofile",
+            profiles={
+                "profile-a": ModalSandboxConfig(
+                    name="profile-a",
+                    min_sandboxes=1,
+                    max_sandboxes=1,
+                    slots_per_sandbox=2,
+                ),
+                "profile-b": ModalSandboxConfig(
+                    name="profile-b",
+                    min_sandboxes=1,
+                    max_sandboxes=1,
+                    slots_per_sandbox=2,
+                ),
+            },
+            default_profile="profile-a",
+        )
+
+        await backend.start(profiles_to_start=["profile-a", "profile-b"])
+
+        try:
+            slot_a = await backend.acquire("traj-a", profile="profile-a")
+            slot_b = await backend.acquire("traj-b", profile="profile-b")
+
+            # Batch execute across profiles
+            results_list = await backend.execute_batch([
+                (slot_a, "bash", {"command": "echo 'from-a'"}),
+                (slot_b, "bash", {"command": "echo 'from-b'"}),
+            ])
+
+            assert len(results_list) == 2
+            assert "from-a" in results_list[0].output
+            assert "from-b" in results_list[1].output
+
+            await backend.release(slot_a)
+            await backend.release(slot_b)
+
+            results.record_pass("test_atropos_cross_profile_batch")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_cross_profile_batch", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
+        results.record_fail("test_atropos_cross_profile_batch", str(e))
+
+
+async def test_atropos_artifact_helpers(config: TestConfig):
+    """Test read_artifact, list_artifacts, archive_artifacts."""
+    if config.dry_run:
+        results.record_skip("test_atropos_artifact_helpers", "Dry run mode")
+        return
+
+    try:
+        try:
+            from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        except (ImportError, ModuleNotFoundError):
+            ModalToolBackend, ModalSandboxConfig, _, _ = try_import_atropos_backend()
+
+        config_obj = ModalSandboxConfig(
+            app_name="test-artifacts",
+            min_sandboxes=1,
+            max_sandboxes=1,
+            slots_per_sandbox=2,
+        )
+
+        backend = ModalToolBackend(config_obj)
+        await backend.start()
+
+        try:
+            slot = await backend.acquire("artifact-test")
+
+            # Create test files
+            await backend.execute_batch([
+                (slot, "bash", {"command": "echo 'hello' > file1.txt && echo 'world' > file2.txt && mkdir subdir && echo 'nested' > subdir/file3.txt"}),
+            ])
+
+            # Test read_artifact
+            content = await backend.read_artifact(slot, "file1.txt")
+            assert content["success"]
+            assert "hello" in content["content"]
+
+            # Test list_artifacts
+            listing = await backend.list_artifacts(slot, ".", recursive=False)
+            assert listing["success"]
+            assert "file1.txt" in listing["entries"] or any("file1" in e for e in listing["entries"])
+
+            # Test archive_artifacts
+            archive = await backend.archive_artifacts(slot, ".", archive_format="tar.gz")
+            assert archive["success"]
+            assert len(archive["archive_base64"]) > 0
+
+            await backend.release(slot)
+            results.record_pass("test_atropos_artifact_helpers")
+        finally:
+            await backend.stop(purge=True)
+    except ImportError:
+        results.record_skip("test_atropos_artifact_helpers", "Requires atroposlib: pip install -e '.[atropos]'")
+    except Exception as e:
results.record_fail("test_atropos_artifact_helpers", str(e)) + + +# ============================================================================= +# Test Runner +# ============================================================================= + +def run_sync_tests(config: TestConfig): + """Run synchronous tests.""" + print("\n" + "="*60) + print("SYNCHRONOUS TESTS") + print("="*60) + + if config.category in (None, "profiles"): + print("\n--- Profile Configuration Tests ---") + test_profile_loading_from_env() + test_profile_loading_from_yaml() + test_profile_defaults() + test_atropos_config_with_app_name() + + if config.category in (None, "terminal"): + print("\n--- Terminal Tool Modal Tests ---") + test_terminal_modal_pool_manager_singleton() + test_terminal_create_environment_modal() + test_terminal_tool_profile_parameter(config) + test_terminal_modal_execute_simple(config) + test_terminal_modal_persistence(config) + test_terminal_modal_isolation(config) + + +async def run_async_tests(config: TestConfig): + """Run asynchronous tests.""" + print("\n" + "="*60) + print("ASYNCHRONOUS TESTS (Atropos Backend)") + print("="*60) + + if config.category in (None, "atropos"): + print("\n--- Backend Lifecycle Tests ---") + await test_atropos_backend_lifecycle(config) + + print("\n--- Slot Management Tests ---") + await test_atropos_slot_acquire_release(config) + await test_atropos_execute_in_slot(config) + await test_atropos_batched_execution(config) + await test_atropos_slot_workspace_isolation(config) + await test_atropos_workspace_reset(config) + + print("\n--- Multi-Profile Tests ---") + await test_atropos_multi_profile(config) + await test_atropos_cross_profile_batch(config) + + print("\n--- Artifact Helper Tests ---") + await test_atropos_artifact_helpers(config) + + +def main(): + import argparse + + parser = argparse.ArgumentParser(description="Modal Integration Test Suite") + parser.add_argument("--dry-run", action="store_true", help="Skip tests requiring Modal") + parser.add_argument("--category", choices=["terminal", "atropos", "profiles"], help="Run specific category") + parser.add_argument("--verbose", action="store_true", default=True) + args = parser.parse_args() + + config = TestConfig( + dry_run=args.dry_run, + verbose=args.verbose, + category=args.category, + ) + + print("="*60) + print("MODAL INTEGRATION TEST SUITE") + print("="*60) + print(f"Mode: {'DRY RUN' if config.dry_run else 'LIVE'}") + print(f"Category: {config.category or 'ALL'}") + + # Run sync tests + run_sync_tests(config) + + # Run async tests + asyncio.run(run_async_tests(config)) + + # Summary + success = results.summary() + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/tests/test_modal_stress.py b/tests/test_modal_stress.py new file mode 100644 index 0000000000..a819aac642 --- /dev/null +++ b/tests/test_modal_stress.py @@ -0,0 +1,923 @@ +#!/usr/bin/env python3 +""" +Modal Integration Stress Tests & Full Integration Tests + +This test suite includes: +1. Stress tests for Modal sandbox pools (concurrent load, scaling) +2. Atropos backend tests (requires atroposlib) +3. 
mini-swe-agent integration tests
+
+Prerequisites:
+    # Install dev dependencies
+    pip install -e '.[dev,modal]'
+
+    # Install atroposlib for Atropos tests
+    pip install -e '.[atropos]'
+
+    # Clone mini-swe-agent (if not present)
+    git clone https://github.com/anthropics/mini-swe-agent.git mini-swe-agent
+    # Or as a submodule:
+    git submodule add https://github.com/anthropics/mini-swe-agent.git mini-swe-agent
+
+Run with:
+    # All tests
+    python tests/test_modal_stress.py
+
+    # Stress tests only
+    python tests/test_modal_stress.py --category stress
+
+    # Atropos tests only
+    python tests/test_modal_stress.py --category atropos
+
+    # Mini-swe-agent tests only
+    python tests/test_modal_stress.py --category miniswe
+
+    # Dry run (no Modal calls)
+    python tests/test_modal_stress.py --dry-run
+"""
+
+import asyncio
+import json
+import os
+import sys
+import time
+import traceback
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+from typing import Dict, Any, List, Optional, Tuple
+from dataclasses import dataclass
+
+# Add parent to path for imports
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+
+# =============================================================================
+# Test Configuration
+# =============================================================================
+
+@dataclass
+class StressTestConfig:
+    dry_run: bool = False
+    verbose: bool = True
+    category: Optional[str] = None
+    # Stress test parameters (reduced defaults for a faster first run;
+    # Modal cold starts are slow)
+    concurrent_tasks: int = 3
+    total_operations: int = 10
+    max_sandboxes: int = 3
+    slots_per_sandbox: int = 3
+
+
+# =============================================================================
+# Test Results Tracking
+# =============================================================================
+
+class TestResults:
+    def __init__(self):
+        self.passed: List[str] = []
+        self.failed: List[Tuple[str, str]] = []    # (name, error)
+        self.skipped: List[Tuple[str, str]] = []   # (name, reason)
+        self.metrics: Dict[str, Any] = {}
+
+    def record_pass(self, name: str, metrics: Optional[Dict] = None):
+        self.passed.append(name)
+        if metrics:
+            self.metrics[name] = metrics
+        print(f"  ✅ {name}")
+        if metrics:
+            for k, v in metrics.items():
+                print(f"     📊 {k}: {v}")
+
+    def record_fail(self, name: str, error: str):
+        self.failed.append((name, error))
+        print(f"  ❌ {name}: {error}")
+
+    def record_skip(self, name: str, reason: str):
+        self.skipped.append((name, reason))
+        print(f"  ⏭️  {name}: {reason}")
+
+    def summary(self):
+        total = len(self.passed) + len(self.failed) + len(self.skipped)
+        print(f"\n{'='*70}")
+        print(f"STRESS TEST RESULTS: {len(self.passed)}/{total} passed")
+        print(f"  Passed:  {len(self.passed)}")
+        print(f"  Failed:  {len(self.failed)}")
+        print(f"  Skipped: {len(self.skipped)}")
+
+        if self.failed:
+            print("\nFailed tests:")
+            for name, error in self.failed:
+                print(f"  - {name}: {error}")
+
+        if self.metrics:
+            print("\nPerformance Metrics:")
+            for test, metrics in self.metrics.items():
+                print(f"  {test}:")
+                for k, v in metrics.items():
+                    print(f"    - {k}: {v}")
+
+        return len(self.failed) == 0
+
+
+results = TestResults()
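+
+
+# The stress parameters above are deliberately conservative; a heavier run can
+# be requested from the CLI defined in main() below, for example:
+#
+#   python tests/test_modal_stress.py --category stress --concurrent 10 --operations 50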
+
+
+# =============================================================================
+# Helper: Atropos Import
+# =============================================================================
+
+def try_import_atropos():
+    """Try importing Atropos backend components."""
+    try:
+        from atropos.backends.modal_backend import ModalToolBackend, ModalSandboxConfig
+        from atropos.slots.slot import Slot, SlotState
+        return ModalToolBackend, ModalSandboxConfig, Slot, SlotState
+    except (ImportError, ModuleNotFoundError):
+        return None
+
+
+def try_import_miniswe():
+    """Try importing mini-swe-agent components."""
+    try:
+        # Check that the mini-swe-agent path exists and has content
+        mini_swe_path = Path(__file__).parent.parent / "mini-swe-agent" / "src"
+        if mini_swe_path.exists() and list(mini_swe_path.iterdir()):
+            sys.path.insert(0, str(mini_swe_path))
+            import minisweagent
+            return minisweagent
+        return None
+    except (ImportError, ModuleNotFoundError):
+        return None
+
+
+# =============================================================================
+# CATEGORY 1: Stress Tests (Terminal Tool)
+# =============================================================================
+
+def test_stress_concurrent_tasks(config: StressTestConfig):
+    """Stress test: Multiple concurrent task_ids hitting the pool."""
+    if config.dry_run:
+        results.record_skip("test_stress_concurrent_tasks", "Dry run mode")
+        return
+
+    from tools.terminal_tool import terminal_tool, cleanup_vm
+
+    original_env = os.environ.get("TERMINAL_ENV")
+    os.environ["TERMINAL_ENV"] = "modal"
+
+    try:
+        num_tasks = config.concurrent_tasks
+        task_ids = [f"stress-concurrent-{i}-{int(time.time())}" for i in range(num_tasks)]
+
+        start_time = time.time()
+        errors = []
+        successes = 0
+
+        def run_task(task_id: str) -> Tuple[bool, str]:
+            try:
+                result = json.loads(terminal_tool(
+                    f"echo 'Hello from {task_id}' && sleep 0.5",
+                    task_id=task_id,
+                ))
+                success = result["exit_code"] == 0
+
+                # IMPORTANT: Clean up immediately after the task completes.
+                # This releases the sandbox back to the pool for other tasks.
+                try:
+                    cleanup_vm(task_id)
+                except Exception:
+                    pass
+
+                if success:
+                    return True, ""
+                # Include more details for debugging
+                error_detail = result.get("error", "no error message")
+                output = result.get("output", "")[:100]  # First 100 chars
+                return False, f"Exit code: {result['exit_code']}, error: {error_detail}, output: {output}"
+            except Exception as e:
+                # Clean up even on failure
+                try:
+                    cleanup_vm(task_id)
+                except Exception:
+                    pass
+                return False, f"Exception: {str(e)}\n{traceback.format_exc()}"
+
+        # Run all tasks concurrently using threads
+        with ThreadPoolExecutor(max_workers=num_tasks) as executor:
+            futures = {executor.submit(run_task, tid): tid for tid in task_ids}
+
+            for future in as_completed(futures):
+                task_id = futures[future]
+                try:
+                    success, error = future.result(timeout=60)
+                    if success:
+                        successes += 1
+                    else:
+                        errors.append(f"{task_id}: {error}")
+                except Exception as e:
+                    errors.append(f"{task_id}: {str(e)}")
+
+        elapsed = time.time() - start_time
+
+        # No separate cleanup pass is needed - each task cleans up immediately
+
+        # Report
+        success_rate = successes / num_tasks * 100
+
+        if success_rate >= 90:  # Allow a 10% failure rate for a stress test
+            results.record_pass("test_stress_concurrent_tasks", {
+                "concurrent_tasks": num_tasks,
+                "successes": successes,
+                "failures": len(errors),
+                "success_rate": f"{success_rate:.1f}%",
+                "total_time": f"{elapsed:.2f}s",
+                "avg_time_per_task": f"{elapsed/num_tasks:.2f}s",
+            })
+        else:
+            results.record_fail(
+                "test_stress_concurrent_tasks",
+                f"Success rate {success_rate:.1f}% < 90%. Errors: {errors[:3]}"
+            )
+
+    except Exception as e:
+        results.record_fail("test_stress_concurrent_tasks", str(e))
+    finally:
+        if original_env:
+            os.environ["TERMINAL_ENV"] = original_env
+        elif "TERMINAL_ENV" in os.environ:
+            del os.environ["TERMINAL_ENV"]
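+
+
+# The same thread fan-out pattern can drive other stress scenarios; a sketch
+# (run_task here stands for any callable returning a success flag):
+#
+#   with ThreadPoolExecutor(max_workers=n) as pool:
+#       outcomes = list(pool.map(run_task, task_ids))  # one bool per task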
+
+
+def test_stress_rapid_fire(config: StressTestConfig):
+    """Stress test: Rapid sequential commands to the same task_id."""
+    if config.dry_run:
+        results.record_skip("test_stress_rapid_fire", "Dry run mode")
+        return
+
+    from tools.terminal_tool import terminal_tool, cleanup_vm
+
+    original_env = os.environ.get("TERMINAL_ENV")
+    os.environ["TERMINAL_ENV"] = "modal"
+
+    try:
+        task_id = f"stress-rapid-{int(time.time())}"
+        num_commands = config.total_operations
+
+        start_time = time.time()
+        successes = 0
+        errors = []
+
+        for i in range(num_commands):
+            try:
+                result = json.loads(terminal_tool(f"echo {i}", task_id=task_id))
+                if result["exit_code"] == 0 and str(i) in result["output"]:
+                    successes += 1
+                else:
+                    errors.append(f"Command {i}: unexpected result")
+            except Exception as e:
+                errors.append(f"Command {i}: {str(e)}")
+
+        elapsed = time.time() - start_time
+        cleanup_vm(task_id)
+
+        success_rate = successes / num_commands * 100
+        commands_per_second = num_commands / elapsed
+
+        if success_rate >= 95:
+            results.record_pass("test_stress_rapid_fire", {
+                "total_commands": num_commands,
+                "successes": successes,
+                "success_rate": f"{success_rate:.1f}%",
+                "total_time": f"{elapsed:.2f}s",
+                "commands_per_second": f"{commands_per_second:.1f}",
+            })
+        else:
+            results.record_fail(
+                "test_stress_rapid_fire",
+                f"Success rate {success_rate:.1f}% < 95%"
+            )
+
+    except Exception as e:
+        results.record_fail("test_stress_rapid_fire", str(e))
+    finally:
+        if original_env:
+            os.environ["TERMINAL_ENV"] = original_env
+        elif "TERMINAL_ENV" in os.environ:
+            del os.environ["TERMINAL_ENV"]
+
+
+def test_stress_pool_scaling(config: StressTestConfig):
+    """Stress test: Force the pool to scale up and down by running tasks in batches."""
+    if config.dry_run:
+        results.record_skip("test_stress_pool_scaling", "Dry run mode")
+        return
+
+    from tools.terminal_tool import terminal_tool, cleanup_vm, _ModalPoolManager
+
+    original_env = os.environ.get("TERMINAL_ENV")
+    os.environ["TERMINAL_ENV"] = "modal"
+
+    try:
+        # Run tasks in batches matching max_sandboxes to test pool reuse.
+        # This verifies sandboxes can be acquired, used, released, and reused.
+        batch_size = config.max_sandboxes
+        num_batches = 3
+        total_tasks = batch_size * num_batches
+
+        start_time = time.time()
+        successes = 0
+
+        for batch in range(num_batches):
+            task_ids = [f"stress-scale-{batch}-{i}-{int(time.time())}" for i in range(batch_size)]
+
+            def run_task(task_id: str):
+                try:
+                    result = json.loads(terminal_tool(
+                        "echo done",  # Fast command to test scaling
+                        task_id=task_id,
+                    ))
+                    success = result["exit_code"] == 0
+                    try:
+                        cleanup_vm(task_id)
+                    except Exception:
+                        pass
+                    return success
+                except Exception:
+                    try:
+                        cleanup_vm(task_id)
+                    except Exception:
+                        pass
+                    return False
+
+            # Run the batch concurrently
+            with ThreadPoolExecutor(max_workers=batch_size) as executor:
+                batch_results = list(executor.map(run_task, task_ids))
+            successes += sum(batch_results)
+
+        elapsed = time.time() - start_time
+
+        # Check pool status
+        try:
+            manager = _ModalPoolManager.get_instance()
+            pool_status = manager.get_status() if hasattr(manager, 'get_status') else {}
+        except Exception:
+            pool_status = {}
+
+        success_rate = successes / total_tasks * 100
+
+        if success_rate >= 80:  # Allow some tolerance
results.record_pass("test_stress_pool_scaling", { + "total_tasks": total_tasks, + "num_batches": num_batches, + "batch_size": batch_size, + "successes": successes, + "success_rate": f"{success_rate:.1f}%", + "total_time": f"{elapsed:.2f}s", + "pool_status": pool_status, + }) + else: + results.record_fail( + "test_stress_pool_scaling", + f"Success rate {success_rate:.1f}% < 80%" + ) + + except Exception as e: + results.record_fail("test_stress_pool_scaling", str(e)) + finally: + if original_env: + os.environ["TERMINAL_ENV"] = original_env + elif "TERMINAL_ENV" in os.environ: + del os.environ["TERMINAL_ENV"] + + +def test_stress_large_output(config: StressTestConfig): + """Stress test: Commands producing large output.""" + if config.dry_run: + results.record_skip("test_stress_large_output", "Dry run mode") + return + + from tools.terminal_tool import terminal_tool, cleanup_vm + + original_env = os.environ.get("TERMINAL_ENV") + os.environ["TERMINAL_ENV"] = "modal" + + try: + task_id = f"stress-large-{int(time.time())}" + + # First verify basic connectivity with simple command + warmup = json.loads(terminal_tool("echo warmup", task_id=task_id)) + if warmup["exit_code"] != 0: + results.record_fail( + "test_stress_large_output", + f"Warmup failed: {warmup.get('error', 'unknown')}" + ) + return + + # Generate output - use seq which is more portable + start_time = time.time() + result = json.loads(terminal_tool( + 'seq 1 500 | while read i; do echo "Line $i: This is test content for large output"; done', + task_id=task_id, + timeout=60, + )) + elapsed = time.time() - start_time + + cleanup_vm(task_id) + + output_size = len(result.get("output", "")) + error_msg = result.get("error", "") + + if result["exit_code"] == 0 and output_size > 5000: + results.record_pass("test_stress_large_output", { + "output_size": f"{output_size:,} bytes", + "time": f"{elapsed:.2f}s", + "throughput": f"{output_size/elapsed/1024:.1f} KB/s" if elapsed > 0 else "N/A", + }) + else: + results.record_fail( + "test_stress_large_output", + f"Exit code: {result['exit_code']}, output size: {output_size}, error: {error_msg}" + ) + + except Exception as e: + import traceback + results.record_fail("test_stress_large_output", f"{str(e)}\n{traceback.format_exc()}") + finally: + try: + cleanup_vm(task_id) + except: + pass + if original_env: + os.environ["TERMINAL_ENV"] = original_env + elif "TERMINAL_ENV" in os.environ: + del os.environ["TERMINAL_ENV"] + + +def test_stress_error_recovery(config: StressTestConfig): + """Stress test: Commands that fail and verify sandbox continues working.""" + if config.dry_run: + results.record_skip("test_stress_error_recovery", "Dry run mode") + return + + from tools.terminal_tool import terminal_tool, cleanup_vm + + original_env = os.environ.get("TERMINAL_ENV") + os.environ["TERMINAL_ENV"] = "modal" + + try: + task_id = f"stress-error-{int(time.time())}" + + # Run some failing commands + failing_commands = [ + "exit 1", + "false", + "cat /nonexistent/file", + "command_that_does_not_exist", + ] + + for cmd in failing_commands: + result = json.loads(terminal_tool(cmd, task_id=task_id)) + # These should fail but not crash + assert result["exit_code"] != 0 or result.get("error"), f"Expected failure for: {cmd}" + + # Now run a command that should succeed + result = json.loads(terminal_tool("echo 'recovery success'", task_id=task_id)) + + cleanup_vm(task_id) + + if result["exit_code"] == 0 and "recovery success" in result["output"]: + results.record_pass("test_stress_error_recovery", { + "failed_commands": 
len(failing_commands), + "recovery": "success", + }) + else: + results.record_fail( + "test_stress_error_recovery", + f"Recovery failed: {result}" + ) + + except Exception as e: + results.record_fail("test_stress_error_recovery", str(e)) + finally: + if original_env: + os.environ["TERMINAL_ENV"] = original_env + elif "TERMINAL_ENV" in os.environ: + del os.environ["TERMINAL_ENV"] + + +# ============================================================================= +# CATEGORY 2: Atropos Backend Stress Tests +# ============================================================================= + +async def test_atropos_stress_slot_churn(config: StressTestConfig): + """Atropos stress test: Rapid slot acquire/release cycles.""" + if config.dry_run: + results.record_skip("test_atropos_stress_slot_churn", "Dry run mode") + return + + imports = try_import_atropos() + if imports is None: + results.record_skip("test_atropos_stress_slot_churn", "Requires atroposlib") + return + + ModalToolBackend, ModalSandboxConfig, _, _ = imports + + try: + backend_config = ModalSandboxConfig( + app_name=f"stress-churn-{int(time.time())}", + min_sandboxes=1, + max_sandboxes=3, + slots_per_sandbox=5, + ) + + backend = ModalToolBackend(backend_config) + await backend.start() + + try: + num_cycles = config.total_operations + start_time = time.time() + successes = 0 + + for i in range(num_cycles): + try: + slot = await backend.acquire(f"churn-{i}") + + # Quick command + results_list = await backend.execute_batch([ + (slot, "bash", {"command": f"echo {i}"}) + ]) + + if results_list[0].success: + successes += 1 + + await backend.release(slot, reset_workspace=(i % 5 == 0)) + except Exception as e: + pass # Count as failure + + elapsed = time.time() - start_time + success_rate = successes / num_cycles * 100 + + if success_rate >= 90: + results.record_pass("test_atropos_stress_slot_churn", { + "cycles": num_cycles, + "successes": successes, + "success_rate": f"{success_rate:.1f}%", + "total_time": f"{elapsed:.2f}s", + "cycles_per_second": f"{num_cycles/elapsed:.1f}", + }) + else: + results.record_fail( + "test_atropos_stress_slot_churn", + f"Success rate {success_rate:.1f}% < 90%" + ) + + finally: + await backend.stop(purge=True) + + except Exception as e: + results.record_fail("test_atropos_stress_slot_churn", str(e)) + + +async def test_atropos_stress_parallel_batches(config: StressTestConfig): + """Atropos stress test: Multiple parallel batch executions.""" + if config.dry_run: + results.record_skip("test_atropos_stress_parallel_batches", "Dry run mode") + return + + imports = try_import_atropos() + if imports is None: + results.record_skip("test_atropos_stress_parallel_batches", "Requires atroposlib") + return + + ModalToolBackend, ModalSandboxConfig, _, _ = imports + + try: + backend_config = ModalSandboxConfig( + app_name=f"stress-batch-{int(time.time())}", + min_sandboxes=2, + max_sandboxes=4, + slots_per_sandbox=5, + ) + + backend = ModalToolBackend(backend_config) + await backend.start() + + try: + num_slots = 10 + slots = [] + + # Acquire multiple slots + for i in range(num_slots): + slot = await backend.acquire(f"batch-{i}") + slots.append(slot) + + # Run multiple batches in parallel + start_time = time.time() + num_batches = 5 + + async def run_batch(batch_id: int): + requests = [ + (slot, "bash", {"command": f"echo 'batch{batch_id}-slot{i}'"}) + for i, slot in enumerate(slots) + ] + return await backend.execute_batch(requests) + + batch_tasks = [run_batch(i) for i in range(num_batches)] + all_results = await 
asyncio.gather(*batch_tasks) + + elapsed = time.time() - start_time + + # Count successes + total_commands = num_batches * num_slots + successes = sum( + 1 for batch_result in all_results + for r in batch_result + if r.success + ) + + # Release slots + for slot in slots: + await backend.release(slot) + + success_rate = successes / total_commands * 100 + + if success_rate >= 90: + results.record_pass("test_atropos_stress_parallel_batches", { + "batches": num_batches, + "slots": num_slots, + "total_commands": total_commands, + "successes": successes, + "success_rate": f"{success_rate:.1f}%", + "total_time": f"{elapsed:.2f}s", + "commands_per_second": f"{total_commands/elapsed:.1f}", + }) + else: + results.record_fail( + "test_atropos_stress_parallel_batches", + f"Success rate {success_rate:.1f}% < 90%" + ) + + finally: + await backend.stop(purge=True) + + except Exception as e: + results.record_fail("test_atropos_stress_parallel_batches", str(e)) + + +async def test_atropos_stress_multi_profile_load(config: StressTestConfig): + """Atropos stress test: Load across multiple profiles.""" + if config.dry_run: + results.record_skip("test_atropos_stress_multi_profile_load", "Dry run mode") + return + + imports = try_import_atropos() + if imports is None: + results.record_skip("test_atropos_stress_multi_profile_load", "Requires atroposlib") + return + + ModalToolBackend, ModalSandboxConfig, _, _ = imports + + try: + backend = ModalToolBackend.with_profiles( + app_name=f"stress-multiprofile-{int(time.time())}", + profiles={ + "cpu-light": ModalSandboxConfig( + name="cpu-light", + cpu=0.5, + memory=1024, + min_sandboxes=1, + max_sandboxes=2, + slots_per_sandbox=5, + ), + "cpu-heavy": ModalSandboxConfig( + name="cpu-heavy", + cpu=2.0, + memory=4096, + min_sandboxes=0, + max_sandboxes=2, + slots_per_sandbox=3, + ), + } + ) + + await backend.start(profiles_to_start=["cpu-light", "cpu-heavy"]) + + try: + num_tasks_per_profile = 5 + slots = [] + + # Acquire from both profiles + for i in range(num_tasks_per_profile): + light_slot = await backend.acquire(f"light-{i}", profile="cpu-light") + heavy_slot = await backend.acquire(f"heavy-{i}", profile="cpu-heavy") + slots.append((light_slot, "cpu-light")) + slots.append((heavy_slot, "cpu-heavy")) + + # Execute batch across all profiles + start_time = time.time() + + requests = [ + (slot, "bash", {"command": f"echo 'profile={profile}'"}) + for slot, profile in slots + ] + + batch_results = await backend.execute_batch(requests) + elapsed = time.time() - start_time + + successes = sum(1 for r in batch_results if r.success) + + # Release all + for slot, _ in slots: + await backend.release(slot) + + status = backend.get_status() + + success_rate = successes / len(slots) * 100 + + if success_rate >= 90: + results.record_pass("test_atropos_stress_multi_profile_load", { + "profiles": 2, + "tasks_per_profile": num_tasks_per_profile, + "total_tasks": len(slots), + "successes": successes, + "success_rate": f"{success_rate:.1f}%", + "time": f"{elapsed:.2f}s", + "status": status, + }) + else: + results.record_fail( + "test_atropos_stress_multi_profile_load", + f"Success rate {success_rate:.1f}% < 90%" + ) + + finally: + await backend.stop(purge=True) + + except Exception as e: + results.record_fail("test_atropos_stress_multi_profile_load", str(e)) + + +# ============================================================================= +# CATEGORY 3: Mini-SWE-Agent Integration Tests +# ============================================================================= + +def 
test_miniswe_environment_available(): + """Check if mini-swe-agent is properly set up.""" + mini_swe_path = Path(__file__).parent.parent / "mini-swe-agent" / "src" + + if not mini_swe_path.exists(): + results.record_skip( + "test_miniswe_environment_available", + "mini-swe-agent not found. Run: git clone https://github.com/anthropics/mini-swe-agent.git mini-swe-agent" + ) + return + + if not list(mini_swe_path.iterdir()): + results.record_skip( + "test_miniswe_environment_available", + "mini-swe-agent directory is empty. Run: git submodule update --init" + ) + return + + miniswe = try_import_miniswe() + if miniswe is None: + results.record_fail( + "test_miniswe_environment_available", + "Failed to import minisweagent module" + ) + return + + results.record_pass("test_miniswe_environment_available", { + "path": str(mini_swe_path), + "module": miniswe.__name__, + }) + + +def test_miniswe_modal_backend(config: StressTestConfig): + """Test mini-swe-agent with Modal backend.""" + if config.dry_run: + results.record_skip("test_miniswe_modal_backend", "Dry run mode") + return + + miniswe = try_import_miniswe() + if miniswe is None: + results.record_skip( + "test_miniswe_modal_backend", + "mini-swe-agent not available" + ) + return + + try: + # Check if ModalEnvironment exists in minisweagent + if not hasattr(miniswe, 'ModalEnvironment'): + results.record_skip( + "test_miniswe_modal_backend", + "minisweagent.ModalEnvironment not found" + ) + return + + # Create Modal environment + env = miniswe.ModalEnvironment( + image="python:3.11", + timeout=60, + ) + + # Execute a command + result = env.execute("echo 'Hello from mini-swe-agent Modal'") + + env.cleanup() + + if "Hello from mini-swe-agent Modal" in str(result): + results.record_pass("test_miniswe_modal_backend") + else: + results.record_fail( + "test_miniswe_modal_backend", + f"Unexpected result: {result}" + ) + + except Exception as e: + results.record_fail("test_miniswe_modal_backend", str(e)) + + +# ============================================================================= +# Test Runner +# ============================================================================= + +def run_sync_tests(config: StressTestConfig): + """Run synchronous tests.""" + if config.category in (None, "stress"): + print("\n" + "="*70) + print("STRESS TESTS (Terminal Tool)") + print("="*70) + + test_stress_concurrent_tasks(config) + test_stress_rapid_fire(config) + test_stress_pool_scaling(config) + test_stress_large_output(config) + test_stress_error_recovery(config) + + if config.category in (None, "miniswe"): + print("\n" + "="*70) + print("MINI-SWE-AGENT INTEGRATION TESTS") + print("="*70) + + test_miniswe_environment_available() + test_miniswe_modal_backend(config) + + +async def run_async_tests(config: StressTestConfig): + """Run asynchronous tests.""" + if config.category in (None, "atropos"): + print("\n" + "="*70) + print("ATROPOS BACKEND STRESS TESTS") + print("="*70) + + await test_atropos_stress_slot_churn(config) + await test_atropos_stress_parallel_batches(config) + await test_atropos_stress_multi_profile_load(config) + + +def main(): + import argparse + + parser = argparse.ArgumentParser(description="Modal Stress Test Suite") + parser.add_argument("--dry-run", action="store_true", help="Skip tests requiring Modal") + parser.add_argument("--category", choices=["stress", "atropos", "miniswe"], help="Run specific category") + parser.add_argument("--concurrent", type=int, default=10, help="Number of concurrent tasks") + parser.add_argument("--operations", 
type=int, default=50, help="Total operations for stress tests") + parser.add_argument("--verbose", action="store_true", default=True) + args = parser.parse_args() + + config = StressTestConfig( + dry_run=args.dry_run, + verbose=args.verbose, + category=args.category, + concurrent_tasks=args.concurrent, + total_operations=args.operations, + ) + + print("="*70) + print("MODAL STRESS & INTEGRATION TEST SUITE") + print("="*70) + print(f"Mode: {'DRY RUN' if config.dry_run else 'LIVE'}") + print(f"Category: {config.category or 'ALL'}") + print(f"Concurrent tasks: {config.concurrent_tasks}") + print(f"Total operations: {config.total_operations}") + + # Run sync tests + run_sync_tests(config) + + # Run async tests + asyncio.run(run_async_tests(config)) + + # Summary + success = results.summary() + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/tests/test_modal_terminal.py b/tests/test_modal_terminal.py index c9f7406f03..cd5bcaccf9 100644 --- a/tests/test_modal_terminal.py +++ b/tests/test_modal_terminal.py @@ -236,6 +236,63 @@ def test_environment_isolation(): return isolated +def test_pool_status(): + """Test that the Modal pool manager reports status correctly.""" + print("\n" + "=" * 60) + print("TEST 7: Pool Status") + print("=" * 60) + + try: + # Import pool manager + _ModalPoolManager = terminal_module._ModalPoolManager + + # Get pool manager instance + manager = _ModalPoolManager.get_instance() + status = manager.get_status() + + print(f"\nPool Manager Status:") + print(f" App name: {manager.app_name}") + print(f" Default profile: {manager.default_profile}") + print(f" Available profiles: {list(manager.profiles.keys())}") + print(f" Active pools: {list(status.keys())}") + + for pool_name, pool_status in status.items(): + print(f"\n Pool '{pool_name}':") + print(f" Size: {pool_status['pool_size']}/{pool_status['max_pool']}") + print(f" In use: {pool_status['in_use']}") + print(f" Min pool: {pool_status['min_pool']}") + + print(f"\nTest: ✅ Passed") + return True + except Exception as e: + print(f"\nError: {e}") + print(f"\nTest: ❌ Failed") + return False + + +def test_profile_selection(): + """Test that profile parameter is accepted (even if profile doesn't exist).""" + print("\n" + "=" * 60) + print("TEST 8: Profile Selection") + print("=" * 60) + + test_task_id = "modal_test_profile" + + # Test with default profile (no profile specified) + print("Testing with default profile...") + result = terminal_tool("echo 'default profile'", task_id=test_task_id) + result_json = json.loads(result) + + success = result_json.get('exit_code') == 0 + print(f" Default profile: {'✅' if success else '❌'} (exit code: {result_json.get('exit_code')})") + + # Cleanup + cleanup_vm(test_task_id) + + print(f"\nTest: {'✅ Passed' if success else '❌ Failed'}") + return success + + def main(): """Run all Modal terminal tests.""" print("🧪 Modal Terminal Tool Test Suite") @@ -247,6 +304,8 @@ def main(): print(f" TERMINAL_ENV: {config['env_type']}") print(f" TERMINAL_MODAL_IMAGE: {config['modal_image']}") print(f" TERMINAL_TIMEOUT: {config['timeout']}s") + print(f" TERMINAL_MODAL_APP_NAME: {os.getenv('TERMINAL_MODAL_APP_NAME', 'hermes-sandbox')}") + print(f" TERMINAL_MODAL_DEFAULT_PROFILE: {os.getenv('TERMINAL_MODAL_DEFAULT_PROFILE', 'default')}") if config['env_type'] != 'modal': print(f"\n⚠️ WARNING: TERMINAL_ENV is set to '{config['env_type']}', not 'modal'") @@ -270,6 +329,8 @@ def main(): results['pip_install'] = test_pip_install() results['filesystem_persistence'] = 
test_filesystem_persistence() results['environment_isolation'] = test_environment_isolation() + results['pool_status'] = test_pool_status() + results['profile_selection'] = test_profile_selection() # Summary print("\n" + "=" * 60) diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index bf0dc6b091..e8ef2c0d83 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -36,8 +36,11 @@ import shutil import subprocess import tempfile import uuid +from dataclasses import dataclass, field from pathlib import Path -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, ClassVar, List + +import yaml # Add mini-swe-agent to path if not installed mini_swe_path = Path(__file__).parent.parent / "mini-swe-agent" / "src" @@ -723,38 +726,582 @@ class _DockerEnvironment: pass -class _ModalEnvironment: + +@dataclass +class ModalProfile: """ - Modal cloud execution environment wrapper with sudo support. + Configuration for a Modal sandbox profile. - Wraps mini-swe-agent's SwerexModalEnvironment but adds: - - SUDO_PASSWORD support via _transform_sudo_command + Each profile defines the container image, resources, and pool scaling behavior. + Different profiles can be used for different workloads - Note: stdin handling is not needed for Modal since it uses remote async execution. + Secrets: + secrets: List of Modal Secret names to inject into the sandbox. + These secrets must be created on Modal dashboard or via CLI. + + env_vars: Dict of environment variables to pass directly to sandbox. + Use for non-sensitive configuration. + Example: {"DEBUG": "1", "LOG_LEVEL": "info"} + + use_dotenv: loads local dotenv + """ + name: str + image: str = "python:3.11" + gpu: Optional[str] = None # None, "T4", "A10G", "A100", "H100" + cpu: float = 1.0 + memory: int = 2048 # MB + min_pool: int = 1 + max_pool: int = 5 + idle_timeout: int = 120 # Modal server-side auto-cleanup (seconds) + max_lifetime: int = 3600 # Max sandbox lifetime (seconds) + scale_down_idle: int = 180 # Client-side scale down threshold (seconds) + workdir: str = "/workspace" + # Secrets and environment variables + secrets: List[str] = field(default_factory=list) # Modal Secret names + env_vars: Dict[str, str] = field(default_factory=dict) # Direct env vars + use_dotenv: bool = False # Load .env file and pass to sandbox + + @classmethod + def from_env(cls, profile_name: str) -> "ModalProfile": + """Load profile configuration from environment variables.""" + prefix = f"TERMINAL_MODAL_PROFILE_{profile_name}_" + + # Parse secrets list from comma-separated string + secrets_str = os.getenv(f"{prefix}SECRETS", "") + secrets = [s.strip() for s in secrets_str.split(",") if s.strip()] + + # Parse env_vars from KEY=VALUE pairs separated by semicolons + env_vars_str = os.getenv(f"{prefix}ENV_VARS", "") + env_vars = {} + if env_vars_str: + for pair in env_vars_str.split(";"): + if "=" in pair: + k, v = pair.split("=", 1) + env_vars[k.strip()] = v.strip() + + return cls( + name=profile_name, + image=os.getenv(f"{prefix}IMAGE", "python:3.11"), + gpu=os.getenv(f"{prefix}GPU"), + cpu=float(os.getenv(f"{prefix}CPU", "1.0")), + memory=int(os.getenv(f"{prefix}MEMORY", "2048")), + min_pool=int(os.getenv(f"{prefix}MIN_POOL", "1")), + max_pool=int(os.getenv(f"{prefix}MAX_POOL", "5")), + idle_timeout=int(os.getenv(f"{prefix}IDLE_TIMEOUT", "120")), + max_lifetime=int(os.getenv(f"{prefix}MAX_LIFETIME", "3600")), + scale_down_idle=int(os.getenv(f"{prefix}SCALE_DOWN_IDLE", "180")), + workdir=os.getenv(f"{prefix}WORKDIR", "/workspace"), + 
+            secrets=secrets,
+            env_vars=env_vars,
+            use_dotenv=os.getenv(f"{prefix}USE_DOTENV", "").lower() in ("true", "1", "yes"),
+        )
+
+    @classmethod
+    def load_profiles(cls, config_file: Optional[str] = None) -> Dict[str, "ModalProfile"]:
+        """
+        Load all profiles from a YAML file or from environment variables.
+
+        Priority:
+        1. YAML file specified by config_file or TERMINAL_MODAL_PROFILES_FILE
+        2. Environment variables matching the TERMINAL_MODAL_PROFILE_<name>_* pattern
+        3. Default profile with basic settings
+        """
+        profiles = {}
+
+        # Try YAML file first
+        yaml_path = config_file or os.getenv("TERMINAL_MODAL_PROFILES_FILE", "modal_profiles.yaml")
+        if Path(yaml_path).exists():
+            try:
+                with open(yaml_path) as f:
+                    config = yaml.safe_load(f) or {}  # empty file -> no profiles
+                for name, cfg in config.get("profiles", {}).items():
+                    profiles[name] = cls(name=name, **cfg)
+                if not os.getenv("HERMES_QUIET"):
+                    print(f"[Modal] Loaded {len(profiles)} profiles from {yaml_path}")
+                return profiles
+            except Exception as e:
+                if not os.getenv("HERMES_QUIET"):
+                    print(f"[Modal] Warning: Failed to load {yaml_path}: {e}")
+
+        # Check for environment variable profiles
+        # Look for any env vars of the form TERMINAL_MODAL_PROFILE_<name>_IMAGE
+        profile_names = set()
+        for key in os.environ:
+            if key.startswith("TERMINAL_MODAL_PROFILE_") and key.endswith("_IMAGE"):
+                # Extract <name> from TERMINAL_MODAL_PROFILE_<name>_IMAGE
+                parts = key.replace("TERMINAL_MODAL_PROFILE_", "").rsplit("_IMAGE", 1)
+                if parts[0]:
+                    profile_names.add(parts[0])
+
+        for name in profile_names:
+            profiles[name] = cls.from_env(name)
+
+        # If no profiles found, create a default one
+        if not profiles:
+            default_name = os.getenv("TERMINAL_MODAL_DEFAULT_PROFILE", "default")
+            profiles[default_name] = cls(
+                name=default_name,
+                image=os.getenv("TERMINAL_MODAL_IMAGE", "python:3.11"),
+                min_pool=int(os.getenv("TERMINAL_MODAL_MIN_POOL", "1")),
+                max_pool=int(os.getenv("TERMINAL_MODAL_MAX_POOL", "5")),
+                idle_timeout=int(os.getenv("TERMINAL_MODAL_IDLE_TIMEOUT", "120")),
+                max_lifetime=int(os.getenv("TERMINAL_MODAL_MAX_LIFETIME", "3600")),
+                scale_down_idle=int(os.getenv("TERMINAL_MODAL_SCALE_DOWN_IDLE", "180")),
+            )
+
+        return profiles
+
+
+class _ModalSandboxPool:
+    """
+    Auto-scaling pool of warm Modal sandboxes for a single profile.
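+    A pool owns the modal.Sandbox objects for one ModalProfile and leases them
+    out per task_id (a usage sketch is given on _ModalPoolManager below).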
+
+    Features:
+    - Named sandboxes for recovery after restart
+    - Reactive scale-up when demand exceeds capacity
+    - Background scale-down when sandboxes are idle
+    - Server-side idle_timeout for orphan protection
     """
 
-    def __init__(self, image: str, cwd: str = "/", timeout: int = 60):
-        from minisweagent.environments.extra.swerex_modal import SwerexModalEnvironment
-        self._inner = SwerexModalEnvironment(image=image, cwd=cwd, timeout=timeout)
+    def __init__(self, profile: ModalProfile, app_name: str):
+        self.profile = profile
+        self.app_name = app_name
+        self._app = None
+        self._modal_image = None
+        self._pool: Dict[str, Any] = {}  # sandbox_name -> modal.Sandbox
+        self._in_use: Dict[str, str] = {}  # task_id -> sandbox_name
+        self._last_used: Dict[str, float] = {}  # sandbox_name -> timestamp
+        self._lock = threading.Lock()
+        self._running = True
+        self._next_index = 0
+
+        # Start the scale-down monitor whenever this pool can hold sandboxes;
+        # it enforces scale_down_idle while never dropping below min_pool.
+        self._monitor_thread = None
+        if profile.min_pool > 0 or profile.max_pool > 0:
+            self._monitor_thread = threading.Thread(
+                target=self._scale_down_monitor,
+                daemon=True,
+                name=f"modal-pool-{profile.name}"
+            )
+            self._monitor_thread.start()
+
+    def _get_sandbox_name(self, index: int) -> str:
+        """Generate a unique sandbox name for this profile."""
+        return f"hermes-{self.profile.name}-{index}"
+
+    def _ensure_app(self):
+        """Lazy initialization of the Modal app and image."""
+        if self._app is None:
+            try:
+                import modal
+                self._app = modal.App.lookup(self.app_name, create_if_missing=True)
+                self._modal_image = modal.Image.from_registry(self.profile.image)
+            except ImportError:
+                raise ImportError("Modal package not installed. Run: pip install modal")
+
+    def _recover_or_create_sandbox(self, name: str) -> Any:
+        """
+        Try to recover an existing named sandbox, or create a new one.
+
+        Uses Modal's named sandbox feature for recovery after Hermes restart.
+        Supports Modal Secrets for secure credential injection.
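+
+        Illustrative flow (a sketch; `pool` stands for a _ModalSandboxPool
+        instance, and the sandbox name follows _get_sandbox_name()):
+
+            sb = pool._recover_or_create_sandbox("hermes-default-0")
+            assert sb.poll() is None   # poll() is None while the sandbox runs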
+ """ + import modal + + # Try to recover existing sandbox + try: + sb = modal.Sandbox.from_name(self.app_name, name) + if sb.poll() is None: # Still running + # Health check - verify sandbox is responsive + try: + sb.exec("echo", "ok", timeout=10) + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Recovered existing sandbox: {name}") + return sb + except Exception: + # Sandbox is not healthy, will create new + pass + except modal.exception.NotFoundError: + pass + except Exception as e: + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Could not recover sandbox {name}: {e}") + + # Build create kwargs based on profile + create_kwargs = { + "app": self._app, + "name": name, + "image": self._modal_image, + "timeout": self.profile.max_lifetime, + "idle_timeout": self.profile.idle_timeout, + "workdir": self.profile.workdir, + } + + # Add resource specs + if self.profile.cpu != 1.0: + create_kwargs["cpu"] = self.profile.cpu + if self.profile.memory != 2048: + create_kwargs["memory"] = self.profile.memory + + # Add GPU if specified + if self.profile.gpu: + create_kwargs["gpu"] = self.profile.gpu + + # Build secrets list + secrets_list = [] + + # Add named secrets from Modal dashboard/CLI + for secret_name in self.profile.secrets: + try: + secrets_list.append(modal.Secret.from_name(secret_name)) + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Adding secret: {secret_name}") + except Exception as e: + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Warning: Could not load secret '{secret_name}': {e}") + + # Add direct environment variables + if self.profile.env_vars: + secrets_list.append(modal.Secret.from_dict(self.profile.env_vars)) + + # Add .env file if requested + if self.profile.use_dotenv: + try: + secrets_list.append(modal.Secret.from_dotenv()) + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Loading .env file into sandbox") + except Exception as e: + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Warning: Could not load .env file: {e}") + + # Add global secrets from environment variable + global_secrets_str = os.getenv("TERMINAL_MODAL_SECRETS", "") + if global_secrets_str: + for secret_name in global_secrets_str.split(","): + secret_name = secret_name.strip() + if secret_name and secret_name not in self.profile.secrets: + try: + secrets_list.append(modal.Secret.from_name(secret_name)) + except Exception as e: + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Warning: Could not load global secret '{secret_name}': {e}") + + if secrets_list: + create_kwargs["secrets"] = secrets_list + + if not os.getenv("HERMES_QUIET"): + gpu_str = f" with GPU={self.profile.gpu}" if self.profile.gpu else "" + secrets_str = f" with {len(secrets_list)} secret(s)" if secrets_list else "" + print(f"[Modal] Creating sandbox: {name}{gpu_str}{secrets_str}") + + return modal.Sandbox.create(**create_kwargs) + + def _find_available_slot(self) -> Optional[str]: + """Find an available sandbox in the pool (not currently in use).""" + in_use_names = set(self._in_use.values()) + for name in self._pool: + if name not in in_use_names: + # Verify sandbox is still running + try: + if self._pool[name].poll() is None: + return name + else: + # Sandbox died, remove it + del self._pool[name] + self._last_used.pop(name, None) + except: + pass + return None + + def _current_size(self) -> int: + """Get current pool size.""" + return len(self._pool) + + def acquire(self, task_id: str, timeout: float = 60.0) -> Any: + """ + Acquire a sandbox for a task. 
+ + - Returns existing sandbox if task already has one + - Finds available sandbox in pool if any + - Scales up if under max_pool and all busy + - Waits if at max_pool and all busy + """ + deadline = time.time() + timeout + + while True: + with self._lock: + # Task already has a sandbox? + if task_id in self._in_use: + name = self._in_use[task_id] + self._last_used[name] = time.time() + return self._pool[name] + + self._ensure_app() + + # Find available slot in pool + available = self._find_available_slot() + if available: + self._in_use[task_id] = available + self._last_used[available] = time.time() + return self._pool[available] + + # Scale up if under max + if self._current_size() < self.profile.max_pool: + name = self._get_sandbox_name(self._next_index) + self._next_index += 1 + try: + sb = self._recover_or_create_sandbox(name) + self._pool[name] = sb + self._in_use[task_id] = name + self._last_used[name] = time.time() + return sb + except Exception as e: + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Failed to create sandbox: {e}") + raise + + # At capacity - wait and retry + if time.time() > deadline: + raise TimeoutError( + f"No Modal sandbox available for profile '{self.profile.name}' " + f"within {timeout}s (pool size: {self._current_size()}/{self.profile.max_pool})" + ) + time.sleep(0.5) + + def release(self, task_id: str, terminate: bool = False): + """ + Release a sandbox back to the pool. + + If terminate=False, sandbox stays warm for reuse. + If terminate=True, sandbox is terminated immediately. + """ + with self._lock: + if task_id not in self._in_use: + return + + name = self._in_use.pop(task_id) + self._last_used[name] = time.time() + + if terminate: + self._terminate_sandbox(name) + + def _terminate_sandbox(self, name: str, during_shutdown: bool = False): + """Terminate and remove a sandbox from the pool.""" + if name in self._pool: + try: + self._pool[name].terminate() + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Terminated sandbox: {name}") + except Exception as e: + if not during_shutdown and not os.getenv("HERMES_QUIET"): + print(f"[Modal] Error terminating {name}: {e}") + del self._pool[name] + self._last_used.pop(name, None) + + def _scale_down_monitor(self): + """Background thread: terminate idle sandboxes above min_pool size.""" + while self._running: + time.sleep(30) # Check every 30 seconds + + with self._lock: + if self._current_size() <= self.profile.min_pool: + continue + + now = time.time() + in_use_names = set(self._in_use.values()) + + # Find idle sandboxes to terminate + to_terminate = [] + for name, last_used in list(self._last_used.items()): + if name in in_use_names: + continue + if now - last_used > self.profile.scale_down_idle: + # Don't go below min_pool + if self._current_size() - len(to_terminate) > self.profile.min_pool: + to_terminate.append(name) + + for name in to_terminate: + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Scaling down idle sandbox: {name}") + self._terminate_sandbox(name) + + def shutdown(self, during_shutdown: bool = False): + """Stop monitor thread and terminate all sandboxes.""" + self._running = False + with self._lock: + for name in list(self._pool.keys()): + self._terminate_sandbox(name, during_shutdown=during_shutdown) + + +class _ModalPoolManager: + """ + Manages multiple sandbox pools, one per profile. + + Singleton pattern - shared across all _ModalSandboxEnvironment instances. + Each profile has its own pool with independent scaling. 
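+
+    Usage sketch (names as defined in this module; acquire() raises ValueError
+    for a profile that is not configured):
+
+        manager = _ModalPoolManager.get_instance()
+        sb = manager.acquire("task-123", profile="pytorch-gpu")  # -> modal.Sandbox
+        manager.release("task-123", profile="pytorch-gpu")       # keep warm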
+ """ + + _instance: ClassVar[Optional["_ModalPoolManager"]] = None + _init_lock: ClassVar[threading.Lock] = threading.Lock() + + @classmethod + def get_instance(cls) -> "_ModalPoolManager": + """Get or create the singleton instance.""" + with cls._init_lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + @classmethod + def reset_instance(cls): + """Reset the singleton (for testing).""" + with cls._init_lock: + if cls._instance is not None: + cls._instance.shutdown() + cls._instance = None + + def __init__(self): + self.app_name = os.getenv("TERMINAL_MODAL_APP_NAME", "hermes-sandbox") + self.profiles = ModalProfile.load_profiles() + self.default_profile = os.getenv("TERMINAL_MODAL_DEFAULT_PROFILE", "default") + + # Fall back to first profile if default not found + if self.default_profile not in self.profiles and self.profiles: + self.default_profile = next(iter(self.profiles.keys())) + + self._pools: Dict[str, _ModalSandboxPool] = {} + self._pools_lock = threading.Lock() + + if not os.getenv("HERMES_QUIET"): + print(f"[Modal] Pool manager initialized with profiles: {list(self.profiles.keys())}") + print(f"[Modal] Default profile: {self.default_profile}") + + def _get_pool(self, profile_name: str) -> _ModalSandboxPool: + """Get or create a pool for a profile.""" + with self._pools_lock: + if profile_name not in self._pools: + if profile_name not in self.profiles: + available = list(self.profiles.keys()) + raise ValueError( + f"Unknown Modal profile: '{profile_name}'. " + f"Available profiles: {available}" + ) + profile = self.profiles[profile_name] + self._pools[profile_name] = _ModalSandboxPool(profile, self.app_name) + return self._pools[profile_name] + + def acquire(self, task_id: str, profile: Optional[str] = None, timeout: float = 60.0) -> Any: + """Acquire a sandbox from the appropriate profile's pool.""" + profile_name = profile or self.default_profile + return self._get_pool(profile_name).acquire(task_id, timeout=timeout) + + def release(self, task_id: str, profile: Optional[str] = None, terminate: bool = False): + """Release a sandbox back to its pool.""" + profile_name = profile or self.default_profile + if profile_name in self._pools: + self._pools[profile_name].release(task_id, terminate=terminate) + + def get_status(self) -> Dict[str, Any]: + """Get status of all pools.""" + status = {} + with self._pools_lock: + for name, pool in self._pools.items(): + with pool._lock: + status[name] = { + "pool_size": pool._current_size(), + "in_use": len(pool._in_use), + "max_pool": pool.profile.max_pool, + "min_pool": pool.profile.min_pool, + } + return status + + def shutdown(self, during_shutdown: bool = False): + """Shutdown all pools.""" + with self._pools_lock: + for pool in self._pools.values(): + pool.shutdown(during_shutdown=during_shutdown) + self._pools.clear() + + +class _ModalSandboxEnvironment: + """ + Modal Sandbox environment with profile-based pool management. 
+
+    Features:
+    - Profile selection for heterogeneous workloads
+    - Auto-scaling warm sandbox pool
+    - Named sandbox recovery
+    - SUDO_PASSWORD support
+    """
+
+    def __init__(
+        self,
+        image: str,  # Unused; retained for interface parity (the profile determines the image)
+        cwd: str = "/workspace",
+        timeout: int = 60,
+        task_id: str = "",
+        profile: Optional[str] = None,  # Profile name (e.g., "pytorch-gpu")
+    ):
         self.cwd = cwd
         self.timeout = timeout
+        self.task_id = task_id or str(uuid.uuid4())
+        self.profile = profile
+        self._released = False
+
+        # Acquire sandbox from pool
+        manager = _ModalPoolManager.get_instance()
+        self._sandbox = manager.acquire(self.task_id, profile=profile)
 
     def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
-        """Execute a command in Modal with sudo support."""
+        """Execute a command in the Modal sandbox."""
         # Transform sudo commands if SUDO_PASSWORD is available
         exec_command = _transform_sudo_command(command)
+        work_dir = cwd or self.cwd
 
-        # Delegate to inner environment with transformed command
-        return self._inner.execute(exec_command, cwd=cwd, timeout=timeout)
+        try:
+            # Run command via bash with proper working directory
+            process = self._sandbox.exec(
+                "bash", "-c", f"cd {work_dir} && {exec_command}",
+                timeout=timeout or self.timeout
+            )
+
+            # Read output
+            stdout = process.stdout.read()
+            stderr = process.stderr.read()
+            process.wait()
+
+            # Combine stdout and stderr
+            output = stdout
+            if stderr:
+                output = output + stderr if output else stderr
+
+            return {"output": output, "returncode": process.returncode}
+
+        except Exception as e:
+            error_msg = str(e)
+            if "timeout" in error_msg.lower():
+                return {"output": f"Command timed out after {timeout or self.timeout}s", "returncode": 124}
+            return {"output": f"Modal execution error: {error_msg}", "returncode": 1}
 
     def cleanup(self):
-        """Cleanup the Modal deployment."""
-        if hasattr(self._inner, 'stop'):
-            self._inner.stop()
+        """Release sandbox back to pool (stays warm for reuse)."""
+        if not self._released:
+            self._released = True
+            _ModalPoolManager.get_instance().release(
+                self.task_id,
+                profile=self.profile,
+                terminate=False
+            )
 
     def stop(self):
-        """Stop the Modal deployment."""
-        self.cleanup()
+        """Terminate this sandbox explicitly."""
+        if not self._released:
+            self._released = True
+            _ModalPoolManager.get_instance().release(
+                self.task_id,
+                profile=self.profile,
+                terminate=True
+            )
 
     def __del__(self):
         """Cleanup on destruction."""
@@ -768,7 +1315,7 @@ class _ModalEnvironment:
 TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment.
 
 **Environment:**
-- Isolated execution environment (local, Docker, or Modal cloud based on configuration)
+- Isolated execution environment (local, Docker, Singularity, or Modal cloud based on configuration)
 - Filesystem persists between tool calls within the same task
 - Internet access available
 
@@ -776,17 +1323,20 @@ TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment.
 **Command Execution:**
- Simple commands: Just provide the 'command' parameter - Background processes: Set 'background': True for servers/long-running tasks - Command timeout: Optional 'timeout' parameter in seconds +- Modal profiles: Use 'profile' parameter for specialized environments (e.g., GPU) **Examples:** - Run command: `{"command": "ls -la"}` - Background task: `{"command": "source venv/bin/activate && python server.py", "background": True}` - With timeout: `{"command": "long_task.sh", "timeout": 300}` +- GPU task (Modal): `{"command": "python train.py", "profile": "pytorch-gpu"}` **Best Practices:** - Run servers/long processes in background - Monitor disk usage for large tasks - Install whatever tools you need with apt-get or pip - Do not be afraid to run pip with --break-system-packages +- For ML/GPU tasks with Modal, use the appropriate profile **Things to avoid:** - Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck. @@ -820,9 +1370,17 @@ def _get_env_config() -> Dict[str, Any]: } -def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: dict = None): +def _create_environment( + env_type: str, + image: str, + cwd: str, + timeout: int, + ssh_config: dict = None, + task_id: str = "", + profile: Optional[str] = None, +): """ - Create an execution environment from mini-swe-agent. + Create an execution environment. Args: env_type: One of "local", "docker", "singularity", "modal", "ssh" @@ -830,6 +1388,8 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_c cwd: Working directory timeout: Default command timeout ssh_config: SSH connection config (for env_type="ssh") + task_id: Unique task identifier (used for Modal pool management) + profile: Modal profile name (e.g., "pytorch-gpu") - only used for modal Returns: Environment instance with execute() method @@ -847,8 +1407,14 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_c return _SingularityEnvironment(image=image, cwd=cwd, timeout=timeout) elif env_type == "modal": - # Use custom Modal wrapper with sudo support - return _ModalEnvironment(image=image, cwd=cwd, timeout=timeout) + # Use native Modal Sandbox with auto-scaling pool and profile support + return _ModalSandboxEnvironment( + image=image, + cwd=cwd, + timeout=timeout, + task_id=task_id, + profile=profile, + ) elif env_type == "ssh": if not ssh_config or not ssh_config.get("host") or not ssh_config.get("user"): @@ -1044,20 +1610,34 @@ def cleanup_vm(task_id: str): atexit.register(_stop_cleanup_thread) +def _shutdown_modal_pools(): + """Shutdown Modal pool manager on exit (silently, as interpreter is shutting down).""" + try: + if _ModalPoolManager._instance is not None: + _ModalPoolManager._instance.shutdown(during_shutdown=True) + except: + pass # Ignore all errors during interpreter shutdown + +atexit.register(_shutdown_modal_pools) + + def terminal_tool( command: str, background: bool = False, timeout: Optional[int] = None, - task_id: Optional[str] = None + task_id: Optional[str] = None, + profile: Optional[str] = None, ) -> str: """ - Execute a command using mini-swe-agent's execution environments. + Execute a command using configured execution environments. 
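+
+    The backend (local, docker, singularity, modal, ssh) is selected via the
+    TERMINAL_ENV environment variable; the 'profile' argument is honored only
+    by the modal backend.
+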
Args: command: The command to execute background: Whether to run in background (default: False) timeout: Command timeout in seconds (default: from config) task_id: Unique identifier for environment isolation (optional) + profile: Modal profile name for heterogeneous workloads (e.g., "pytorch-gpu") + Only used when TERMINAL_ENV=modal. If not specified, uses default profile. Returns: str: JSON string with output, exit_code, and error fields @@ -1071,6 +1651,9 @@ def terminal_tool( # With custom timeout >>> result = terminal_tool(command="long_task.sh", timeout=300) + + # Use GPU profile for ML tasks (Modal only) + >>> result = terminal_tool(command="python train.py", profile="pytorch-gpu") """ global _active_environments, _last_activity @@ -1114,8 +1697,9 @@ def terminal_tool( # Get or create environment with _env_lock: if effective_task_id not in _active_environments: - # Check disk usage before creating new environment - _check_disk_usage_warning() + # Check disk usage before creating new environment (Singularity only) + if env_type == "singularity": + _check_disk_usage_warning() try: # Build SSH config if using SSH environment @@ -1133,7 +1717,9 @@ def terminal_tool( image=image, cwd=cwd, timeout=effective_timeout, - ssh_config=ssh_config + ssh_config=ssh_config, + task_id=effective_task_id, + profile=profile, ) except ImportError as e: return json.dumps({