diff --git a/tests/tools/test_file_operations.py b/tests/tools/test_file_operations.py index f809ea5d912..b5f06248f5a 100644 --- a/tests/tools/test_file_operations.py +++ b/tests/tools/test_file_operations.py @@ -638,12 +638,14 @@ class TestPatchReplacePostWriteVerification: state = {"content": "hello world\n"} def side_effect(command, stdin_data=None, **kwargs): - # Write is `cat > path` — detect by the `>` redirect, NOT just `cat ` - if command.startswith("cat >"): - if stdin_data is not None: - state["content"] = stdin_data + # A write is the only call that pipes content over stdin — key + # on that behavioral signal rather than the exact write command, + # which is an atomic temp-file + mv script (`set -e; ... mv ...`), + # not a bare `cat > path`. + if stdin_data is not None: + state["content"] = stdin_data return {"output": "", "returncode": 0} - if command.startswith("cat "): # read + if command.startswith("cat "): # read / verify return {"output": state["content"], "returncode": 0} if command.startswith("mkdir "): return {"output": "", "returncode": 0} @@ -664,9 +666,8 @@ class TestPatchReplacePostWriteVerification: state = {"content": "hello world\n"} def side_effect(command, stdin_data=None, **kwargs): - if command.startswith("cat >"): # write - if stdin_data is not None: - state["content"] = stdin_data + if stdin_data is not None: # write (atomic temp-file + mv script) + state["content"] = stdin_data return {"output": "", "returncode": 0} if command.startswith("cat "): # read call_count["cat"] += 1 diff --git a/tests/tools/test_file_write_safety.py b/tests/tools/test_file_write_safety.py index e2eef17ab1d..a2bb05dd13a 100644 --- a/tests/tools/test_file_write_safety.py +++ b/tests/tools/test_file_write_safety.py @@ -107,5 +107,81 @@ class TestCheckSensitivePathMacOSBypass: assert _check_sensitive_path("/tmp/safe_file.txt") is None +class TestAtomicWrite: + """write_file / patch land via a temp-file + atomic rename. + + The invariant: a write that fails partway NEVER corrupts the existing + file, and the swap is a real rename (so a reader either sees the full + old content or the full new content, never a half-written file). These + run against a real LocalEnvironment so the actual shell script executes. + """ + + @pytest.fixture + def ops(self, tmp_path: Path): + from tools.environments.local import LocalEnvironment + from tools.file_operations import ShellFileOperations + env = LocalEnvironment(cwd=str(tmp_path)) + return ShellFileOperations(env, cwd=str(tmp_path)) + + def test_overwrite_changes_inode(self, ops, tmp_path: Path): + # A real rename allocates a new inode for the target; an in-place + # rewrite would keep the same inode. This proves the swap is atomic. + target = tmp_path / "f.txt" + target.write_text("v1") + ino_before = os.stat(target).st_ino + res = ops.write_file(str(target), "v2 content") + assert res.error is None, res.error + assert target.read_text() == "v2 content" + assert os.stat(target).st_ino != ino_before + + def test_overwrite_preserves_mode(self, ops, tmp_path: Path): + target = tmp_path / "perms.txt" + target.write_text("old") + os.chmod(target, 0o640) + res = ops.write_file(str(target), "new") + assert res.error is None, res.error + assert (os.stat(target).st_mode & 0o777) == 0o640 + + def test_failed_write_leaves_original_intact(self, ops, tmp_path: Path): + # A read-only parent directory means the temp file can't be created, + # so the write fails BEFORE any rename. The original must survive + # byte-for-byte and no temp file may be left behind. + if hasattr(os, "geteuid") and os.geteuid() == 0: + pytest.skip("root bypasses directory permission bits") + locked = tmp_path / "locked" + locked.mkdir() + target = locked / "f.txt" + target.write_text("ORIGINAL\n") + os.chmod(locked, 0o500) # r-x: cannot create entries inside + try: + res = ops.write_file(str(target), "SHOULD NOT LAND") + finally: + os.chmod(locked, 0o700) # restore for cleanup + assert res.error is not None + assert target.read_text() == "ORIGINAL\n" + assert [p for p in os.listdir(locked) if ".hermes-tmp" in p] == [] + + def test_no_temp_file_leaked_on_success(self, ops, tmp_path: Path): + target = tmp_path / "f.txt" + ops.write_file(str(target), "hello\n") + assert [p for p in os.listdir(tmp_path) if ".hermes-tmp" in p] == [] + + def test_special_chars_roundtrip(self, ops, tmp_path: Path): + target = tmp_path / "special.txt" + tricky = "q 'single' \"double\" $VAR `cmd` \\back\nünïcödé 日本語\n" + res = ops.write_file(str(target), tricky) + assert res.error is None, res.error + assert target.read_text(encoding="utf-8") == tricky + + def test_patch_routes_through_atomic_write(self, ops, tmp_path: Path): + target = tmp_path / "edit.py" + target.write_text("a = 1\nb = 2\nc = 3\n") + os.chmod(target, 0o600) + res = ops.patch_replace(str(target), "b = 2", "b = 22") + assert res.success, res.error + assert target.read_text() == "a = 1\nb = 22\nc = 3\n" + assert (os.stat(target).st_mode & 0o777) == 0o600 + + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tools/file_operations.py b/tools/file_operations.py index b27405c58d7..386ca2171b2 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -726,6 +726,60 @@ class ShellFileOperations(FileOperations): # Use single quotes and escape any single quotes in the string return "'" + arg.replace("'", "'\"'\"'") + "'" + def _atomic_write(self, path: str, content: str) -> "ExecuteResult": + """Write ``content`` to ``path`` atomically via temp-file + rename. + + Streams ``content`` over stdin into a temp file in the SAME + directory as ``path`` (so the final ``mv`` is a real rename on the + same filesystem, not a non-atomic cross-device copy), preserves the + existing file's mode if it exists, then renames over the target. + On any failure the temp file is removed so we never leak a partial + ``.hermes-tmp`` file next to the user's data, and the original file + is left untouched. Content rides stdin so there is no ARG_MAX limit. + + Returns an :class:`ExecuteResult`; ``exit_code == 0`` means the file + was swapped into place atomically. A non-zero exit means nothing was + renamed and the original (if any) is intact. + """ + q_path = self._escape_shell_arg(path) + parent = os.path.dirname(path) or "." + q_parent = self._escape_shell_arg(parent) + # template basename: hidden so it doesn't show up in casual `ls`, + # carries a marker so an orphaned temp (only possible on a hard + # crash *between* cat and mv) is identifiable. + tmpl = self._escape_shell_arg(".hermes-tmp.XXXXXX") + + # One shell script, fully quoted. Notes: + # - `mktemp` lands the temp in the target's own dir (-p) so `mv` is + # same-FS atomic; we fall back to a PID-stamped name if the + # backend lacks mktemp (rare; busybox/macOS/Linux all ship it). + # - `chmod --reference` is GNU-only, so we read the octal mode with + # `stat` (GNU `-c%a` or BSD `-f%Lp`) and `chmod` it explicitly; + # silent best-effort — a perms-copy failure must not abort the + # write, the file still lands with default umask perms. + # - `trap ... EXIT` guarantees the temp is removed on every error + # path (cat failure, mv failure, signal) but NOT after a + # successful mv (the temp no longer exists by then). + # - we `cat >` the temp, then `mv -f` it over the target. + script = ( + "set -e; " + f"d={q_parent}; t={q_path}; " + 'tmp="$(mktemp -p "$d" ' + tmpl + ' 2>/dev/null ' + '|| mktemp "$d/.hermes-tmp.$$.XXXXXX" 2>/dev/null ' + '|| { tmp="$d/.hermes-tmp.$$"; : > "$tmp" && echo "$tmp"; })"; ' + '[ -n "$tmp" ] || { echo "atomic write: could not create temp file" >&2; exit 1; }; ' + "trap 'rm -f \"$tmp\"' EXIT; " + # preserve mode of an existing target (best-effort, never fatal) + 'if [ -e "$t" ]; then ' + 'm="$(stat -c%a "$t" 2>/dev/null || stat -f%Lp "$t" 2>/dev/null || true)"; ' + '[ -n "$m" ] && chmod "$m" "$tmp" 2>/dev/null || true; ' + "fi; " + 'cat > "$tmp"; ' + 'mv -f "$tmp" "$t"; ' + "trap - EXIT" + ) + return self._exec(script, stdin_data=content) + def _detect_file_line_ending(self, path: str, pre_content: Optional[str] = None) -> Optional[str]: """Detect the dominant line ending of a file on disk. @@ -1053,10 +1107,22 @@ class ShellFileOperations(FileOperations): if mkdir_result.exit_code == 0: dirs_created = True - # Write via stdin pipe — content bypasses shell arg parsing entirely, - # so there's no ARG_MAX limit regardless of file size. - write_cmd = f"cat > {self._escape_shell_arg(path)}" - write_result = self._exec(write_cmd, stdin_data=content) + # Write atomically: stream into a temp file in the SAME directory, + # then ``mv`` it over the target. The rename is atomic on POSIX + # (and on every backend FS we run on), so a crash / power loss / + # truncated pipe mid-write leaves the original file intact instead + # of a half-written corrupt file. Same-directory is load-bearing — + # ``mv`` across filesystems degrades to copy+unlink, which is NOT + # atomic; keeping the temp beside the target guarantees a real + # rename. Content still rides stdin so there's no ARG_MAX limit. + # + # The temp file is created with ``mktemp`` (collision-safe) when the + # backend has it, falling back to a PID-stamped name otherwise. We + # then chmod the temp to match the existing file's mode (if any) so + # the atomic swap doesn't silently widen or narrow permissions, and + # clean the temp up on any failure so we never leak a ``.hermes-tmp`` + # turd next to the user's file. + write_result = self._atomic_write(path, content) if write_result.exit_code != 0: return WriteResult(error=f"Failed to write file: {write_result.stdout}")