feat(environments): add Daytona cloud sandbox backend

New execution backend using the Daytona Python SDK. Supports persistent
sandboxes via stop/start lifecycle, interrupt handling, and automatic
retry on transient errors.

Signed-off-by: rovle <lovre.pesut@gmail.com>
This commit is contained in:
rovle 2026-03-05 00:40:34 -08:00
parent 21d61bdd71
commit 1e312c6582
2 changed files with 180 additions and 1 deletions

View file

@ -2,7 +2,7 @@
Each backend provides the same interface (BaseEnvironment ABC) for running
shell commands in a specific execution context: local, Docker, Singularity,
SSH, or Modal.
SSH, Modal, or Daytona.
The terminal_tool.py factory (_create_environment) selects the backend
based on the TERMINAL_ENV configuration.

View file

@ -0,0 +1,179 @@
"""Daytona cloud execution environment.
Uses the Daytona Python SDK to run commands in cloud sandboxes.
Supports persistent sandboxes: when enabled, sandboxes are stopped on cleanup
and resumed on next creation, preserving the filesystem across sessions.
"""
import logging
import math
import threading
import uuid
from typing import Optional
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
class DaytonaEnvironment(BaseEnvironment):
"""Daytona cloud sandbox execution backend.
Uses stopped/started sandbox lifecycle for filesystem persistence
instead of snapshots, making it faster and stateless on the host.
"""
def __init__(
self,
image: str,
cwd: str = "/home/daytona",
timeout: int = 60,
cpu: int = 1,
memory: int = 5120, # MB (hermes convention)
disk: int = 51200, # MB (hermes convention)
persistent_filesystem: bool = True,
task_id: str = "default",
):
# Resolve ~ to Daytona's default user home (matches Docker's ~ -> /root pattern)
if cwd == "~":
cwd = "/home/daytona"
super().__init__(cwd=cwd, timeout=timeout)
from daytona import (
Daytona,
CreateSandboxFromImageParams,
DaytonaError,
Resources,
)
self._persistent = persistent_filesystem
self._task_id = task_id
self._daytona = Daytona()
self._sandbox = None
self._lock = threading.Lock()
memory_gib = max(1, math.ceil(memory / 1024))
disk_gib = max(1, math.ceil(disk / 1024))
resources = Resources(cpu=cpu, memory=memory_gib, disk=disk_gib)
labels = {"hermes_task_id": task_id}
# Try to resume an existing stopped sandbox for this task
if self._persistent:
try:
self._sandbox = self._daytona.find_one(labels=labels)
self._sandbox.start()
logger.info("Daytona: resumed sandbox %s for task %s",
self._sandbox.id, task_id)
except DaytonaError:
self._sandbox = None
except Exception as e:
logger.warning("Daytona: failed to resume sandbox for task %s: %s",
task_id, e)
self._sandbox = None
# Create a fresh sandbox if we don't have one
if self._sandbox is None:
self._sandbox = self._daytona.create(
CreateSandboxFromImageParams(
image=image,
labels=labels,
auto_stop_interval=0,
resources=resources,
)
)
logger.info("Daytona: created sandbox %s for task %s",
self._sandbox.id, task_id)
def _ensure_sandbox_ready(self):
"""Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
self._sandbox.refresh_data()
if self._sandbox.state in ("stopped", "archived"):
self._sandbox.start()
logger.info("Daytona: restarted sandbox %s", self._sandbox.id)
def _exec_in_thread(self, exec_command: str, cwd: Optional[str], timeout: int) -> dict:
"""Run exec in a background thread with interrupt polling."""
result_holder: dict = {"value": None, "error": None}
def _run():
try:
response = self._sandbox.process.exec(
exec_command, cwd=cwd, timeout=timeout,
)
result_holder["value"] = {
"output": response.result or "",
"returncode": response.exit_code,
}
except Exception as e:
result_holder["error"] = e
t = threading.Thread(target=_run, daemon=True)
t.start()
while t.is_alive():
t.join(timeout=0.2)
if is_interrupted():
with self._lock:
try:
self._sandbox.stop()
except Exception:
pass
return {
"output": "[Command interrupted - Daytona sandbox stopped]",
"returncode": 130,
}
if result_holder["error"]:
return {"error": result_holder["error"]}
return result_holder["value"]
def execute(self, command: str, cwd: str = "", *,
timeout: Optional[int] = None,
stdin_data: Optional[str] = None) -> dict:
with self._lock:
self._ensure_sandbox_ready()
if stdin_data is not None:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
while marker in stdin_data:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
command = f"{command} << '{marker}'\n{stdin_data}\n{marker}"
exec_command = self._prepare_command(command)
effective_cwd = cwd or self.cwd or None
effective_timeout = timeout or self.timeout
result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout)
if "error" in result:
from daytona import DaytonaError
err = result["error"]
if isinstance(err, DaytonaError):
with self._lock:
try:
self._ensure_sandbox_ready()
except Exception:
return {"output": f"Daytona execution error: {err}", "returncode": 1}
result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout)
if "error" not in result:
return result
return {"output": f"Daytona execution error: {err}", "returncode": 1}
return result
def cleanup(self):
with self._lock:
if self._sandbox is None:
return
try:
if self._persistent:
self._sandbox.stop()
logger.info("Daytona: stopped sandbox %s (filesystem preserved)",
self._sandbox.id)
else:
self._daytona.delete(self._sandbox)
logger.info("Daytona: deleted sandbox %s", self._sandbox.id)
except Exception as e:
logger.warning("Daytona: cleanup failed: %s", e)
self._sandbox = None