hermes-agent/tests/tools/test_ssh_bulk_upload.py
Siddharth Balyan 27eeea0555
perf(ssh,modal): bulk file sync via tar pipe and tar/base64 archive (#8014)
* perf(ssh,modal): bulk file sync via tar pipe and tar/base64 archive

SSH: symlink-staging + tar -ch piped over SSH in a single TCP stream.
Eliminates per-file scp round-trips. Handles timeout (kills both
processes), SSH Popen failure (kills tar), and tar create failure.

Modal: in-memory gzipped tar archive, base64-encoded, decoded+extracted
in one exec call. Checks exit code and raises on failure.

Both backends use shared helpers extracted into file_sync.py:
- quoted_mkdir_command() — mirrors existing quoted_rm_command()
- unique_parent_dirs() — deduplicates parent dirs from file pairs

Migrates _ensure_remote_dirs to use the new helpers.

28 new tests (21 SSH + 7 Modal), all passing.

Closes #7465
Closes #7467

* fix(modal): pipe stdin to avoid ARG_MAX, clean up review findings

- Modal bulk upload: stream base64 payload through proc.stdin in 1MB
  chunks instead of embedding in command string (Modal SDK enforces
  64KB ARG_MAX_BYTES — typical payloads are ~4.3MB)
- Modal single-file upload: same stdin fix, add exit code checking
- Remove what-narrating comments in ssh.py and modal.py (keep WHY
  comments: symlink staging rationale, SIGPIPE, deadlock avoidance)
- Remove unnecessary `sandbox = self._sandbox` alias in modal bulk
- Daytona: use shared helpers (unique_parent_dirs, quoted_mkdir_command)
  instead of inlined duplicates

---------

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
2026-04-12 06:18:05 +05:30

517 lines
19 KiB
Python

"""Tests for SSH bulk upload via tar pipe."""
import os
import subprocess
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from tools.environments import ssh as ssh_env
from tools.environments.file_sync import quoted_mkdir_command, unique_parent_dirs
from tools.environments.ssh import SSHEnvironment
def _mock_proc(*, returncode=0, poll_return=0, communicate_return=(b"", b""),
stderr_read=b""):
"""Create a MagicMock mimicking subprocess.Popen for tar/ssh pipes."""
m = MagicMock()
m.stdout = MagicMock()
m.returncode = returncode
m.poll.return_value = poll_return
m.communicate.return_value = communicate_return
m.stderr = MagicMock()
m.stderr.read.return_value = stderr_read
return m
@pytest.fixture
def mock_env(monkeypatch):
"""Create an SSHEnvironment with mocked connection/sync."""
monkeypatch.setattr(ssh_env.shutil, "which", lambda _name: "/usr/bin/ssh")
monkeypatch.setattr(ssh_env.SSHEnvironment, "_establish_connection", lambda self: None)
monkeypatch.setattr(ssh_env.SSHEnvironment, "_detect_remote_home", lambda self: "/home/testuser")
monkeypatch.setattr(ssh_env.SSHEnvironment, "_ensure_remote_dirs", lambda self: None)
monkeypatch.setattr(ssh_env.SSHEnvironment, "init_session", lambda self: None)
monkeypatch.setattr(
ssh_env, "FileSyncManager",
lambda **kw: type("M", (), {"sync": lambda self, **k: None})(),
)
return SSHEnvironment(host="example.com", user="testuser")
class TestSSHBulkUpload:
"""Unit tests for _ssh_bulk_upload — tar pipe mechanics."""
def test_empty_files_is_noop(self, mock_env):
"""Empty file list should not spawn any subprocesses."""
with patch.object(subprocess, "run") as mock_run, \
patch.object(subprocess, "Popen") as mock_popen:
mock_env._ssh_bulk_upload([])
mock_run.assert_not_called()
mock_popen.assert_not_called()
def test_mkdir_batched_into_single_call(self, mock_env, tmp_path):
"""All parent directories should be created in one SSH call."""
# Create test files
f1 = tmp_path / "a.txt"
f1.write_text("aaa")
f2 = tmp_path / "b.txt"
f2.write_text("bbb")
files = [
(str(f1), "/home/testuser/.hermes/skills/a.txt"),
(str(f2), "/home/testuser/.hermes/credentials/b.txt"),
]
# Mock subprocess.run for mkdir and Popen for tar pipe
mock_run = MagicMock(return_value=subprocess.CompletedProcess([], 0))
def make_proc(cmd, **kwargs):
m = MagicMock()
m.stdout = MagicMock()
m.returncode = 0
m.poll.return_value = 0
m.communicate.return_value = (b"", b"")
m.stderr = MagicMock()
m.stderr.read.return_value = b""
return m
with patch.object(subprocess, "run", mock_run), \
patch.object(subprocess, "Popen", side_effect=make_proc):
mock_env._ssh_bulk_upload(files)
# Exactly one subprocess.run call for mkdir
assert mock_run.call_count == 1
mkdir_cmd = mock_run.call_args[0][0]
# Should contain mkdir -p with both parent dirs
mkdir_str = " ".join(mkdir_cmd)
assert "mkdir -p" in mkdir_str
assert "/home/testuser/.hermes/skills" in mkdir_str
assert "/home/testuser/.hermes/credentials" in mkdir_str
def test_staging_symlinks_mirror_remote_layout(self, mock_env, tmp_path):
"""Symlinks in staging dir should mirror the remote path structure."""
f1 = tmp_path / "local_a.txt"
f1.write_text("content a")
files = [
(str(f1), "/home/testuser/.hermes/skills/my_skill.md"),
]
staging_paths = []
def capture_tar_cmd(cmd, **kwargs):
if cmd[0] == "tar":
# Capture the staging dir from -C argument
c_idx = cmd.index("-C")
staging_dir = cmd[c_idx + 1]
# Check the symlink exists
expected = os.path.join(
staging_dir, "home/testuser/.hermes/skills/my_skill.md"
)
staging_paths.append(expected)
assert os.path.islink(expected), f"Expected symlink at {expected}"
assert os.readlink(expected) == os.path.abspath(str(f1))
mock = MagicMock()
mock.stdout = MagicMock()
mock.returncode = 0
mock.poll.return_value = 0
mock.communicate.return_value = (b"", b"")
mock.stderr = MagicMock()
mock.stderr.read.return_value = b""
return mock
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=capture_tar_cmd):
mock_env._ssh_bulk_upload(files)
assert len(staging_paths) == 1, "tar command should have been called"
def test_tar_pipe_commands(self, mock_env, tmp_path):
"""Verify tar and SSH commands are wired correctly."""
f1 = tmp_path / "x.txt"
f1.write_text("x")
files = [(str(f1), "/home/testuser/.hermes/cache/x.txt")]
popen_cmds = []
def capture_popen(cmd, **kwargs):
popen_cmds.append(cmd)
mock = MagicMock()
mock.stdout = MagicMock()
mock.returncode = 0
mock.poll.return_value = 0
mock.communicate.return_value = (b"", b"")
mock.stderr = MagicMock()
mock.stderr.read.return_value = b""
return mock
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=capture_popen):
mock_env._ssh_bulk_upload(files)
assert len(popen_cmds) == 2, "Should spawn tar + ssh processes"
tar_cmd = popen_cmds[0]
ssh_cmd = popen_cmds[1]
# tar: create, dereference symlinks, to stdout
assert tar_cmd[0] == "tar"
assert "-chf" in tar_cmd
assert "-" in tar_cmd # stdout
assert "-C" in tar_cmd
# ssh: extract from stdin at /
ssh_str = " ".join(ssh_cmd)
assert "ssh" in ssh_str
assert "tar xf - -C /" in ssh_str
assert "testuser@example.com" in ssh_str
def test_mkdir_failure_raises(self, mock_env, tmp_path):
"""mkdir failure should raise RuntimeError before tar pipe."""
f1 = tmp_path / "y.txt"
f1.write_text("y")
files = [(str(f1), "/home/testuser/.hermes/skills/y.txt")]
failed_run = subprocess.CompletedProcess([], 1, stderr="Permission denied")
with patch.object(subprocess, "run", return_value=failed_run):
with pytest.raises(RuntimeError, match="remote mkdir failed"):
mock_env._ssh_bulk_upload(files)
def test_tar_create_failure_raises(self, mock_env, tmp_path):
"""tar create failure should raise RuntimeError."""
f1 = tmp_path / "z.txt"
f1.write_text("z")
files = [(str(f1), "/home/testuser/.hermes/skills/z.txt")]
mock_tar = MagicMock()
mock_tar.stdout = MagicMock()
mock_tar.returncode = 1
mock_tar.poll.return_value = 1
mock_tar.communicate.return_value = (b"tar: error", b"")
mock_tar.stderr = MagicMock()
mock_tar.stderr.read.return_value = b"tar: error"
mock_ssh = MagicMock()
mock_ssh.communicate.return_value = (b"", b"")
mock_ssh.returncode = 0
def popen_side_effect(cmd, **kwargs):
if cmd[0] == "tar":
return mock_tar
return mock_ssh
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=popen_side_effect):
with pytest.raises(RuntimeError, match="tar create failed"):
mock_env._ssh_bulk_upload(files)
def test_ssh_extract_failure_raises(self, mock_env, tmp_path):
"""SSH tar extract failure should raise RuntimeError."""
f1 = tmp_path / "w.txt"
f1.write_text("w")
files = [(str(f1), "/home/testuser/.hermes/skills/w.txt")]
mock_tar = MagicMock()
mock_tar.stdout = MagicMock()
mock_tar.returncode = 0
mock_tar.poll.return_value = 0
mock_tar.communicate.return_value = (b"", b"")
mock_tar.stderr = MagicMock()
mock_tar.stderr.read.return_value = b""
mock_ssh = MagicMock()
mock_ssh.communicate.return_value = (b"", b"Permission denied")
mock_ssh.returncode = 1
def popen_side_effect(cmd, **kwargs):
if cmd[0] == "tar":
return mock_tar
return mock_ssh
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=popen_side_effect):
with pytest.raises(RuntimeError, match="tar extract over SSH failed"):
mock_env._ssh_bulk_upload(files)
def test_ssh_command_uses_control_socket(self, mock_env, tmp_path):
"""SSH command for tar extract should reuse ControlMaster socket."""
f1 = tmp_path / "c.txt"
f1.write_text("c")
files = [(str(f1), "/home/testuser/.hermes/cache/c.txt")]
popen_cmds = []
def capture_popen(cmd, **kwargs):
popen_cmds.append(cmd)
mock = MagicMock()
mock.stdout = MagicMock()
mock.returncode = 0
mock.poll.return_value = 0
mock.communicate.return_value = (b"", b"")
mock.stderr = MagicMock()
mock.stderr.read.return_value = b""
return mock
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=capture_popen):
mock_env._ssh_bulk_upload(files)
# The SSH command (second Popen call) should include ControlPath
ssh_cmd = popen_cmds[1]
assert f"ControlPath={mock_env.control_socket}" in " ".join(ssh_cmd)
def test_custom_port_and_key_in_ssh_command(self, monkeypatch, tmp_path):
"""Bulk upload SSH command should include custom port and key."""
monkeypatch.setattr(ssh_env.shutil, "which", lambda _name: "/usr/bin/ssh")
monkeypatch.setattr(ssh_env.SSHEnvironment, "_establish_connection", lambda self: None)
monkeypatch.setattr(ssh_env.SSHEnvironment, "_detect_remote_home", lambda self: "/home/u")
monkeypatch.setattr(ssh_env.SSHEnvironment, "_ensure_remote_dirs", lambda self: None)
monkeypatch.setattr(ssh_env.SSHEnvironment, "init_session", lambda self: None)
monkeypatch.setattr(
ssh_env, "FileSyncManager",
lambda **kw: type("M", (), {"sync": lambda self, **k: None})(),
)
env = SSHEnvironment(host="h", user="u", port=2222, key_path="/my/key")
f1 = tmp_path / "d.txt"
f1.write_text("d")
files = [(str(f1), "/home/u/.hermes/skills/d.txt")]
run_cmds = []
popen_cmds = []
def capture_run(cmd, **kwargs):
run_cmds.append(cmd)
return subprocess.CompletedProcess([], 0)
def capture_popen(cmd, **kwargs):
popen_cmds.append(cmd)
mock = MagicMock()
mock.stdout = MagicMock()
mock.returncode = 0
mock.poll.return_value = 0
mock.communicate.return_value = (b"", b"")
mock.stderr = MagicMock()
mock.stderr.read.return_value = b""
return mock
with patch.object(subprocess, "run", side_effect=capture_run), \
patch.object(subprocess, "Popen", side_effect=capture_popen):
env._ssh_bulk_upload(files)
# Check mkdir SSH call includes port and key
assert len(run_cmds) == 1
mkdir_cmd = run_cmds[0]
assert "-p" in mkdir_cmd and "2222" in mkdir_cmd
assert "-i" in mkdir_cmd and "/my/key" in mkdir_cmd
# Check tar extract SSH call includes port and key
ssh_cmd = popen_cmds[1]
assert "-p" in ssh_cmd and "2222" in ssh_cmd
assert "-i" in ssh_cmd and "/my/key" in ssh_cmd
def test_parent_dirs_deduplicated(self, mock_env, tmp_path):
"""Multiple files in the same dir should produce one mkdir entry."""
f1 = tmp_path / "a.txt"
f1.write_text("a")
f2 = tmp_path / "b.txt"
f2.write_text("b")
f3 = tmp_path / "c.txt"
f3.write_text("c")
files = [
(str(f1), "/home/testuser/.hermes/skills/a.txt"),
(str(f2), "/home/testuser/.hermes/skills/b.txt"),
(str(f3), "/home/testuser/.hermes/credentials/c.txt"),
]
run_cmds = []
def capture_run(cmd, **kwargs):
run_cmds.append(cmd)
return subprocess.CompletedProcess([], 0)
def make_mock_proc(cmd, **kwargs):
mock = MagicMock()
mock.stdout = MagicMock()
mock.returncode = 0
mock.poll.return_value = 0
mock.communicate.return_value = (b"", b"")
mock.stderr = MagicMock()
mock.stderr.read.return_value = b""
return mock
with patch.object(subprocess, "run", side_effect=capture_run), \
patch.object(subprocess, "Popen", side_effect=make_mock_proc):
mock_env._ssh_bulk_upload(files)
# Only one mkdir call
assert len(run_cmds) == 1
mkdir_str = " ".join(run_cmds[0])
# skills dir should appear exactly once despite two files
assert mkdir_str.count("/home/testuser/.hermes/skills") == 1
assert "/home/testuser/.hermes/credentials" in mkdir_str
def test_tar_stdout_closed_for_sigpipe(self, mock_env, tmp_path):
"""tar_proc.stdout must be closed so SIGPIPE propagates correctly."""
f1 = tmp_path / "s.txt"
f1.write_text("s")
files = [(str(f1), "/home/testuser/.hermes/skills/s.txt")]
mock_tar_stdout = MagicMock()
def make_proc(cmd, **kwargs):
mock = MagicMock()
if cmd[0] == "tar":
mock.stdout = mock_tar_stdout
else:
mock.stdout = MagicMock()
mock.returncode = 0
mock.poll.return_value = 0
mock.communicate.return_value = (b"", b"")
mock.stderr = MagicMock()
mock.stderr.read.return_value = b""
return mock
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=make_proc):
mock_env._ssh_bulk_upload(files)
mock_tar_stdout.close.assert_called_once()
def test_timeout_kills_both_processes(self, mock_env, tmp_path):
"""TimeoutExpired during communicate should kill both processes."""
f1 = tmp_path / "t.txt"
f1.write_text("t")
files = [(str(f1), "/home/testuser/.hermes/skills/t.txt")]
mock_tar = MagicMock()
mock_tar.stdout = MagicMock()
mock_tar.returncode = None
mock_tar.poll.return_value = None
mock_ssh = MagicMock()
mock_ssh.communicate.side_effect = subprocess.TimeoutExpired("ssh", 120)
mock_ssh.returncode = None
def make_proc(cmd, **kwargs):
if cmd[0] == "tar":
return mock_tar
return mock_ssh
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=make_proc):
with pytest.raises(RuntimeError, match="SSH bulk upload timed out"):
mock_env._ssh_bulk_upload(files)
mock_tar.kill.assert_called_once()
mock_ssh.kill.assert_called_once()
class TestSSHBulkUploadWiring:
"""Verify bulk_upload_fn is wired into FileSyncManager."""
def test_filesyncmanager_receives_bulk_upload_fn(self, monkeypatch):
"""SSHEnvironment should pass _ssh_bulk_upload to FileSyncManager."""
monkeypatch.setattr(ssh_env.shutil, "which", lambda _name: "/usr/bin/ssh")
monkeypatch.setattr(ssh_env.SSHEnvironment, "_establish_connection", lambda self: None)
monkeypatch.setattr(ssh_env.SSHEnvironment, "_detect_remote_home", lambda self: "/root")
monkeypatch.setattr(ssh_env.SSHEnvironment, "_ensure_remote_dirs", lambda self: None)
monkeypatch.setattr(ssh_env.SSHEnvironment, "init_session", lambda self: None)
captured_kwargs = {}
class FakeSyncManager:
def __init__(self, **kwargs):
captured_kwargs.update(kwargs)
def sync(self, **kw):
pass
monkeypatch.setattr(ssh_env, "FileSyncManager", FakeSyncManager)
env = SSHEnvironment(host="h", user="u")
assert "bulk_upload_fn" in captured_kwargs
assert captured_kwargs["bulk_upload_fn"] is not None
# Should be the bound method
assert callable(captured_kwargs["bulk_upload_fn"])
class TestSharedHelpers:
"""Direct unit tests for file_sync.py helpers."""
def test_quoted_mkdir_command_basic(self):
result = quoted_mkdir_command(["/a", "/b/c"])
assert result == "mkdir -p /a /b/c"
def test_quoted_mkdir_command_quotes_special_chars(self):
result = quoted_mkdir_command(["/path/with spaces", "/path/'quotes'"])
assert "mkdir -p" in result
# shlex.quote wraps in single quotes
assert "'/path/with spaces'" in result
def test_quoted_mkdir_command_empty(self):
result = quoted_mkdir_command([])
assert result == "mkdir -p "
def test_unique_parent_dirs_deduplicates(self):
files = [
("/local/a.txt", "/remote/dir/a.txt"),
("/local/b.txt", "/remote/dir/b.txt"),
("/local/c.txt", "/remote/other/c.txt"),
]
result = unique_parent_dirs(files)
assert result == ["/remote/dir", "/remote/other"]
def test_unique_parent_dirs_sorted(self):
files = [
("/local/z.txt", "/z/file.txt"),
("/local/a.txt", "/a/file.txt"),
]
result = unique_parent_dirs(files)
assert result == ["/a", "/z"]
def test_unique_parent_dirs_empty(self):
assert unique_parent_dirs([]) == []
class TestSSHBulkUploadEdgeCases:
"""Edge cases for _ssh_bulk_upload."""
def test_ssh_popen_failure_kills_tar(self, mock_env, tmp_path):
"""If SSH Popen raises, tar process must be killed and cleaned up."""
f1 = tmp_path / "e.txt"
f1.write_text("e")
files = [(str(f1), "/home/testuser/.hermes/skills/e.txt")]
mock_tar = _mock_proc()
call_count = 0
def failing_ssh_popen(cmd, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return mock_tar # tar Popen succeeds
raise OSError("SSH binary not found")
with patch.object(subprocess, "run",
return_value=subprocess.CompletedProcess([], 0)), \
patch.object(subprocess, "Popen", side_effect=failing_ssh_popen):
with pytest.raises(OSError, match="SSH binary not found"):
mock_env._ssh_bulk_upload(files)
mock_tar.kill.assert_called_once()
mock_tar.wait.assert_called_once()