mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-04 02:21:47 +00:00
Merge 5727b3429f into 05d8f11085
This commit is contained in:
commit
b4f338ccf4
19 changed files with 1022 additions and 29 deletions
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
Each backend provides the same interface (BaseEnvironment ABC) for running
|
||||
shell commands in a specific execution context: local, Docker, Singularity,
|
||||
SSH, Modal, or Daytona.
|
||||
SSH, Modal, Daytona, or Koyeb.
|
||||
|
||||
The terminal_tool.py factory (_create_environment) selects the backend
|
||||
based on the TERMINAL_ENV configuration.
|
||||
|
|
|
|||
231
tools/environments/koyeb.py
Normal file
231
tools/environments/koyeb.py
Normal file
|
|
@ -0,0 +1,231 @@
|
|||
"""Koyeb cloud execution environment.
|
||||
|
||||
Uses the Koyeb Python SDK to run commands in cloud sandboxes.
|
||||
Each task gets its own sandbox which is deleted on cleanup.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from tools.environments.base import (
|
||||
BaseEnvironment,
|
||||
_ThreadedProcessHandle,
|
||||
)
|
||||
from tools.environments.file_sync import (
|
||||
FileSyncManager,
|
||||
iter_sync_files,
|
||||
quoted_mkdir_command,
|
||||
quoted_rm_command,
|
||||
unique_parent_dirs,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KoyebEnvironment(BaseEnvironment):
|
||||
"""Koyeb cloud sandbox execution backend.
|
||||
|
||||
Spawn-per-call via _ThreadedProcessHandle wrapping blocking SDK calls.
|
||||
cancel_fn wired to sandbox.delete() for interrupt support.
|
||||
Shell timeout wrapper preserved (SDK timeout unreliable).
|
||||
"""
|
||||
|
||||
_stdin_mode = "heredoc"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
image: str,
|
||||
cwd: str = "/root",
|
||||
timeout: int = 60,
|
||||
cpu: int = 1,
|
||||
memory: int = 5120,
|
||||
disk: int = 10240,
|
||||
persistent_filesystem: bool = True,
|
||||
task_id: str = "default",
|
||||
instance_type: str = "micro",
|
||||
region: str = None,
|
||||
):
|
||||
requested_cwd = cwd
|
||||
super().__init__(cwd=cwd, timeout=timeout)
|
||||
|
||||
from koyeb import Sandbox
|
||||
|
||||
self._task_id = task_id
|
||||
self._sandbox = None
|
||||
self._lock = threading.Lock()
|
||||
self._instance_type = instance_type
|
||||
self._region = region or os.getenv("KOYEB_REGION", "na")
|
||||
self._api_token = os.getenv("KOYEB_API_TOKEN")
|
||||
|
||||
# Convert memory from MB to GB (Koyeb uses GB)
|
||||
memory_gib = max(1, math.ceil(memory / 1024))
|
||||
|
||||
# Koyeb instance types: micro, small, medium, large, xlarge, 2xlarge, etc.
|
||||
# For now, we'll use the instance_type parameter directly
|
||||
# cpu and memory parameters are kept for compatibility but may be overridden by instance_type
|
||||
|
||||
# Koyeb app names must be lowercase alphanumeric + hyphens only.
|
||||
# Sanitize task_id: replace underscores/invalid chars with hyphens,
|
||||
# collapse runs, strip leading/trailing hyphens, and truncate.
|
||||
safe_id = re.sub(r"[^a-z0-9-]", "-", task_id.lower())
|
||||
safe_id = re.sub(r"-{2,}", "-", safe_id).strip("-")
|
||||
sandbox_name = f"hermes-{safe_id}"[:63] # Koyeb name max length
|
||||
try:
|
||||
self._sandbox = Sandbox.create(
|
||||
image=image,
|
||||
name=sandbox_name,
|
||||
wait_ready=True,
|
||||
instance_type=self._instance_type,
|
||||
region=self._region,
|
||||
api_token=self._api_token,
|
||||
timeout=300,
|
||||
)
|
||||
logger.info("Koyeb: created sandbox %s for task %s",
|
||||
self._sandbox.id, task_id)
|
||||
except Exception as e:
|
||||
logger.error("Koyeb: failed to create sandbox: %s", e)
|
||||
raise
|
||||
|
||||
# Detect remote home dir
|
||||
self._remote_home = "/root"
|
||||
try:
|
||||
home = self._sandbox.exec("echo $HOME").stdout.strip()
|
||||
if home:
|
||||
self._remote_home = home
|
||||
if requested_cwd in ("~", "/root"):
|
||||
self.cwd = home
|
||||
except Exception:
|
||||
pass
|
||||
logger.info("Koyeb: resolved home to %s, cwd to %s", self._remote_home, self.cwd)
|
||||
|
||||
self._sync_manager = FileSyncManager(
|
||||
get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
|
||||
upload_fn=self._koyeb_upload,
|
||||
delete_fn=self._koyeb_delete,
|
||||
bulk_upload_fn=self._koyeb_bulk_upload,
|
||||
bulk_download_fn=self._koyeb_bulk_download,
|
||||
)
|
||||
self._sync_manager.sync(force=True)
|
||||
self.init_session()
|
||||
|
||||
def _koyeb_upload(self, host_path: str, remote_path: str) -> None:
|
||||
"""Upload a single file via Koyeb SDK."""
|
||||
parent = str(Path(remote_path).parent)
|
||||
self._sandbox.exec(f"mkdir -p {shlex.quote(parent)}")
|
||||
self._sandbox.filesystem.upload_file(host_path, remote_path, encoding="base64")
|
||||
|
||||
def _koyeb_bulk_upload(self, files: list[tuple[str, str]]) -> None:
|
||||
"""Upload many files as a single tar archive to avoid per-file HTTP overhead."""
|
||||
if not files:
|
||||
return
|
||||
|
||||
import tarfile
|
||||
import tempfile
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".tar", delete=False) as tmp:
|
||||
tmp_path = tmp.name
|
||||
|
||||
try:
|
||||
with tarfile.open(tmp_path, "w") as tar:
|
||||
for host_path, remote_path in files:
|
||||
# Store with absolute remote path inside the tar
|
||||
tar.add(host_path, arcname=remote_path)
|
||||
|
||||
remote_tar = f"/tmp/.hermes_upload.{os.getpid()}.tar"
|
||||
self._sandbox.filesystem.upload_file(tmp_path, remote_tar, encoding="base64")
|
||||
self._sandbox.exec(f"tar xf {shlex.quote(remote_tar)} -C / && rm -f {shlex.quote(remote_tar)}")
|
||||
finally:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _koyeb_bulk_download(self, dest: Path) -> None:
|
||||
"""Download remote .hermes/ as a tar archive."""
|
||||
rel_base = f"{self._remote_home}/.hermes".lstrip("/")
|
||||
# PID-suffixed remote temp path avoids collisions if sync_back fires
|
||||
# concurrently for the same sandbox (e.g. retry after partial failure).
|
||||
remote_tar = f"/tmp/.hermes_sync.{os.getpid()}.tar"
|
||||
self._sandbox.exec(
|
||||
f"tar cf {shlex.quote(remote_tar)} -C / {shlex.quote(rel_base)}"
|
||||
)
|
||||
self._sandbox.filesystem.download_file(remote_tar, str(dest))
|
||||
# Clean up remote temp file
|
||||
try:
|
||||
self._sandbox.exec(f"rm -f {shlex.quote(remote_tar)}")
|
||||
except Exception:
|
||||
pass # best-effort cleanup
|
||||
|
||||
def _koyeb_delete(self, remote_paths: list[str]) -> None:
|
||||
"""Batch-delete remote files via SDK exec."""
|
||||
self._sandbox.exec(quoted_rm_command(remote_paths))
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Sandbox lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _ensure_sandbox_ready(self) -> None:
|
||||
"""Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
|
||||
# Koyeb sandboxes don't have a stopped state like Daytona
|
||||
# They're either running or need to be recreated
|
||||
pass
|
||||
|
||||
def _before_execute(self) -> None:
|
||||
"""Ensure sandbox is ready, then sync files via FileSyncManager."""
|
||||
with self._lock:
|
||||
self._ensure_sandbox_ready()
|
||||
self._sync_manager.sync()
|
||||
|
||||
def _run_bash(self, cmd_string: str, *, login: bool = False,
|
||||
timeout: int = 120,
|
||||
stdin_data: str | None = None):
|
||||
"""Return a _ThreadedProcessHandle wrapping a blocking Koyeb SDK call."""
|
||||
sandbox = self._sandbox
|
||||
lock = self._lock
|
||||
|
||||
def cancel():
|
||||
with lock:
|
||||
try:
|
||||
sandbox.delete()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if login:
|
||||
shell_cmd = f"bash -l -c {shlex.quote(cmd_string)}"
|
||||
else:
|
||||
shell_cmd = f"bash -c {shlex.quote(cmd_string)}"
|
||||
|
||||
def exec_fn() -> tuple[str, int]:
|
||||
result = sandbox.exec(shell_cmd, timeout=timeout)
|
||||
output = result.stdout or ""
|
||||
if result.stderr:
|
||||
output = f"{output}\n{result.stderr}" if output else result.stderr
|
||||
return (output, result.exit_code)
|
||||
|
||||
return _ThreadedProcessHandle(exec_fn, cancel_fn=cancel)
|
||||
|
||||
def cleanup(self):
|
||||
with self._lock:
|
||||
if self._sandbox is None:
|
||||
return
|
||||
|
||||
# Sync remote changes back to host before teardown
|
||||
if self._sync_manager:
|
||||
logger.info("Koyeb: syncing files from sandbox...")
|
||||
try:
|
||||
self._sync_manager.sync_back()
|
||||
except Exception as e:
|
||||
logger.warning("Koyeb: sync_back failed: %s", e)
|
||||
|
||||
try:
|
||||
self._sandbox.delete()
|
||||
logger.info("Koyeb: deleted sandbox %s", self._sandbox.id)
|
||||
except Exception as e:
|
||||
logger.warning("Koyeb: cleanup failed: %s", e)
|
||||
self._sandbox = None
|
||||
Loading…
Add table
Add a link
Reference in a new issue