fix(file): strip leaked terminal fences from reads

This commit is contained in:
LeonSGP43 2026-05-03 22:59:04 +08:00 committed by Teknium
parent 2d7543c61f
commit 6713274a42
2 changed files with 89 additions and 7 deletions

View file

@ -271,6 +271,58 @@ class TestShellFileOpsHelpers:
ops = ShellFileOperations(env) ops = ShellFileOperations(env)
assert ops.cwd == "/" assert ops.cwd == "/"
def test_read_file_strips_leaked_terminal_fence_markers(self, mock_env):
leaked = (
"'\x07__HERMES_FENCE_a9f7b3__\x1b]0;cat "
"'/tmp/test/a.py' 2> /dev/null\x07\n"
"print('ok')\n"
"__HERMES_FENCE_a9f7b3__\x07'\n"
)
def side_effect(command, **kwargs):
if command.startswith("wc -c"):
return {"output": "12\n", "returncode": 0}
if command.startswith("head -c"):
return {"output": "print('ok')\n", "returncode": 0}
if command.startswith("sed -n"):
return {"output": leaked, "returncode": 0}
if command.startswith("wc -l"):
return {"output": "1\n", "returncode": 0}
return {"output": "", "returncode": 0}
mock_env.execute.side_effect = side_effect
ops = ShellFileOperations(mock_env)
result = ops.read_file("/tmp/test/a.py")
assert result.error is None
assert "HERMES_FENCE" not in result.content
assert "\x1b]" not in result.content
assert "\x07" not in result.content
assert " 1|print('ok')" in result.content
def test_read_file_raw_strips_leaked_terminal_fence_markers(self, mock_env):
leaked = (
"__HERMES_FENCE_a9f7b3__\x07'\n"
"alpha\n"
"\x1b]0;cat '/tmp/test/a.txt'\x07__HERMES_FENCE_a9f7b3__\n"
)
def side_effect(command, **kwargs):
if command.startswith("wc -c"):
return {"output": "6\n", "returncode": 0}
if command.startswith("head -c"):
return {"output": "alpha\n", "returncode": 0}
if command.startswith("cat "):
return {"output": leaked, "returncode": 0}
return {"output": "", "returncode": 0}
mock_env.execute.side_effect = side_effect
ops = ShellFileOperations(mock_env)
result = ops.read_file_raw("/tmp/test/a.txt")
assert result.error is None
assert result.content == "alpha\n"
class TestSearchPathValidation: class TestSearchPathValidation:
"""Test that search() returns an error for non-existent paths.""" """Test that search() returns an error for non-existent paths."""

View file

@ -53,6 +53,27 @@ WRITE_DENIED_PATHS = build_write_denied_paths(_HOME)
WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME) WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME)
_OSC_SEQUENCE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")
_FENCE_MARKER_RE = re.compile(r"'?\x07?__HERMES_FENCE_[A-Za-z0-9]+__\x07?'?")
def _strip_terminal_fence_leaks(text: str) -> str:
"""Strip leaked terminal fence wrappers from file read output."""
if not text:
return text
cleaned_lines: List[str] = []
for line in text.splitlines(keepends=True):
had_terminal_wrapper = "__HERMES_FENCE_" in line or "\x1b]" in line
cleaned = _OSC_SEQUENCE_RE.sub("", line)
cleaned = _FENCE_MARKER_RE.sub("", cleaned)
cleaned = cleaned.replace("\x07", "")
if had_terminal_wrapper and cleaned.strip("'\r\n\t ") == "":
continue
cleaned_lines.append(cleaned)
return "".join(cleaned_lines)
def _get_safe_write_root() -> Optional[str]: def _get_safe_write_root() -> Optional[str]:
"""Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset. """Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset.
@ -511,8 +532,9 @@ class ShellFileOperations(FileOperations):
# File not found - try to suggest similar files # File not found - try to suggest similar files
return self._suggest_similar_files(path) return self._suggest_similar_files(path)
stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
try: try:
file_size = int(stat_result.stdout.strip()) file_size = int(stat_output.strip())
except ValueError: except ValueError:
file_size = 0 file_size = 0
@ -536,8 +558,9 @@ class ShellFileOperations(FileOperations):
# Read a sample to check for binary content # Read a sample to check for binary content
sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null" sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null"
sample_result = self._exec(sample_cmd) sample_result = self._exec(sample_cmd)
sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
if self._is_likely_binary(path, sample_result.stdout): if self._is_likely_binary(path, sample_output):
return ReadResult( return ReadResult(
is_binary=True, is_binary=True,
file_size=file_size, file_size=file_size,
@ -551,12 +574,14 @@ class ShellFileOperations(FileOperations):
if read_result.exit_code != 0: if read_result.exit_code != 0:
return ReadResult(error=f"Failed to read file: {read_result.stdout}") return ReadResult(error=f"Failed to read file: {read_result.stdout}")
read_output = _strip_terminal_fence_leaks(read_result.stdout)
# Get total line count # Get total line count
wc_cmd = f"wc -l < {self._escape_shell_arg(path)}" wc_cmd = f"wc -l < {self._escape_shell_arg(path)}"
wc_result = self._exec(wc_cmd) wc_result = self._exec(wc_cmd)
wc_output = _strip_terminal_fence_leaks(wc_result.stdout)
try: try:
total_lines = int(wc_result.stdout.strip()) total_lines = int(wc_output.strip())
except ValueError: except ValueError:
total_lines = 0 total_lines = 0
@ -567,7 +592,7 @@ class ShellFileOperations(FileOperations):
hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)" hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)"
return ReadResult( return ReadResult(
content=self._add_line_numbers(read_result.stdout, offset), content=self._add_line_numbers(read_output, offset),
total_lines=total_lines, total_lines=total_lines,
file_size=file_size, file_size=file_size,
truncated=truncated, truncated=truncated,
@ -637,14 +662,16 @@ class ShellFileOperations(FileOperations):
stat_result = self._exec(stat_cmd) stat_result = self._exec(stat_cmd)
if stat_result.exit_code != 0: if stat_result.exit_code != 0:
return self._suggest_similar_files(path) return self._suggest_similar_files(path)
stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
try: try:
file_size = int(stat_result.stdout.strip()) file_size = int(stat_output.strip())
except ValueError: except ValueError:
file_size = 0 file_size = 0
if self._is_image(path): if self._is_image(path):
return ReadResult(is_image=True, is_binary=True, file_size=file_size) return ReadResult(is_image=True, is_binary=True, file_size=file_size)
sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null") sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null")
if self._is_likely_binary(path, sample_result.stdout): sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
if self._is_likely_binary(path, sample_output):
return ReadResult( return ReadResult(
is_binary=True, file_size=file_size, is_binary=True, file_size=file_size,
error="Binary file — cannot display as text." error="Binary file — cannot display as text."
@ -652,7 +679,10 @@ class ShellFileOperations(FileOperations):
cat_result = self._exec(f"cat {self._escape_shell_arg(path)}") cat_result = self._exec(f"cat {self._escape_shell_arg(path)}")
if cat_result.exit_code != 0: if cat_result.exit_code != 0:
return ReadResult(error=f"Failed to read file: {cat_result.stdout}") return ReadResult(error=f"Failed to read file: {cat_result.stdout}")
return ReadResult(content=cat_result.stdout, file_size=file_size) return ReadResult(
content=_strip_terminal_fence_leaks(cat_result.stdout),
file_size=file_size,
)
def delete_file(self, path: str) -> WriteResult: def delete_file(self, path: str) -> WriteResult:
"""Delete a file via rm.""" """Delete a file via rm."""