perf(ssh,modal): bulk file sync via tar pipe and tar/base64 archive

SSH: symlink-staging + tar -ch piped over SSH in a single TCP stream.
Eliminates per-file scp round-trips. Handles timeout (kills both
processes), SSH Popen failure (kills tar), and tar create failure.
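
The core pipe, as a minimal sketch (host, staging path, and error
handling are placeholders; the real code builds the ssh argv via
_build_ssh_command()):

    import subprocess

    tar = subprocess.Popen(
        ["tar", "-chf", "-", "-C", "/tmp/staging", "."],  # -h follows symlinks
        stdout=subprocess.PIPE,
    )
    ssh = subprocess.Popen(
        ["ssh", "user@host", "tar xf - -C /"],  # placeholder host
        stdin=tar.stdout,
    )
    tar.stdout.close()  # let tar receive SIGPIPE if ssh exits early
    rc_ssh = ssh.wait()
    rc_tar = tar.wait()
    if rc_ssh or rc_tar:
        raise RuntimeError("bulk upload failed")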

Modal: in-memory gzipped tar archive, base64-encoded, decoded+extracted
in one exec call. Checks exit code and raises on failure.
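
The Modal path has the same shape, sketched here standalone (the final
exec call is indicative only; paths are illustrative):

    import base64
    import io
    import shlex
    import tarfile

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        # arcname is relative to '/', so extracting at '/' restores it
        tar.add("/local/foo.md", arcname="root/.hermes/skills/foo.md")
    payload = base64.b64encode(buf.getvalue()).decode("ascii")
    cmd = f"echo {shlex.quote(payload)} | base64 -d | tar xzf - -C /"
    # sandbox.exec("bash", "-c", cmd)  # then check the exit code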

Both backends use shared helpers extracted into file_sync.py:
- quoted_mkdir_command() — mirrors existing quoted_rm_command()
- unique_parent_dirs() — deduplicates parent dirs from file pairs
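
Example behavior of the helpers (paths illustrative):

    from tools.environments.file_sync import (
        quoted_mkdir_command,
        unique_parent_dirs,
    )

    files = [
        ("/local/a.md", "/root/.hermes/skills/a.md"),
        ("/local/cfg.json", "/root/.hermes/cache/cfg.json"),
    ]
    unique_parent_dirs(files)
    # -> ['/root/.hermes/cache', '/root/.hermes/skills']
    quoted_mkdir_command(unique_parent_dirs(files))
    # -> 'mkdir -p /root/.hermes/cache /root/.hermes/skills'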

Migrates _ensure_remote_dirs to use the new helpers.

28 new tests (21 SSH + 7 Modal), all passing.

Closes #7465
Closes #7467
kshitijk4poor 2026-04-11 11:17:17 +05:30 committed by alt-glitch
parent 723b5bec85
commit 04d4f41e77
5 changed files with 897 additions and 5 deletions


@@ -10,6 +10,7 @@ import logging
import os
import shlex
import time
from pathlib import Path
from typing import Callable

from tools.environments.base import _file_mtime_key
@@ -60,6 +61,16 @@ def quoted_rm_command(remote_paths: list[str]) -> str:
    return "rm -f " + " ".join(shlex.quote(p) for p in remote_paths)


def quoted_mkdir_command(dirs: list[str]) -> str:
    """Build a shell ``mkdir -p`` command for a batch of directories."""
    return "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)


def unique_parent_dirs(files: list[tuple[str, str]]) -> list[str]:
    """Extract sorted unique parent directories from (host, remote) pairs."""
    return sorted({str(Path(remote).parent) for _, remote in files})


class FileSyncManager:
    """Tracks local file changes and syncs to a remote environment.


@@ -5,8 +5,11 @@ wrapper, while preserving Hermes' persistent snapshot behavior across sessions.
"""

import asyncio
import base64
import io
import logging
import shlex
import tarfile
import threading
from pathlib import Path
from typing import Any, Optional
@@ -18,7 +21,13 @@ from tools.environments.base import (
    _load_json_store,
    _save_json_store,
)
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
from tools.environments.file_sync import (
    FileSyncManager,
    iter_sync_files,
    quoted_mkdir_command,
    quoted_rm_command,
    unique_parent_dirs,
)

logger = logging.getLogger(__name__)
@@ -259,13 +268,13 @@ class ModalEnvironment(BaseEnvironment):
            get_files_fn=lambda: iter_sync_files("/root/.hermes"),
            upload_fn=self._modal_upload,
            delete_fn=self._modal_delete,
            bulk_upload_fn=self._modal_bulk_upload,
        )
        self._sync_manager.sync(force=True)
        self.init_session()

    def _modal_upload(self, host_path: str, remote_path: str) -> None:
        """Upload a single file via base64-over-exec."""
        import base64
        content = Path(host_path).read_bytes()
        b64 = base64.b64encode(content).decode("ascii")
        container_dir = str(Path(remote_path).parent)
@@ -280,6 +289,44 @@ class ModalEnvironment(BaseEnvironment):
        self._worker.run_coroutine(_write(), timeout=15)

    def _modal_bulk_upload(self, files: list[tuple[str, str]]) -> None:
        """Upload many files in a single exec call via tar archive.

        Builds a gzipped tar archive in memory, base64-encodes it, and
        decodes+extracts in one ``exec`` call. Avoids per-file
        exec+encoding overhead (syncing ~580 files drops from minutes
        to seconds).
        """
        if not files:
            return

        # Build a tar archive in memory with files at their remote paths
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode="w:gz") as tar:
            for host_path, remote_path in files:
                # Store with leading '/' stripped so extracting at '/'
                # recreates the full absolute path
                tar.add(host_path, arcname=remote_path.lstrip("/"))
        payload = base64.b64encode(buf.getvalue()).decode("ascii")

        # Pre-create parent dirs + decode + extract in one exec call
        parents = unique_parent_dirs(files)
        mkdir_part = quoted_mkdir_command(parents)
        cmd = (
            f"{mkdir_part} && "
            f"echo {shlex.quote(payload)} | base64 -d | tar xzf - -C /"
        )

        sandbox = self._sandbox

        async def _bulk():
            proc = await sandbox.exec.aio("bash", "-c", cmd)
            exit_code = await proc.wait.aio()
            if exit_code != 0:
                raise RuntimeError(
                    f"Modal bulk upload failed (exit {exit_code})"
                )

        self._worker.run_coroutine(_bulk(), timeout=120)

    def _modal_delete(self, remote_paths: list[str]) -> None:
        """Batch-delete remote files via exec."""
        rm_cmd = quoted_rm_command(remote_paths)


@@ -1,6 +1,7 @@
"""SSH remote execution environment with ControlMaster connection persistence."""

import logging
import os
import shlex
import shutil
import subprocess
@@ -8,7 +9,13 @@ import tempfile
from pathlib import Path

from tools.environments.base import BaseEnvironment, _popen_bash
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
from tools.environments.file_sync import (
    FileSyncManager,
    iter_sync_files,
    quoted_mkdir_command,
    quoted_rm_command,
    unique_parent_dirs,
)

logger = logging.getLogger(__name__)
@@ -50,6 +57,7 @@ class SSHEnvironment(BaseEnvironment):
            get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
            upload_fn=self._scp_upload,
            delete_fn=self._ssh_delete,
            bulk_upload_fn=self._ssh_bulk_upload,
        )
        self._sync_manager.sync(force=True)
@@ -107,9 +115,8 @@ class SSHEnvironment(BaseEnvironment):
        """Create base ~/.hermes directory tree on remote in one SSH call."""
        base = f"{self._remote_home}/.hermes"
        dirs = [base, f"{base}/skills", f"{base}/credentials", f"{base}/cache"]
        mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)
        cmd = self._build_ssh_command()
        cmd.append(mkdir_cmd)
        cmd.append(quoted_mkdir_command(dirs))
        subprocess.run(cmd, capture_output=True, text=True, timeout=10)

    # _get_sync_files provided via iter_sync_files in FileSyncManager init
@@ -131,6 +138,92 @@ class SSHEnvironment(BaseEnvironment):
        if result.returncode != 0:
            raise RuntimeError(f"scp failed: {result.stderr.strip()}")

    def _ssh_bulk_upload(self, files: list[tuple[str, str]]) -> None:
        """Upload many files in a single tar-over-SSH stream.

        Pipes ``tar c`` on the local side through an SSH connection to
        ``tar x`` on the remote, transferring all files in one TCP stream
        instead of spawning a subprocess per file. Directory creation is
        batched into a single ``mkdir -p`` call beforehand.

        Typical improvement: a sync of ~580 files goes from O(N) scp
        round-trips to a single streaming transfer.
        """
        if not files:
            return

        # Pre-create all unique parent directories in one SSH call
        parents = unique_parent_dirs(files)
        if parents:
            cmd = self._build_ssh_command()
            cmd.append(quoted_mkdir_command(parents))
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                raise RuntimeError(f"remote mkdir failed: {result.stderr.strip()}")

        # Symlink staging avoids fragile GNU tar --transform rules.
        with tempfile.TemporaryDirectory(prefix="hermes-ssh-bulk-") as staging:
            for host_path, remote_path in files:
                # remote_path is absolute (e.g. /home/user/.hermes/skills/foo.md)
                # Create the directory structure under staging
                staged = os.path.join(staging, remote_path.lstrip("/"))
                os.makedirs(os.path.dirname(staged), exist_ok=True)
                # Symlink to the actual file (avoid copying)
                os.symlink(os.path.abspath(host_path), staged)

            # tar: dereference symlinks (-h), create archive from staging root.
            # The archive paths are relative to staging, which mirrors / on remote.
            tar_cmd = ["tar", "-chf", "-", "-C", staging, "."]
            # ssh: extract on remote at /
            ssh_cmd = self._build_ssh_command()
            ssh_cmd.append("tar xf - -C /")

            tar_proc = subprocess.Popen(
                tar_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            try:
                ssh_proc = subprocess.Popen(
                    ssh_cmd, stdin=tar_proc.stdout, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
            except Exception:
                tar_proc.kill()
                tar_proc.wait()
                raise

            # Allow tar_proc to receive SIGPIPE if ssh_proc exits early
            tar_proc.stdout.close()

            try:
                _, ssh_stderr = ssh_proc.communicate(timeout=120)
                # Use communicate() instead of wait() to drain stderr and
                # avoid deadlock if tar produces more than PIPE_BUF of errors.
                tar_stderr_raw = b""
                if tar_proc.poll() is None:
                    _, tar_stderr_raw = tar_proc.communicate(timeout=10)
                else:
                    tar_stderr_raw = tar_proc.stderr.read() if tar_proc.stderr else b""
            except subprocess.TimeoutExpired:
                tar_proc.kill()
                ssh_proc.kill()
                tar_proc.wait()
                ssh_proc.wait()
                raise RuntimeError("SSH bulk upload timed out")

            if tar_proc.returncode != 0:
                raise RuntimeError(
                    f"tar create failed (rc={tar_proc.returncode}): "
                    f"{tar_stderr_raw.decode(errors='replace').strip()}"
                )
            if ssh_proc.returncode != 0:
                raise RuntimeError(
                    f"tar extract over SSH failed (rc={ssh_proc.returncode}): "
                    f"{ssh_stderr.decode(errors='replace').strip()}"
                )

        logger.debug("SSH: bulk-uploaded %d file(s) via tar pipe", len(files))

    def _ssh_delete(self, remote_paths: list[str]) -> None:
        """Batch-delete remote files in one SSH call."""
        cmd = self._build_ssh_command()