hermes-agent/tools/environments/singularity.py
alt-glitch d684d7ee7e feat(environments): unified spawn-per-call execution layer
Replace dual execution model (PersistentShellMixin + per-backend oneshot)
with spawn-per-call + session snapshot for all backends except ManagedModal.

Core changes:
- Every command spawns a fresh bash process; session snapshot (env vars,
  functions, aliases) captured at init and re-sourced before each command
- CWD persists via file-based read (local) or in-band stdout markers (remote)
- ProcessHandle protocol + _ThreadedProcessHandle adapter for SDK backends
- cancel_fn wired for Modal (sandbox.terminate) and Daytona (sandbox.stop)
- Shared utilities extracted: _pipe_stdin, _popen_bash, _load_json_store,
  _save_json_store, _file_mtime_key, _SYNC_INTERVAL_SECONDS
- Rate-limited file sync unified in base _before_execute() with _sync_files() hook
- execute_oneshot() removed; all 11 call sites in code_execution_tool.py
  migrated to execute()
- Daytona timeout wrapper replaced with SDK-native timeout parameter
- persistent_shell.py deleted (291 lines)

Backend-specific:
- Local: process-group kill via os.killpg, file-based CWD read
- Docker: -e env flags only on init_session, not per-command
- SSH: shlex.quote transport, ControlMaster connection reuse
- Singularity: apptainer exec with instance://, no forced --pwd
- Modal: _AsyncWorker + _ThreadedProcessHandle, cancel_fn -> sandbox.terminate
- Daytona: SDK-level timeout (not shell wrapper), cancel_fn -> sandbox.stop
- ManagedModal: unchanged (gateway owns execution); docstring added explaining why
2026-04-08 17:23:15 -07:00

262 lines
9.1 KiB
Python

"""Singularity/Apptainer persistent container environment.
Security-hardened with --containall, --no-home, capability dropping.
Supports configurable resource limits and optional filesystem persistence
via writable overlay directories that survive across sessions.
"""
import logging
import os
import shutil
import subprocess
import threading
import uuid
from pathlib import Path
from typing import Optional
from hermes_constants import get_hermes_home
from tools.environments.base import (
BaseEnvironment,
_load_json_store,
_popen_bash,
_save_json_store,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# JSON file under the Hermes home directory. SingularityEnvironment.cleanup()
# records a task_id -> overlay-directory entry here for persistent environments.
_SNAPSHOT_STORE = get_hermes_home() / "singularity_snapshots.json"
def _find_singularity_executable() -> str:
"""Locate the apptainer or singularity CLI binary."""
if shutil.which("apptainer"):
return "apptainer"
if shutil.which("singularity"):
return "singularity"
raise RuntimeError(
"Neither 'apptainer' nor 'singularity' was found in PATH. "
"Install Apptainer (https://apptainer.org/docs/admin/main/installation.html) "
"or Singularity and ensure the CLI is available."
)
def _ensure_singularity_available() -> str:
    """Preflight check: resolve the container CLI and verify it responds.

    Runs ``<exe> version`` with a 10-second timeout.

    Returns:
        The executable name returned by ``_find_singularity_executable()``.

    Raises:
        RuntimeError: If no executable is found, it cannot be executed, the
            version command times out, or it exits with a non-zero status.
            When the failure was triggered by a lower-level exception, that
            exception is attached as ``__cause__``.
    """
    exe = _find_singularity_executable()
    try:
        result = subprocess.run(
            [exe, "version"], capture_output=True, text=True, timeout=10,
        )
    except FileNotFoundError as exc:
        # Chain explicitly so the traceback shows this as the direct cause
        # rather than an incidental "during handling of ..." context.
        raise RuntimeError(
            f"Singularity backend selected but '{exe}' could not be executed."
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"'{exe} version' timed out.") from exc
    if result.returncode != 0:
        # Truncate stderr so a noisy failure does not flood the error message.
        stderr = result.stderr.strip()[:200]
        raise RuntimeError(f"'{exe} version' failed (exit code {result.returncode}): {stderr}")
    return exe
def _load_snapshots() -> dict:
    """Load the snapshot registry stored at ``_SNAPSHOT_STORE``."""
    registry = _load_json_store(_SNAPSHOT_STORE)
    return registry
def _save_snapshots(data: dict) -> None:
    """Write *data* to the snapshot registry at ``_SNAPSHOT_STORE``."""
    _save_json_store(path := _SNAPSHOT_STORE, data) if False else _save_json_store(_SNAPSHOT_STORE, data)
def _get_scratch_dir() -> Path:
custom_scratch = os.getenv("TERMINAL_SCRATCH_DIR")
if custom_scratch:
scratch_path = Path(custom_scratch)
scratch_path.mkdir(parents=True, exist_ok=True)
return scratch_path
from tools.environments.base import get_sandbox_dir
sandbox = get_sandbox_dir() / "singularity"
scratch = Path("/scratch")
if scratch.exists() and os.access(scratch, os.W_OK):
user_scratch = scratch / os.getenv("USER", "hermes") / "hermes-agent"
user_scratch.mkdir(parents=True, exist_ok=True)
logger.info("Using /scratch for sandboxes: %s", user_scratch)
return user_scratch
sandbox.mkdir(parents=True, exist_ok=True)
return sandbox
def _get_apptainer_cache_dir() -> Path:
cache_dir = os.getenv("APPTAINER_CACHEDIR")
if cache_dir:
cache_path = Path(cache_dir)
cache_path.mkdir(parents=True, exist_ok=True)
return cache_path
scratch = _get_scratch_dir()
cache_path = scratch / ".apptainer"
cache_path.mkdir(parents=True, exist_ok=True)
return cache_path
# Held by _get_or_build_sif() around the build step so that only one thread
# of this process builds a SIF image at a time.
_sif_build_lock = threading.Lock()
def _get_or_build_sif(image: str, executable: str = "apptainer") -> str:
if image.endswith('.sif') and Path(image).exists():
return image
if not image.startswith('docker://'):
return image
image_name = image.replace('docker://', '').replace('/', '-').replace(':', '-')
cache_dir = _get_apptainer_cache_dir()
sif_path = cache_dir / f"{image_name}.sif"
if sif_path.exists():
return str(sif_path)
with _sif_build_lock:
if sif_path.exists():
return str(sif_path)
logger.info("Building SIF image (one-time setup)...")
logger.info(" Source: %s", image)
logger.info(" Target: %s", sif_path)
tmp_dir = cache_dir / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
env = os.environ.copy()
env["APPTAINER_TMPDIR"] = str(tmp_dir)
env["APPTAINER_CACHEDIR"] = str(cache_dir)
try:
result = subprocess.run(
[executable, "build", str(sif_path), image],
capture_output=True, text=True, timeout=600, env=env,
)
if result.returncode != 0:
logger.warning("SIF build failed, falling back to docker:// URL")
logger.warning(" Error: %s", result.stderr[:500])
return image
logger.info("SIF image built successfully")
return str(sif_path)
except subprocess.TimeoutExpired:
logger.warning("SIF build timed out, falling back to docker:// URL")
if sif_path.exists():
sif_path.unlink()
return image
except Exception as e:
logger.warning("SIF build error: %s, falling back to docker:// URL", e)
return image
class SingularityEnvironment(BaseEnvironment):
    """Hardened Singularity/Apptainer container with resource limits and persistence.

    Spawn-per-call: every execute() spawns a fresh ``apptainer exec ... bash -c`` process.
    Session snapshot preserves env vars across calls.
    CWD persists via in-band stdout markers.
    """

    def __init__(
        self,
        image: str,
        cwd: str = "~",
        timeout: int = 60,
        cpu: float = 0,
        memory: int = 0,
        disk: int = 0,
        persistent_filesystem: bool = False,
        task_id: str = "default",
    ):
        """Resolve the image, start a container instance and initialise the session.

        Args:
            image: Image reference; resolved through ``_get_or_build_sif``.
            cwd: Initial working directory, forwarded to the base class.
            timeout: Default command timeout, forwarded to the base class.
            cpu: When > 0, passed to ``instance start`` as ``--cpus``.
            memory: When > 0, passed to ``instance start`` as ``--memory <n>M``.
            disk: Accepted but not referenced anywhere in this class.
            persistent_filesystem: Use an on-disk overlay directory keyed by
                *task_id* instead of ``--writable-tmpfs``.
            task_id: Identifier used for the overlay directory name and the
                snapshot registry entry.

        Raises:
            RuntimeError: If the container CLI is unavailable or the instance
                cannot be started.
        """
        super().__init__(cwd=cwd, timeout=timeout)
        self.executable = _ensure_singularity_available()
        self.image = _get_or_build_sif(image, self.executable)
        self.instance_id = f"hermes_{uuid.uuid4().hex[:12]}"
        self._instance_started = False
        self._persistent = persistent_filesystem
        self._task_id = task_id
        self._overlay_dir: Optional[Path] = None
        self._cpu = cpu
        self._memory = memory

        if self._persistent:
            overlay_base = _get_scratch_dir() / "hermes-overlays"
            overlay_base.mkdir(parents=True, exist_ok=True)
            self._overlay_dir = overlay_base / f"overlay-{task_id}"
            self._overlay_dir.mkdir(parents=True, exist_ok=True)

        self._start_instance()
        try:
            self.init_session()
        except BaseException:
            # Construction failed after the instance was started. The caller
            # never receives this object, so stop the instance here instead
            # of leaking a running container.
            self.cleanup()
            raise

    def _start_instance(self):
        """Start the named container instance with the hardening flags.

        Raises:
            RuntimeError: If ``instance start`` exits non-zero or does not
                finish within 120 seconds.
        """
        cmd = [self.executable, "instance", "start"]
        cmd.extend(["--containall", "--no-home"])

        if self._persistent and self._overlay_dir:
            cmd.extend(["--overlay", str(self._overlay_dir)])
        else:
            cmd.append("--writable-tmpfs")

        # Credential/skills mounts are best-effort: any failure to load them
        # is logged at debug level and the instance starts without them.
        try:
            from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount

            for mount_entry in get_credential_file_mounts():
                cmd.extend(["--bind", f"{mount_entry['host_path']}:{mount_entry['container_path']}:ro"])
            for skills_mount in get_skills_directory_mount():
                cmd.extend(["--bind", f"{skills_mount['host_path']}:{skills_mount['container_path']}:ro"])
        except Exception as e:
            logger.debug("Singularity: could not load credential/skills mounts: %s", e)

        if self._memory > 0:
            cmd.extend(["--memory", f"{self._memory}M"])
        if self._cpu > 0:
            cmd.extend(["--cpus", str(self._cpu)])

        cmd.extend([str(self.image), self.instance_id])

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError("Instance start timed out") from exc
        if result.returncode != 0:
            raise RuntimeError(f"Failed to start instance: {result.stderr}")

        self._instance_started = True
        logger.info("Singularity instance %s started (persistent=%s)",
                    self.instance_id, self._persistent)

    def _run_bash(self, cmd_string: str, *, login: bool = False,
                  timeout: int = 120,
                  stdin_data: str | None = None) -> subprocess.Popen:
        """Spawn a bash process inside the Singularity instance.

        Args:
            cmd_string: Script passed to ``bash -c``.
            login: Use ``bash -l -c`` instead of ``bash -c``.
            timeout: Accepted but not used by this method.
            stdin_data: Forwarded to ``_popen_bash``.

        Raises:
            RuntimeError: If the instance has not been started.
        """
        if not self._instance_started:
            raise RuntimeError("Singularity instance not started")

        cmd = [self.executable, "exec",
               f"instance://{self.instance_id}"]
        if login:
            cmd.extend(["bash", "-l", "-c", cmd_string])
        else:
            cmd.extend(["bash", "-c", cmd_string])

        return _popen_bash(cmd, stdin_data)

    def cleanup(self):
        """Stop the instance. If persistent, the overlay dir survives."""
        if self._instance_started:
            try:
                result = subprocess.run(
                    [self.executable, "instance", "stop", self.instance_id],
                    capture_output=True, text=True, timeout=30,
                )
                # Only report success when the CLI actually succeeded; a
                # non-zero exit used to be logged as "stopped".
                if result.returncode == 0:
                    logger.info("Singularity instance %s stopped", self.instance_id)
                else:
                    logger.warning(
                        "Failed to stop Singularity instance %s: %s",
                        self.instance_id,
                        (result.stderr or "").strip()[:200]
                        or f"exit code {result.returncode}",
                    )
            except Exception as e:
                logger.warning("Failed to stop Singularity instance %s: %s", self.instance_id, e)
            # Cleared even when the stop failed, so later calls do not retry.
            self._instance_started = False

        if self._persistent and self._overlay_dir:
            # Record where this task's overlay lives in the snapshot registry.
            snapshots = _load_snapshots()
            snapshots[self._task_id] = str(self._overlay_dir)
            _save_snapshots(snapshots)