From 6713274a4297ab1cf601d93655b458ab3e66d083 Mon Sep 17 00:00:00 2001 From: LeonSGP43 Date: Sun, 3 May 2026 22:59:04 +0800 Subject: [PATCH] fix(file): strip leaked terminal fences from reads --- tests/tools/test_file_operations.py | 52 +++++++++++++++++++++++++++++ tools/file_operations.py | 44 ++++++++++++++++++++---- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/tests/tools/test_file_operations.py b/tests/tools/test_file_operations.py index dfd54ba634..500cd6141a 100644 --- a/tests/tools/test_file_operations.py +++ b/tests/tools/test_file_operations.py @@ -271,6 +271,58 @@ class TestShellFileOpsHelpers: ops = ShellFileOperations(env) assert ops.cwd == "/" + def test_read_file_strips_leaked_terminal_fence_markers(self, mock_env): + leaked = ( + "'\x07__HERMES_FENCE_a9f7b3__\x1b]0;cat " + "'/tmp/test/a.py' 2> /dev/null\x07\n" + "print('ok')\n" + "__HERMES_FENCE_a9f7b3__\x07'\n" + ) + + def side_effect(command, **kwargs): + if command.startswith("wc -c"): + return {"output": "12\n", "returncode": 0} + if command.startswith("head -c"): + return {"output": "print('ok')\n", "returncode": 0} + if command.startswith("sed -n"): + return {"output": leaked, "returncode": 0} + if command.startswith("wc -l"): + return {"output": "1\n", "returncode": 0} + return {"output": "", "returncode": 0} + + mock_env.execute.side_effect = side_effect + ops = ShellFileOperations(mock_env) + result = ops.read_file("/tmp/test/a.py") + + assert result.error is None + assert "HERMES_FENCE" not in result.content + assert "\x1b]" not in result.content + assert "\x07" not in result.content + assert " 1|print('ok')" in result.content + + def test_read_file_raw_strips_leaked_terminal_fence_markers(self, mock_env): + leaked = ( + "__HERMES_FENCE_a9f7b3__\x07'\n" + "alpha\n" + "\x1b]0;cat '/tmp/test/a.txt'\x07__HERMES_FENCE_a9f7b3__\n" + ) + + def side_effect(command, **kwargs): + if command.startswith("wc -c"): + return {"output": "6\n", "returncode": 0} + if command.startswith("head -c"): + return {"output": "alpha\n", "returncode": 0} + if command.startswith("cat "): + return {"output": leaked, "returncode": 0} + return {"output": "", "returncode": 0} + + mock_env.execute.side_effect = side_effect + ops = ShellFileOperations(mock_env) + result = ops.read_file_raw("/tmp/test/a.txt") + + assert result.error is None + assert result.content == "alpha\n" + class TestSearchPathValidation: """Test that search() returns an error for non-existent paths.""" diff --git a/tools/file_operations.py b/tools/file_operations.py index aa7a482509..73e739e730 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -53,6 +53,27 @@ WRITE_DENIED_PATHS = build_write_denied_paths(_HOME) WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME) +_OSC_SEQUENCE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)") +_FENCE_MARKER_RE = re.compile(r"'?\x07?__HERMES_FENCE_[A-Za-z0-9]+__\x07?'?") + + +def _strip_terminal_fence_leaks(text: str) -> str: + """Strip leaked terminal fence wrappers from file read output.""" + if not text: + return text + + cleaned_lines: List[str] = [] + for line in text.splitlines(keepends=True): + had_terminal_wrapper = "__HERMES_FENCE_" in line or "\x1b]" in line + cleaned = _OSC_SEQUENCE_RE.sub("", line) + cleaned = _FENCE_MARKER_RE.sub("", cleaned) + cleaned = cleaned.replace("\x07", "") + if had_terminal_wrapper and cleaned.strip("'\r\n\t ") == "": + continue + cleaned_lines.append(cleaned) + return "".join(cleaned_lines) + + def _get_safe_write_root() -> Optional[str]: """Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset. @@ -511,8 +532,9 @@ class ShellFileOperations(FileOperations): # File not found - try to suggest similar files return self._suggest_similar_files(path) + stat_output = _strip_terminal_fence_leaks(stat_result.stdout) try: - file_size = int(stat_result.stdout.strip()) + file_size = int(stat_output.strip()) except ValueError: file_size = 0 @@ -536,8 +558,9 @@ class ShellFileOperations(FileOperations): # Read a sample to check for binary content sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null" sample_result = self._exec(sample_cmd) + sample_output = _strip_terminal_fence_leaks(sample_result.stdout) - if self._is_likely_binary(path, sample_result.stdout): + if self._is_likely_binary(path, sample_output): return ReadResult( is_binary=True, file_size=file_size, @@ -551,12 +574,14 @@ class ShellFileOperations(FileOperations): if read_result.exit_code != 0: return ReadResult(error=f"Failed to read file: {read_result.stdout}") + read_output = _strip_terminal_fence_leaks(read_result.stdout) # Get total line count wc_cmd = f"wc -l < {self._escape_shell_arg(path)}" wc_result = self._exec(wc_cmd) + wc_output = _strip_terminal_fence_leaks(wc_result.stdout) try: - total_lines = int(wc_result.stdout.strip()) + total_lines = int(wc_output.strip()) except ValueError: total_lines = 0 @@ -567,7 +592,7 @@ class ShellFileOperations(FileOperations): hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)" return ReadResult( - content=self._add_line_numbers(read_result.stdout, offset), + content=self._add_line_numbers(read_output, offset), total_lines=total_lines, file_size=file_size, truncated=truncated, @@ -637,14 +662,16 @@ class ShellFileOperations(FileOperations): stat_result = self._exec(stat_cmd) if stat_result.exit_code != 0: return self._suggest_similar_files(path) + stat_output = _strip_terminal_fence_leaks(stat_result.stdout) try: - file_size = int(stat_result.stdout.strip()) + file_size = int(stat_output.strip()) except ValueError: file_size = 0 if self._is_image(path): return ReadResult(is_image=True, is_binary=True, file_size=file_size) sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null") - if self._is_likely_binary(path, sample_result.stdout): + sample_output = _strip_terminal_fence_leaks(sample_result.stdout) + if self._is_likely_binary(path, sample_output): return ReadResult( is_binary=True, file_size=file_size, error="Binary file — cannot display as text." @@ -652,7 +679,10 @@ class ShellFileOperations(FileOperations): cat_result = self._exec(f"cat {self._escape_shell_arg(path)}") if cat_result.exit_code != 0: return ReadResult(error=f"Failed to read file: {cat_result.stdout}") - return ReadResult(content=cat_result.stdout, file_size=file_size) + return ReadResult( + content=_strip_terminal_fence_leaks(cat_result.stdout), + file_size=file_size, + ) def delete_file(self, path: str) -> WriteResult: """Delete a file via rm."""