feat(file-sync): sync remote changes back to host on teardown

Add sync_back() to FileSyncManager: on sandbox cleanup, it downloads
the remote .hermes/ directory as a tar archive, diffs it against the
SHA-256 hashes of what was originally pushed, and applies only the
files that changed.

- SHA-256 content hashing on push for accurate change detection
- Retry with exponential backoff (3 attempts, 2s/4s/8s)
- SIGINT deferred during sync-back to prevent partial writes
- fcntl.flock serialization for concurrent gateway sandboxes
- Last-write-wins conflict resolution with logged warnings
- New files created on remote are pulled back via path inference
- Backend implementations: SSH (tar cf over pipe), Modal (exec tar
  cf, read stdout), Daytona (exec tar cf, SDK download_file)
- Wired into cleanup() for all three backends (runs before
  ControlMaster close / sandbox terminate / sandbox stop)

28 new tests (10 FileSyncManager core + 18 backend-specific), 72 total passing.
alt-glitch 2026-04-11 17:32:27 -07:00
parent 27eeea0555
commit 37c478cf2f
6 changed files with 1023 additions and 0 deletions
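
For orientation, a minimal sketch of the wiring pattern the diffs below
follow. The MyEnvironment class is hypothetical; the callback names and
signatures come from the file_sync.py type aliases in this commit:

    from pathlib import Path
    from tools.environments.file_sync import FileSyncManager

    class MyEnvironment:
        def connect(self):
            self._sync_manager = FileSyncManager(
                get_files_fn=self._list_files,         # () -> [(host, remote), ...]
                upload_fn=self._upload,                # (host_path, remote_path)
                delete_fn=self._delete,                # (remote_paths)
                bulk_download_fn=self._bulk_download,  # (dest_tar_path)
            )
            self._sync_manager.sync(force=True)  # push; records SHA-256 hashes

        def _bulk_download(self, dest: Path) -> None:
            ...  # backend-specific: write a tar of the remote .hermes/ to dest

        def cleanup(self):
            if self._sync_manager:
                self._sync_manager.sync_back()  # pull changes before teardown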


@@ -16,6 +16,7 @@ from tools.environments.base import (
    _ThreadedProcessHandle,
)
from tools.environments.file_sync import (
    BulkDownloadFn,
    FileSyncManager,
    iter_sync_files,
    quoted_mkdir_command,
@@ -134,6 +135,7 @@ class DaytonaEnvironment(BaseEnvironment):
            upload_fn=self._daytona_upload,
            delete_fn=self._daytona_delete,
            bulk_upload_fn=self._daytona_bulk_upload,
            bulk_download_fn=self._daytona_bulk_download,
        )
        self._sync_manager.sync(force=True)
        self.init_session()
@@ -166,6 +168,14 @@
        ]
        self._sandbox.fs.upload_files(uploads)

    def _daytona_bulk_download(self, dest: Path) -> None:
        """Download remote .hermes/ as a tar archive."""
        base = shlex.quote(self._remote_home)
        self._sandbox.process.exec(
            f"tar cf /tmp/.hermes_sync.tar -C {base}/.hermes ."
        )
        self._sandbox.fs.download_file("/tmp/.hermes_sync.tar", str(dest))

    def _daytona_delete(self, remote_paths: list[str]) -> None:
        """Batch-delete remote files via SDK exec."""
        self._sandbox.process.exec(quoted_rm_command(remote_paths))
@@ -213,6 +223,10 @@ class DaytonaEnvironment(BaseEnvironment):
        return _ThreadedProcessHandle(exec_fn, cancel_fn=cancel)

    def cleanup(self):
        if self._sync_manager:
            logger.info("Daytona: syncing files from sandbox...")
            self._sync_manager.sync_back()
        with self._lock:
            if self._sandbox is None:
                return


@@ -6,9 +6,15 @@ and Daytona. Docker and Singularity use bind mounts (live host FS
view) and don't need this.
"""
import fcntl
import hashlib
import logging
import os
import shlex
import shutil
import signal
import tarfile
import tempfile
import time
from pathlib import Path
from typing import Callable
@@ -23,6 +29,7 @@ _FORCE_SYNC_ENV = "HERMES_FORCE_FILE_SYNC"
# Transport callbacks provided by each backend
UploadFn = Callable[[str, str], None] # (host_path, remote_path) -> raises on failure
BulkUploadFn = Callable[[list[tuple[str, str]]], None] # [(host_path, remote_path), ...] -> raises on failure
BulkDownloadFn = Callable[[Path], None] # (dest_tar_path) -> writes tar archive, raises on failure
DeleteFn = Callable[[list[str]], None] # (remote_paths) -> raises on failure
GetFilesFn = Callable[[], list[tuple[str, str]]] # () -> [(host_path, remote_path), ...]
@@ -71,6 +78,19 @@ def unique_parent_dirs(files: list[tuple[str, str]]) -> list[str]:
    return sorted({str(Path(remote).parent) for _, remote in files})


def _sha256_file(path: str) -> str:
    """Return hex SHA-256 digest of a file."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


_SYNC_BACK_MAX_RETRIES = 3
_SYNC_BACK_BACKOFF = (2, 4, 8)  # seconds between retries


class FileSyncManager:
    """Tracks local file changes and syncs to a remote environment.
@@ -89,12 +109,15 @@ class FileSyncManager:
        delete_fn: DeleteFn,
        sync_interval: float = _SYNC_INTERVAL_SECONDS,
        bulk_upload_fn: BulkUploadFn | None = None,
        bulk_download_fn: BulkDownloadFn | None = None,
    ):
        self._get_files_fn = get_files_fn
        self._upload_fn = upload_fn
        self._bulk_upload_fn = bulk_upload_fn
        self._bulk_download_fn = bulk_download_fn
        self._delete_fn = delete_fn
        self._synced_files: dict[str, tuple[float, int]] = {}  # remote_path -> (mtime, size)
        self._pushed_hashes: dict[str, str] = {}  # remote_path -> sha256 hex digest
        self._last_sync_time: float = 0.0  # monotonic; 0 ensures first sync runs
        self._sync_interval = sync_interval
@@ -156,8 +179,12 @@
            logger.debug("file_sync: deleted %s", to_delete)

        # --- Commit (all succeeded) ---
        for host_path, remote_path in to_upload:
            self._pushed_hashes[remote_path] = _sha256_file(host_path)
        for p in to_delete:
            new_files.pop(p, None)
            self._pushed_hashes.pop(p, None)
        self._synced_files = new_files
        self._last_sync_time = time.monotonic()
@@ -166,3 +193,156 @@
            self._synced_files = prev_files
            self._last_sync_time = time.monotonic()
            logger.warning("file_sync: sync failed, rolled back state: %s", exc)

    # ------------------------------------------------------------------
    # Sync-back: pull remote changes to host on teardown
    # ------------------------------------------------------------------

    def sync_back(self, hermes_home: Path | None = None) -> None:
        """Pull remote changes back to the host filesystem.

        Downloads the remote ``.hermes/`` directory as a tar archive,
        unpacks it, and applies only files that differ from what was
        originally pushed (based on SHA-256 content hashes).

        Protected against SIGINT (the signal is deferred until the sync
        completes) and serialized across concurrent gateway sandboxes
        via a file lock.
        """
        if self._bulk_download_fn is None:
            return
        lock_path = (hermes_home or Path.home() / ".hermes") / ".sync.lock"
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        last_exc: Exception | None = None
        for attempt in range(_SYNC_BACK_MAX_RETRIES):
            try:
                self._sync_back_once(lock_path)
                return
            except Exception as exc:
                last_exc = exc
                if attempt < _SYNC_BACK_MAX_RETRIES - 1:
                    delay = _SYNC_BACK_BACKOFF[attempt]
                    logger.warning(
                        "sync_back: attempt %d failed (%s), retrying in %ds",
                        attempt + 1, exc, delay,
                    )
                    time.sleep(delay)
        logger.warning(
            "sync_back: all %d attempts failed: %s",
            _SYNC_BACK_MAX_RETRIES, last_exc,
        )
    def _sync_back_once(self, lock_path: Path) -> None:
        """Single sync-back attempt with SIGINT protection and file lock."""
        # Defer SIGINT so we don't leave host files in a partial state.
        deferred_sigint: list[object] = []
        original_handler = signal.getsignal(signal.SIGINT)

        def _defer_sigint(signum, frame):
            deferred_sigint.append((signum, frame))
            logger.debug("sync_back: SIGINT deferred until sync completes")

        signal.signal(signal.SIGINT, _defer_sigint)
        try:
            self._sync_back_locked(lock_path)
        finally:
            signal.signal(signal.SIGINT, original_handler)
            # Re-raise deferred SIGINT so the caller can handle it.
            if deferred_sigint:
                os.kill(os.getpid(), signal.SIGINT)
    def _sync_back_locked(self, lock_path: Path) -> None:
        """Sync-back under file lock (serializes concurrent gateways)."""
        lock_fd = open(lock_path, "w")
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX)
            self._sync_back_impl()
        finally:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
            lock_fd.close()
    def _sync_back_impl(self) -> None:
        """Download, diff, and apply remote changes to host."""
        assert self._bulk_download_fn is not None
        with tempfile.NamedTemporaryFile(suffix=".tar") as tf:
            self._bulk_download_fn(Path(tf.name))
            with tempfile.TemporaryDirectory(prefix="hermes-sync-back-") as staging:
                with tarfile.open(tf.name) as tar:
                    tar.extractall(staging, filter="data")
                applied = 0
                for dirpath, _dirnames, filenames in os.walk(staging):
                    for fname in filenames:
                        staged_file = os.path.join(dirpath, fname)
                        # Archive entries are relative to the remote .hermes/
                        # root; re-root them to match the pushed-hash keys.
                        rel = os.path.relpath(staged_file, staging)
                        remote_path = "/" + rel
                        remote_hash = _sha256_file(staged_file)
                        pushed_hash = self._pushed_hashes.get(remote_path)
                        if remote_hash == pushed_hash:
                            continue
                        # Resolve host path from the get_files_fn mapping.
                        host_path = self._resolve_host_path(remote_path)
                        if host_path is None:
                            # New file created on remote: map it to a host
                            # location via remote->host prefix substitution.
                            host_path = self._infer_host_path(remote_path)
                        if host_path is None:
                            logger.debug(
                                "sync_back: skipping %s (no host mapping)",
                                remote_path,
                            )
                            continue
                        if os.path.exists(host_path) and pushed_hash is not None:
                            host_hash = _sha256_file(host_path)
                            if host_hash != pushed_hash:
                                logger.warning(
                                    "sync_back: conflict on %s — host modified "
                                    "since push, remote also changed. Applying "
                                    "remote version (last-write-wins).",
                                    remote_path,
                                )
                        os.makedirs(os.path.dirname(host_path), exist_ok=True)
                        shutil.copy2(staged_file, host_path)
                        applied += 1
        if applied:
            logger.info("sync_back: applied %d changed file(s)", applied)
        else:
            logger.debug("sync_back: no remote changes detected")
    def _resolve_host_path(self, remote_path: str) -> str | None:
        """Find the host path for a known remote path from the file mapping."""
        try:
            for host, remote in self._get_files_fn():
                if remote == remote_path:
                    return host
        except Exception:
            pass
        return None
    def _infer_host_path(self, remote_path: str) -> str | None:
        """Infer a host path for a new remote file by matching path prefixes.

        Uses the existing file mapping to find a remote->host prefix pair,
        then applies the same prefix substitution to the new file.
        """
        try:
            for host, remote in self._get_files_fn():
                # Try the mapped file's parent dir, then its grandparent
                # (e.g. /root/.hermes/skills -> ~/.hermes/skills), and
                # substitute whichever remote prefix actually matched.
                candidates = (
                    (str(Path(remote).parent), str(Path(host).parent)),
                    (str(Path(remote).parent.parent), str(Path(host).parent.parent)),
                )
                for remote_dir, host_dir in candidates:
                    if remote_path.startswith(remote_dir + "/"):
                        return host_dir + remote_path[len(remote_dir):]
        except Exception:
            pass
        return None
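
The diff/apply flow above can be exercised end-to-end with stub
transport callbacks. A hypothetical round-trip sketch (not from this
commit's test suite; it assumes sync() falls back to the per-file
upload_fn when bulk_upload_fn is omitted, and that remote paths in the
mapping are rooted at the remote .hermes/ directory, matching the
"/" + rel keying in _sync_back_impl):

    import tarfile
    import tempfile
    from pathlib import Path
    from tools.environments.file_sync import FileSyncManager

    host = Path(tempfile.mkdtemp())    # stands in for the host .hermes/
    remote = Path(tempfile.mkdtemp())  # stands in for the remote .hermes/
    (host / "notes.md").write_text("pushed\n")

    def get_files():
        return [(str(host / "notes.md"), "/notes.md")]

    def upload(host_path, remote_path):
        (remote / remote_path.lstrip("/")).write_bytes(Path(host_path).read_bytes())

    def bulk_download(dest):
        with tarfile.open(dest, "w") as tar:  # same shape the backends produce
            tar.add(remote, arcname=".")

    mgr = FileSyncManager(
        get_files_fn=get_files,
        upload_fn=upload,
        delete_fn=lambda paths: None,
        bulk_download_fn=bulk_download,
    )
    mgr.sync(force=True)                    # push; records SHA-256 of notes.md
    (remote / "notes.md").write_text("edited remotely\n")
    mgr.sync_back(hermes_home=host)         # hash differs, so the file is applied
    assert (host / "notes.md").read_text() == "edited remotely\n"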


@@ -22,6 +22,7 @@ from tools.environments.base import (
    _save_json_store,
)
from tools.environments.file_sync import (
    BulkDownloadFn,
    FileSyncManager,
    iter_sync_files,
    quoted_mkdir_command,
@@ -269,6 +270,7 @@ class ModalEnvironment(BaseEnvironment):
            upload_fn=self._modal_upload,
            delete_fn=self._modal_delete,
            bulk_upload_fn=self._modal_bulk_upload,
            bulk_download_fn=self._modal_bulk_download,
        )
        self._sync_manager.sync(force=True)
        self.init_session()
@@ -347,6 +349,23 @@
        self._worker.run_coroutine(_bulk(), timeout=120)

    def _modal_bulk_download(self, dest: Path) -> None:
        """Download remote .hermes/ as a tar archive."""

        async def _download():
            proc = await self._sandbox.exec.aio(
                "bash", "-c", "tar cf - -C /root/.hermes ."
            )
            data = await proc.stdout.read.aio()
            exit_code = await proc.wait.aio()
            if exit_code != 0:
                raise RuntimeError(f"Modal bulk download failed (exit {exit_code})")
            return data

        tar_bytes = self._worker.run_coroutine(_download(), timeout=120)
        if isinstance(tar_bytes, str):
            tar_bytes = tar_bytes.encode()
        dest.write_bytes(tar_bytes)

    def _modal_delete(self, remote_paths: list[str]) -> None:
        """Batch-delete remote files via exec."""
        rm_cmd = quoted_rm_command(remote_paths)
@@ -404,6 +423,10 @@
        if self._sandbox is None:
            return
        if self._sync_manager:
            logger.info("Modal: syncing files from sandbox...")
            self._sync_manager.sync_back()
        if self._persistent:
            try:

                async def _snapshot():


@@ -10,6 +10,7 @@ from pathlib import Path

from tools.environments.base import BaseEnvironment, _popen_bash
from tools.environments.file_sync import (
    BulkDownloadFn,
    FileSyncManager,
    iter_sync_files,
    quoted_mkdir_command,
@@ -58,6 +59,7 @@ class SSHEnvironment(BaseEnvironment):
            upload_fn=self._scp_upload,
            delete_fn=self._ssh_delete,
            bulk_upload_fn=self._ssh_bulk_upload,
            bulk_download_fn=self._ssh_bulk_download,
        )
        self._sync_manager.sync(force=True)
@@ -216,6 +218,16 @@
        logger.debug("SSH: bulk-uploaded %d file(s) via tar pipe", len(files))

    def _ssh_bulk_download(self, dest: Path) -> None:
        """Download remote .hermes/ as a tar archive."""
        base = f"{self._remote_home}/.hermes"
        ssh_cmd = self._build_ssh_command()
        ssh_cmd.append(f"tar cf - -C {shlex.quote(base)} .")
        with open(dest, "wb") as f:
            result = subprocess.run(
                ssh_cmd, stdout=f, stderr=subprocess.PIPE, timeout=120
            )
        if result.returncode != 0:
            raise RuntimeError(
                "SSH bulk download failed: "
                f"{result.stderr.decode(errors='replace').strip()}"
            )

    def _ssh_delete(self, remote_paths: list[str]) -> None:
        """Batch-delete remote files in one SSH call."""
        cmd = self._build_ssh_command()
@@ -245,6 +257,10 @@
        return _popen_bash(cmd, stdin_data)

    def cleanup(self):
        if self._sync_manager:
            logger.info("SSH: syncing files from sandbox...")
            self._sync_manager.sync_back()
        if self.control_socket.exists():
            try:
                cmd = ["ssh", "-o", f"ControlPath={self.control_socket}",