mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(modal): pipe stdin to avoid ARG_MAX, clean up review findings
- Modal bulk upload: stream base64 payload through proc.stdin in 1MB chunks instead of embedding in command string (Modal SDK enforces 64KB ARG_MAX_BYTES — typical payloads are ~4.3MB) - Modal single-file upload: same stdin fix, add exit code checking - Remove what-narrating comments in ssh.py and modal.py (keep WHY comments: symlink staging rationale, SIGPIPE, deadlock avoidance) - Remove unnecessary `sandbox = self._sandbox` alias in modal bulk - Daytona: use shared helpers (unique_parent_dirs, quoted_mkdir_command) instead of inlined duplicates
This commit is contained in:
parent
04d4f41e77
commit
9000d3163a
4 changed files with 137 additions and 50 deletions
|
|
@ -27,26 +27,46 @@ def _make_mock_modal_env(monkeypatch, tmp_path):
|
|||
return env
|
||||
|
||||
|
||||
def _make_mock_stdin():
|
||||
"""Create a mock stdin that captures written data."""
|
||||
stdin = MagicMock()
|
||||
written_chunks = []
|
||||
|
||||
def mock_write(data):
|
||||
written_chunks.append(data)
|
||||
|
||||
stdin.write = mock_write
|
||||
stdin.write_eof = MagicMock()
|
||||
stdin.drain = MagicMock()
|
||||
stdin.drain.aio = AsyncMock()
|
||||
stdin._written_chunks = written_chunks
|
||||
return stdin
|
||||
|
||||
|
||||
def _wire_async_exec(env, exec_calls=None):
|
||||
"""Wire mock sandbox.exec.aio and a real run_coroutine on the env.
|
||||
|
||||
Optionally captures exec call args into *exec_calls* list.
|
||||
Returns a dict that will contain ``run_coroutine`` kwargs after
|
||||
the next call (useful for timeout assertions).
|
||||
Returns (exec_calls, run_kwargs, stdin_mock).
|
||||
"""
|
||||
if exec_calls is None:
|
||||
exec_calls = []
|
||||
run_kwargs: dict = {}
|
||||
stdin_mock = _make_mock_stdin()
|
||||
|
||||
async def mock_exec(*args, **kwargs):
|
||||
async def mock_exec_fn(*args, **kwargs):
|
||||
exec_calls.append(args)
|
||||
proc = MagicMock()
|
||||
proc.wait = MagicMock()
|
||||
proc.wait.aio = AsyncMock(return_value=0)
|
||||
proc.stdin = stdin_mock
|
||||
proc.stderr = MagicMock()
|
||||
proc.stderr.read = MagicMock()
|
||||
proc.stderr.read.aio = AsyncMock(return_value="")
|
||||
return proc
|
||||
|
||||
env._sandbox.exec = MagicMock()
|
||||
env._sandbox.exec.aio = mock_exec
|
||||
env._sandbox.exec.aio = mock_exec_fn
|
||||
|
||||
def real_run_coroutine(coro, **kwargs):
|
||||
run_kwargs.update(kwargs)
|
||||
|
|
@ -57,7 +77,7 @@ def _wire_async_exec(env, exec_calls=None):
|
|||
loop.close()
|
||||
|
||||
env._worker.run_coroutine = real_run_coroutine
|
||||
return exec_calls, run_kwargs
|
||||
return exec_calls, run_kwargs, stdin_mock
|
||||
|
||||
|
||||
class TestModalBulkUpload:
|
||||
|
|
@ -70,7 +90,7 @@ class TestModalBulkUpload:
|
|||
env._worker.run_coroutine.assert_not_called()
|
||||
|
||||
def test_tar_archive_contains_all_files(self, monkeypatch, tmp_path):
|
||||
"""The tar archive sent to the sandbox should contain all files."""
|
||||
"""The tar archive sent via stdin should contain all files."""
|
||||
env = _make_mock_modal_env(monkeypatch, tmp_path)
|
||||
|
||||
src_a = tmp_path / "a.json"
|
||||
|
|
@ -83,10 +103,10 @@ class TestModalBulkUpload:
|
|||
(str(src_b), "/root/.hermes/skills/b.py"),
|
||||
]
|
||||
|
||||
exec_calls, _ = _wire_async_exec(env)
|
||||
exec_calls, _, stdin_mock = _wire_async_exec(env)
|
||||
env._modal_bulk_upload(files)
|
||||
|
||||
# Verify exec was called with bash -c and a tar command
|
||||
# Verify the command reads from stdin (no echo with embedded payload)
|
||||
assert len(exec_calls) == 1
|
||||
args = exec_calls[0]
|
||||
assert args[0] == "bash"
|
||||
|
|
@ -97,12 +117,8 @@ class TestModalBulkUpload:
|
|||
assert "tar xzf" in cmd
|
||||
assert "-C /" in cmd
|
||||
|
||||
# Extract the base64 payload and verify tar contents
|
||||
import re
|
||||
match = re.search(r"echo '?([A-Za-z0-9+/=]+)'?", cmd)
|
||||
assert match, f"Could not find base64 payload in command: {cmd}"
|
||||
payload = match.group(1)
|
||||
|
||||
# Reassemble the base64 payload from stdin chunks and verify tar contents
|
||||
payload = "".join(stdin_mock._written_chunks)
|
||||
tar_data = base64.b64decode(payload)
|
||||
buf = io.BytesIO(tar_data)
|
||||
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
|
||||
|
|
@ -116,6 +132,9 @@ class TestModalBulkUpload:
|
|||
b_content = tar.extractfile("root/.hermes/skills/b.py").read()
|
||||
assert b_content == b"skill_content"
|
||||
|
||||
# Verify stdin was closed
|
||||
stdin_mock.write_eof.assert_called_once()
|
||||
|
||||
def test_mkdir_includes_all_parents(self, monkeypatch, tmp_path):
|
||||
"""Remote parent directories should be pre-created in the command."""
|
||||
env = _make_mock_modal_env(monkeypatch, tmp_path)
|
||||
|
|
@ -128,7 +147,7 @@ class TestModalBulkUpload:
|
|||
(str(src), "/root/.hermes/skills/deep/nested/f.txt"),
|
||||
]
|
||||
|
||||
exec_calls, _ = _wire_async_exec(env)
|
||||
exec_calls, _, _ = _wire_async_exec(env)
|
||||
env._modal_bulk_upload(files)
|
||||
|
||||
cmd = exec_calls[0][2]
|
||||
|
|
@ -145,7 +164,7 @@ class TestModalBulkUpload:
|
|||
src.write_text(f"content_{i}")
|
||||
files.append((str(src), f"/root/.hermes/cache/file_{i}.txt"))
|
||||
|
||||
exec_calls, _ = _wire_async_exec(env)
|
||||
exec_calls, _, _ = _wire_async_exec(env)
|
||||
env._modal_bulk_upload(files)
|
||||
|
||||
# Should be exactly 1 exec call, not 20
|
||||
|
|
@ -189,7 +208,7 @@ class TestModalBulkUpload:
|
|||
src.write_text("data")
|
||||
files = [(str(src), "/root/.hermes/f.txt")]
|
||||
|
||||
_, run_kwargs = _wire_async_exec(env)
|
||||
_, run_kwargs, _ = _wire_async_exec(env)
|
||||
env._modal_bulk_upload(files)
|
||||
|
||||
assert run_kwargs.get("timeout") == 120
|
||||
|
|
@ -202,14 +221,20 @@ class TestModalBulkUpload:
|
|||
src.write_text("data")
|
||||
files = [(str(src), "/root/.hermes/f.txt")]
|
||||
|
||||
async def mock_exec(*args, **kwargs):
|
||||
stdin_mock = _make_mock_stdin()
|
||||
|
||||
async def mock_exec_fn(*args, **kwargs):
|
||||
proc = MagicMock()
|
||||
proc.wait = MagicMock()
|
||||
proc.wait.aio = AsyncMock(return_value=1) # non-zero exit
|
||||
proc.stdin = stdin_mock
|
||||
proc.stderr = MagicMock()
|
||||
proc.stderr.read = MagicMock()
|
||||
proc.stderr.read.aio = AsyncMock(return_value="tar: error")
|
||||
return proc
|
||||
|
||||
env._sandbox.exec = MagicMock()
|
||||
env._sandbox.exec.aio = mock_exec
|
||||
env._sandbox.exec.aio = mock_exec_fn
|
||||
|
||||
def real_run_coroutine(coro, **kwargs):
|
||||
loop = asyncio.new_event_loop()
|
||||
|
|
@ -222,3 +247,49 @@ class TestModalBulkUpload:
|
|||
|
||||
with pytest.raises(RuntimeError, match="Modal bulk upload failed"):
|
||||
env._modal_bulk_upload(files)
|
||||
|
||||
def test_payload_not_in_command_string(self, monkeypatch, tmp_path):
|
||||
"""The base64 payload must NOT appear in the bash -c argument.
|
||||
|
||||
This is the core ARG_MAX fix: the payload goes through stdin,
|
||||
not embedded in the command string.
|
||||
"""
|
||||
env = _make_mock_modal_env(monkeypatch, tmp_path)
|
||||
|
||||
src = tmp_path / "f.txt"
|
||||
src.write_text("some data to upload")
|
||||
files = [(str(src), "/root/.hermes/f.txt")]
|
||||
|
||||
exec_calls, _, stdin_mock = _wire_async_exec(env)
|
||||
env._modal_bulk_upload(files)
|
||||
|
||||
# The command should NOT contain an echo with the payload
|
||||
cmd = exec_calls[0][2]
|
||||
assert "echo" not in cmd
|
||||
# The payload should go through stdin
|
||||
assert len(stdin_mock._written_chunks) > 0
|
||||
|
||||
def test_stdin_chunked_for_large_payloads(self, monkeypatch, tmp_path):
|
||||
"""Payloads larger than _STDIN_CHUNK_SIZE should be split into multiple writes."""
|
||||
env = _make_mock_modal_env(monkeypatch, tmp_path)
|
||||
|
||||
# Use random bytes so gzip cannot compress them -- ensures the
|
||||
# base64 payload exceeds one 1 MB chunk.
|
||||
import os as _os
|
||||
src = tmp_path / "large.bin"
|
||||
src.write_bytes(_os.urandom(1024 * 1024 + 512 * 1024))
|
||||
files = [(str(src), "/root/.hermes/large.bin")]
|
||||
|
||||
exec_calls, _, stdin_mock = _wire_async_exec(env)
|
||||
env._modal_bulk_upload(files)
|
||||
|
||||
# Should have multiple stdin write chunks
|
||||
assert len(stdin_mock._written_chunks) >= 2
|
||||
|
||||
# Reassembled payload should still decode to valid tar
|
||||
payload = "".join(stdin_mock._written_chunks)
|
||||
tar_data = base64.b64decode(payload)
|
||||
buf = io.BytesIO(tar_data)
|
||||
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
|
||||
names = tar.getnames()
|
||||
assert "root/.hermes/large.bin" in names
|
||||
|
|
|
|||
|
|
@ -15,7 +15,13 @@ from tools.environments.base import (
|
|||
BaseEnvironment,
|
||||
_ThreadedProcessHandle,
|
||||
)
|
||||
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
|
||||
from tools.environments.file_sync import (
|
||||
FileSyncManager,
|
||||
iter_sync_files,
|
||||
quoted_mkdir_command,
|
||||
quoted_rm_command,
|
||||
unique_parent_dirs,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -150,11 +156,9 @@ class DaytonaEnvironment(BaseEnvironment):
|
|||
if not files:
|
||||
return
|
||||
|
||||
# Pre-create all unique parent directories in one shell call
|
||||
parents = sorted({str(Path(remote).parent) for _, remote in files})
|
||||
parents = unique_parent_dirs(files)
|
||||
if parents:
|
||||
mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(p) for p in parents)
|
||||
self._sandbox.process.exec(mkdir_cmd)
|
||||
self._sandbox.process.exec(quoted_mkdir_command(parents))
|
||||
|
||||
uploads = [
|
||||
FileUpload(source=host_path, destination=remote_path)
|
||||
|
|
|
|||
|
|
@ -274,55 +274,75 @@ class ModalEnvironment(BaseEnvironment):
|
|||
self.init_session()
|
||||
|
||||
def _modal_upload(self, host_path: str, remote_path: str) -> None:
|
||||
"""Upload a single file via base64-over-exec."""
|
||||
"""Upload a single file via base64 piped through stdin."""
|
||||
content = Path(host_path).read_bytes()
|
||||
b64 = base64.b64encode(content).decode("ascii")
|
||||
container_dir = str(Path(remote_path).parent)
|
||||
cmd = (
|
||||
f"mkdir -p {shlex.quote(container_dir)} && "
|
||||
f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
|
||||
f"base64 -d > {shlex.quote(remote_path)}"
|
||||
)
|
||||
|
||||
async def _write():
|
||||
proc = await self._sandbox.exec.aio("bash", "-c", cmd)
|
||||
offset = 0
|
||||
chunk_size = self._STDIN_CHUNK_SIZE
|
||||
while offset < len(b64):
|
||||
proc.stdin.write(b64[offset:offset + chunk_size])
|
||||
await proc.stdin.drain.aio()
|
||||
offset += chunk_size
|
||||
proc.stdin.write_eof()
|
||||
await proc.stdin.drain.aio()
|
||||
await proc.wait.aio()
|
||||
|
||||
self._worker.run_coroutine(_write(), timeout=15)
|
||||
self._worker.run_coroutine(_write(), timeout=30)
|
||||
|
||||
# Modal SDK stdin buffer limit (legacy server path). The command-router
|
||||
# path allows 16 MB, but we must stay under the smaller 2 MB cap for
|
||||
# compatibility. Chunks are written below this threshold and flushed
|
||||
# individually via drain().
|
||||
_STDIN_CHUNK_SIZE = 1 * 1024 * 1024 # 1 MB — safe for both transport paths
|
||||
|
||||
def _modal_bulk_upload(self, files: list[tuple[str, str]]) -> None:
|
||||
"""Upload many files in a single exec call via tar archive.
|
||||
"""Upload many files via tar archive piped through stdin.
|
||||
|
||||
Builds a gzipped tar archive in memory, base64-encodes it, and
|
||||
decodes+extracts in one ``exec`` call. Avoids per-file
|
||||
exec+encoding overhead (~580 files goes from minutes to seconds).
|
||||
Builds a gzipped tar archive in memory and streams it into a
|
||||
``base64 -d | tar xzf -`` pipeline via the process's stdin,
|
||||
avoiding the Modal SDK's 64 KB ``ARG_MAX_BYTES`` exec-arg limit.
|
||||
"""
|
||||
if not files:
|
||||
return
|
||||
|
||||
# Build a tar archive in memory with files at their remote paths
|
||||
buf = io.BytesIO()
|
||||
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
|
||||
for host_path, remote_path in files:
|
||||
# Store with leading '/' stripped so extracting at '/'
|
||||
# recreates the full absolute path
|
||||
tar.add(host_path, arcname=remote_path.lstrip("/"))
|
||||
payload = base64.b64encode(buf.getvalue()).decode("ascii")
|
||||
|
||||
# Pre-create parent dirs + decode + extract in one exec call
|
||||
parents = unique_parent_dirs(files)
|
||||
mkdir_part = quoted_mkdir_command(parents)
|
||||
cmd = (
|
||||
f"{mkdir_part} && "
|
||||
f"echo {shlex.quote(payload)} | base64 -d | tar xzf - -C /"
|
||||
)
|
||||
sandbox = self._sandbox
|
||||
cmd = f"{mkdir_part} && base64 -d | tar xzf - -C /"
|
||||
|
||||
async def _bulk():
|
||||
proc = await sandbox.exec.aio("bash", "-c", cmd)
|
||||
proc = await self._sandbox.exec.aio("bash", "-c", cmd)
|
||||
|
||||
# Stream payload through stdin in chunks to stay under the
|
||||
# SDK's per-write buffer limit (2 MB legacy / 16 MB router).
|
||||
offset = 0
|
||||
chunk_size = self._STDIN_CHUNK_SIZE
|
||||
while offset < len(payload):
|
||||
proc.stdin.write(payload[offset:offset + chunk_size])
|
||||
await proc.stdin.drain.aio()
|
||||
offset += chunk_size
|
||||
|
||||
proc.stdin.write_eof()
|
||||
await proc.stdin.drain.aio()
|
||||
|
||||
exit_code = await proc.wait.aio()
|
||||
if exit_code != 0:
|
||||
stderr_text = await proc.stderr.read.aio()
|
||||
raise RuntimeError(
|
||||
f"Modal bulk upload failed (exit {exit_code})"
|
||||
f"Modal bulk upload failed (exit {exit_code}): {stderr_text}"
|
||||
)
|
||||
|
||||
self._worker.run_coroutine(_bulk(), timeout=120)
|
||||
|
|
|
|||
|
|
@ -152,7 +152,6 @@ class SSHEnvironment(BaseEnvironment):
|
|||
if not files:
|
||||
return
|
||||
|
||||
# Pre-create all unique parent directories in one SSH call
|
||||
parents = unique_parent_dirs(files)
|
||||
if parents:
|
||||
cmd = self._build_ssh_command()
|
||||
|
|
@ -164,18 +163,11 @@ class SSHEnvironment(BaseEnvironment):
|
|||
# Symlink staging avoids fragile GNU tar --transform rules.
|
||||
with tempfile.TemporaryDirectory(prefix="hermes-ssh-bulk-") as staging:
|
||||
for host_path, remote_path in files:
|
||||
# remote_path is absolute (e.g. /home/user/.hermes/skills/foo.md)
|
||||
# Create the directory structure under staging
|
||||
staged = os.path.join(staging, remote_path.lstrip("/"))
|
||||
os.makedirs(os.path.dirname(staged), exist_ok=True)
|
||||
# Symlink to the actual file (avoid copying)
|
||||
os.symlink(os.path.abspath(host_path), staged)
|
||||
|
||||
# tar: dereference symlinks (-h), create archive from staging root
|
||||
# The archive paths are relative to staging, which mirrors / on remote
|
||||
tar_cmd = ["tar", "-chf", "-", "-C", staging, "."]
|
||||
|
||||
# ssh: extract on remote at /
|
||||
ssh_cmd = self._build_ssh_command()
|
||||
ssh_cmd.append("tar xf - -C /")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue