From 6bd0be30bee215ff5dab20dcebfb9481050bf96d Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 25 May 2026 15:18:45 -0700 Subject: [PATCH] feat(patch): indentation preservation, CRLF preservation, per-file failure escalation (#507) (#32273) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three granular patch-tool refinements from the Roo Code deep-dive (#507). ## Indentation preservation (fuzzy_match.py) When fuzzy_find_and_replace matches via a non-exact strategy, the file's indentation may differ from what the LLM sent in old_string/new_string (common case: model sends zero-indent old/new for a method body that lives inside an 8-space-indented class). Before this commit the replacement was spliced in verbatim, producing a file with a broken indent level that may still parse but is logically wrong. The fix computes the indent delta between old_string's first meaningful line and the matched region's first meaningful line, then re-indents every line of new_string by that delta. Exact-strategy matches are untouched (passthrough). Same approach as Roo Code's multi-search-replace.ts:466-500. ## CRLF preservation (file_operations.py) Models nearly always send tool args with bare LF endings (JSON-encoded), but the file on disk may have CRLF (Windows-line-ending configs, .bat, .cmd, .ini files). Before this commit: - write_file silently normalized CRLF to LF on every overwrite - patch produced mixed-ending files: the substituted region had LF, the surrounding context kept CRLF The fix detects the file's existing line endings (via pre_content if already read for lint/LSP, otherwise a tiny head -c 4096 probe), and normalizes the entire write to that ending. New files are written verbatim (no detection possible). ## Per-file failure escalation (file_tools.py) When the agent fails to patch the same file 3+ times in a row, the existing 'old_string not found' hint isn't strong enough — the model keeps retrying with variations against a stale view of the file. The fix tracks consecutive failures per (task_id, resolved_path) and injects an escalating hint after 3 failures: 'This is failure #N patching X. Stop retrying. Either re-read fresh, use longer context, or fall back to write_file.' Counter resets on a successful patch to the same path. ## Validation - 22 new tests across tests/tools/test_fuzzy_match.py (5), test_line_ending_preservation.py (12), test_patch_failure_tracking.py (5) - All existing tests pass (165/165 in the touched files) - E2E verified with real _handle_patch / _handle_write_file calls against real CRLF files and real failure loops Closes part of #507. The remaining open items in #507 (2b start_line hint, behavioral rules) were declined after audit: - 2b adds schema bloat for a problem the existing 'multiple matches' contract already handles - Behavioral rules conflict with the personality system Items 1, 2d, 2e, 3, 4 of #507 were already landed in earlier work. --- tests/tools/test_fuzzy_match.py | 100 ++++++++ tests/tools/test_line_ending_preservation.py | 238 +++++++++++++++++++ tests/tools/test_patch_failure_tracking.py | 222 +++++++++++++++++ tools/file_operations.py | 88 ++++++- tools/file_tools.py | 70 +++++- tools/fuzzy_match.py | 116 ++++++++- 6 files changed, 824 insertions(+), 10 deletions(-) create mode 100644 tests/tools/test_line_ending_preservation.py create mode 100644 tests/tools/test_patch_failure_tracking.py diff --git a/tests/tools/test_fuzzy_match.py b/tests/tools/test_fuzzy_match.py index 3f7d3158202..b4e3640e2bd 100644 --- a/tests/tools/test_fuzzy_match.py +++ b/tests/tools/test_fuzzy_match.py @@ -52,6 +52,106 @@ class TestIndentDifference: assert "bar" in new +class TestIndentationPreservation: + """When a non-exact strategy matches, ``new_string`` should be re-indented + so it lands at the file's actual indent depth — not at whatever indent the + LLM happened to send in the tool args. Without this fix the file gets a + silently-broken indent level that may even still parse but is logically + wrong.""" + + def test_unindented_input_reindented_to_match_file(self): + # File: 8-space-indented method body inside a class. + content = ( + "class Calculator:\n" + " def add(self, a, b):\n" + " result = a + b\n" + " return result\n" + ) + # LLM sends zero-indent old/new — common bug from frontier models + # that "remember" code instead of reading it. + old = "result = a + b\nreturn result" + new = "result = a + b\nresult *= 2\nreturn result" + out, count, strategy, err = fuzzy_find_and_replace(content, old, new) + assert err is None and count == 1 + assert strategy != "exact" # must have gone through a fuzzy strategy + # Every replaced line should be at 8-space indent. + for marker in ("result = a + b", "result *= 2", "return result"): + line = next(line for line in out.split("\n") if marker in line) + indent = len(line) - len(line.lstrip()) + assert indent == 8, f"Expected 8-space indent for {marker!r}, got {indent}: {line!r}" + # Resulting file must still be valid Python. + import ast + ast.parse(out) + + def test_dedent_at_start_anchors_to_file_base(self): + # File: 2-space-indented function body. LLM sends zero-indent + # old/new where new_string contains a dedent (the new structure + # adds a top-level class wrapper). After re-indent, every line + # of new_string should be anchored to the file's 2-space base. + content = " return 1\n return 2\n" + old = "return 1\nreturn 2" # zero-indent — forces line_trimmed + new = "class X:\n return 99\n return 100" + out, count, strategy, err = fuzzy_find_and_replace(content, old, new) + assert err is None and count == 1 + assert strategy != "exact" + lines = out.split("\n") + # 'class X:' anchored to file's 2-space base. + assert lines[0] == " class X:", repr(lines[0]) + # Indented body lines lift to 4-space (file base + LLM's +2). + assert lines[1] == " return 99", repr(lines[1]) + assert lines[2] == " return 100", repr(lines[2]) + + def test_exact_match_no_reindent(self): + # Exact strategy should be a pure passthrough — no shift logic + # should touch the result. + content = " def foo():\n return 1\n" + old = " def foo():\n return 1" + new = " def foo():\n return 2" + out, count, strategy, err = fuzzy_find_and_replace(content, old, new) + assert err is None and strategy == "exact" + assert out == " def foo():\n return 2\n" + + def test_llm_zero_indent_shifts_to_file_two_space(self): + # LLM sent zero-indent old/new; file has 2-space indent. The + # re-indent shifts the whole replacement so 'def x()' lands at + # 2-space and the body keeps its relative +2 from new_string. + content = " def x():\n return 1\n" + old = "def x():\n return 1" + new = "def x():\n return 99" + out, count, _, err = fuzzy_find_and_replace(content, old, new) + assert err is None and count == 1 + lines = out.strip("\n").split("\n") + assert lines[0] == " def x():" + assert lines[1] == " return 99" + + def test_indent_already_matches_passthrough(self): + # When old_string's base indent already equals file_region's base + # indent, _reindent_replacement returns new_string unchanged. + # Verify with whitespace_normalized strategy (collapsed spaces). + content = " def x( ):\n return 1\n" + old = " def x():\n return 1" # same base indent (2), different inner whitespace + new = " def x():\n return 42" + out, count, strategy, err = fuzzy_find_and_replace(content, old, new) + assert err is None and count == 1 + assert strategy != "exact" # non-exact strategy matched + # Body retains its 4-space indent (passthrough — no shift). + assert " return 42" in out + + def test_blank_lines_left_alone(self): + # Blank lines in new_string should keep whatever whitespace they + # had — we never strip or pad them. + content = " a = 1\n b = 2\n" + old = "a = 1\nb = 2" + new = "a = 1\n\nb = 99" + out, count, _, err = fuzzy_find_and_replace(content, old, new) + assert err is None and count == 1 + # blank line is preserved (empty), indented lines anchored. + lines = out.split("\n") + assert lines[0] == " a = 1" + assert lines[1] == "" + assert lines[2] == " b = 99" + + class TestReplaceAll: def test_multiple_matches_without_flag_errors(self): content = "aaa bbb aaa" diff --git a/tests/tools/test_line_ending_preservation.py b/tests/tools/test_line_ending_preservation.py new file mode 100644 index 00000000000..82c055cb810 --- /dev/null +++ b/tests/tools/test_line_ending_preservation.py @@ -0,0 +1,238 @@ +"""Tests for CRLF line-ending preservation in write_file and patch. + +Without this, the agent silently normalizes Windows-line-ending files +to LF whenever it edits them — and patch produces a mixed-ending file +when only a substituted region changes (the rest of the file keeps its +CRLF endings while the replacement is LF-only). + +See issue #507 (Roo Code deep-dive, item 2c). +""" + +import json +import os +import tempfile + +import pytest + + +@pytest.fixture +def hermes_home(monkeypatch, tmp_path): + """Isolate HERMES_HOME so the tests don't pollute the real config. + + Also clears module-level caches (file_ops, active_environments, + file-staleness state) after the test so subsequent tests in the + same pytest process aren't affected by our shell-out side effects + (real file_ops and terminal environments get created under + task_id='default' via _resolve_container_task_id). + """ + home = tmp_path / "hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + yield home + # Cleanup: drop the cached file_ops and active environment so the + # next test sees a fresh state. Without this, _get_live_tracking_cwd + # returns the stale cwd from this test's ops and breaks tests like + # test_resolve_path that rely on TERMINAL_CWD env var. + try: + from tools.file_tools import clear_file_ops_cache, _read_tracker_lock, _read_tracker + clear_file_ops_cache() + with _read_tracker_lock: + _read_tracker.clear() + except Exception: + pass + try: + from tools.terminal_tool import _active_environments, _env_lock + with _env_lock: + _active_environments.clear() + except Exception: + pass + + +def _crlf_count(b: bytes) -> int: + return b.count(b"\r\n") + + +def _bare_lf_count(b: bytes) -> int: + return b.count(b"\n") - b.count(b"\r\n") + + +class TestPatchCRLFPreservation: + def test_patch_on_crlf_file_stays_pure_crlf(self, hermes_home, tmp_path): + """LLM sends LF old/new; file has CRLF. Result must be all CRLF, + no mixed endings.""" + from tools.file_tools import _handle_patch + + target = tmp_path / "config.ini" + target.write_bytes(b"[a]\r\nkey=1\r\n\r\n[b]\r\nkey=2\r\n") + + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": "key=1", + "new_string": "key=99", + }, + task_id="crlf_patch_1", + ) + d = json.loads(result) + assert not d.get("error"), d + + raw = target.read_bytes() + assert _bare_lf_count(raw) == 0, ( + f"Mixed line endings after patch: {raw!r}" + ) + # Same number of line breaks as before; just the value swapped. + assert _crlf_count(raw) == 5 + assert b"key=99\r\n" in raw + + def test_patch_on_lf_file_stays_lf(self, hermes_home, tmp_path): + """LF file with LF new_string stays LF — no spurious CRLF added.""" + from tools.file_tools import _handle_patch + + target = tmp_path / "config.ini" + target.write_bytes(b"[a]\nkey=1\n\n[b]\nkey=2\n") + + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": "key=1", + "new_string": "key=99", + }, + task_id="crlf_patch_2", + ) + d = json.loads(result) + assert not d.get("error"), d + + raw = target.read_bytes() + assert _crlf_count(raw) == 0, ( + f"Spurious CRLF added to LF file: {raw!r}" + ) + + def test_patch_multiline_replacement_on_crlf(self, hermes_home, tmp_path): + """Multi-line new_string with bare LFs should be CRLF-converted + before write.""" + from tools.file_tools import _handle_patch + + target = tmp_path / "f.py" + target.write_bytes(b"def foo():\r\n return 1\r\n") + + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": "def foo():\n return 1", + "new_string": "def foo():\n x = 1\n return x", + }, + task_id="crlf_patch_3", + ) + d = json.loads(result) + assert not d.get("error"), d + + raw = target.read_bytes() + assert _bare_lf_count(raw) == 0, ( + f"Mixed endings after multi-line patch: {raw!r}" + ) + assert raw == b"def foo():\r\n x = 1\r\n return x\r\n" + + +class TestWriteFileCRLFPreservation: + def test_overwrite_crlf_file_with_lf_content_preserves_crlf( + self, hermes_home, tmp_path + ): + """The agent typically sends bare-LF content; if the file existed + with CRLF, the write should convert to CRLF rather than silently + flipping the endings.""" + from tools.file_tools import _handle_write_file + + target = tmp_path / "config.bat" + target.write_bytes(b"@echo off\r\nset X=1\r\n") + + result = _handle_write_file( + { + "path": str(target), + "content": "@echo off\nset X=99\nset Y=42\n", + }, + task_id="crlf_write_1", + ) + d = json.loads(result) + assert "error" not in d, d + + raw = target.read_bytes() + assert _bare_lf_count(raw) == 0, ( + f"CRLF file got normalized to LF: {raw!r}" + ) + assert _crlf_count(raw) == 3 + + def test_new_file_written_as_is(self, hermes_home, tmp_path): + """No pre-existing file → write content verbatim (LF by default).""" + from tools.file_tools import _handle_write_file + + target = tmp_path / "new.txt" + result = _handle_write_file( + {"path": str(target), "content": "a\nb\nc\n"}, + task_id="crlf_write_2", + ) + d = json.loads(result) + assert "error" not in d, d + + assert target.read_bytes() == b"a\nb\nc\n" + + def test_overwrite_lf_file_stays_lf(self, hermes_home, tmp_path): + """Pre-existing LF file should not get spurious CRLFs.""" + from tools.file_tools import _handle_write_file + + target = tmp_path / "lf.txt" + target.write_bytes(b"line1\nline2\n") + + result = _handle_write_file( + {"path": str(target), "content": "X\nY\nZ\n"}, + task_id="crlf_write_3", + ) + d = json.loads(result) + assert "error" not in d, d + + raw = target.read_bytes() + assert _crlf_count(raw) == 0 + assert raw == b"X\nY\nZ\n" + + +class TestLineEndingHelpers: + """Direct unit tests for the pure helpers — easier to debug than the + integration tests above.""" + + def test_detect_crlf(self): + from tools.file_operations import _detect_line_ending + + assert _detect_line_ending("a\r\nb\r\n") == "\r\n" + + def test_detect_lf(self): + from tools.file_operations import _detect_line_ending + + assert _detect_line_ending("a\nb\n") == "\n" + + def test_detect_empty(self): + from tools.file_operations import _detect_line_ending + + assert _detect_line_ending("") is None + assert _detect_line_ending("no newline here") is None + + def test_detect_mixed_picks_crlf(self): + """Mixed-ending content (any CRLF in the head) returns CRLF — + we prefer to normalize TO CRLF rather than away from it, since + a single CRLF in the file is usually a Windows-origin marker.""" + from tools.file_operations import _detect_line_ending + + assert _detect_line_ending("a\nb\r\nc\n") == "\r\n" + + def test_normalize_to_lf_strips_cr(self): + from tools.file_operations import _normalize_line_endings + + assert _normalize_line_endings("a\r\nb\rc\n", "\n") == "a\nb\nc\n" + + def test_normalize_to_crlf_idempotent(self): + from tools.file_operations import _normalize_line_endings + + once = _normalize_line_endings("a\nb\n", "\r\n") + twice = _normalize_line_endings(once, "\r\n") + assert once == twice == "a\r\nb\r\n" diff --git a/tests/tools/test_patch_failure_tracking.py b/tests/tools/test_patch_failure_tracking.py new file mode 100644 index 00000000000..3bed0cf0123 --- /dev/null +++ b/tests/tools/test_patch_failure_tracking.py @@ -0,0 +1,222 @@ +"""Tests for per-file consecutive patch-failure tracking. + +When the agent repeatedly fails to patch the same file with similar but +non-matching old_strings, it's usually stuck in a loop with a stale view +of the file. After 3 consecutive failures on the same path, the patch +tool injects an escalating ``_hint`` that tells the model to break out +of the loop (re-read, use longer context, or fall back to write_file). + +See issue #507 (Roo Code deep-dive, item 2f). +""" + +import json + +import pytest + + +@pytest.fixture +def hermes_home(monkeypatch, tmp_path): + """Isolate HERMES_HOME and clear module-level caches afterward so the + real shell-out side effects from _handle_patch don't leak into + subsequent tests (see test_line_ending_preservation.py for details).""" + home = tmp_path / "hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + yield home + try: + from tools.file_tools import clear_file_ops_cache, _read_tracker_lock, _read_tracker + clear_file_ops_cache() + with _read_tracker_lock: + _read_tracker.clear() + except Exception: + pass + try: + from tools.terminal_tool import _active_environments, _env_lock + with _env_lock: + _active_environments.clear() + except Exception: + pass + + +@pytest.fixture +def fresh_tracker(): + """Reset the module-level tracker before each test so the count starts + at zero regardless of prior test order.""" + from tools.file_tools import _patch_failure_tracker, _patch_failure_lock + + with _patch_failure_lock: + _patch_failure_tracker.clear() + yield + with _patch_failure_lock: + _patch_failure_tracker.clear() + + +class TestPatchFailureEscalation: + def test_first_two_failures_use_normal_hint(self, hermes_home, tmp_path, fresh_tracker): + from tools.file_tools import _handle_patch + + target = tmp_path / "f.py" + target.write_text("def foo():\n return 1\n") + + for _i in range(2): + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": f"NONEXISTENT_{_i}_XYZQQQ", + "new_string": "x", + }, + task_id="esc_t1", + ) + d = json.loads(result) + hint = d.get("_hint", "") or "" + assert "failure #" not in hint, ( + f"Escalating hint fired too early on attempt {_i + 1}: {hint!r}" + ) + + def test_third_consecutive_failure_escalates(self, hermes_home, tmp_path, fresh_tracker): + from tools.file_tools import _handle_patch + + target = tmp_path / "f.py" + target.write_text("def foo():\n return 1\n") + + last_hint = "" + for _i in range(3): + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": f"DOES_NOT_EXIST_{_i}_FOOFOOFOO", + "new_string": "x", + }, + task_id="esc_t2", + ) + d = json.loads(result) + last_hint = d.get("_hint", "") or "" + + assert "failure #3" in last_hint, repr(last_hint) + assert "Stop retrying" in last_hint + assert "write_file" in last_hint, ( + "Escalating hint should mention write_file fallback" + ) + + def test_success_clears_failure_counter(self, hermes_home, tmp_path, fresh_tracker): + from tools.file_tools import _handle_patch + + target = tmp_path / "f.py" + target.write_text("def foo():\n return 1\n") + + # Three failures: counter at 3. + for _i in range(3): + _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": f"GHOST_{_i}_ABCABC", + "new_string": "x", + }, + task_id="esc_t3", + ) + + # Successful patch: clears the counter. + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": "return 1", + "new_string": "return 99", + }, + task_id="esc_t3", + ) + d = json.loads(result) + assert not d.get("error"), d + + # Next failure should be back to "attempt 1" — generic hint only. + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": "STILL_GHOST_XYZ", + "new_string": "x", + }, + task_id="esc_t3", + ) + d = json.loads(result) + hint = d.get("_hint", "") or "" + assert "failure #" not in hint, ( + f"Counter should have been reset after success: {hint!r}" + ) + + def test_different_paths_have_independent_counters( + self, hermes_home, tmp_path, fresh_tracker + ): + from tools.file_tools import _handle_patch + + a = tmp_path / "a.py" + a.write_text("x = 1\n") + b = tmp_path / "b.py" + b.write_text("y = 2\n") + + # Three failures on a.py. + for _i in range(3): + _handle_patch( + { + "mode": "replace", + "path": str(a), + "old_string": f"NONE_A_{_i}_ZZZ", + "new_string": "x", + }, + task_id="esc_t4", + ) + + # One failure on b.py — should NOT inherit a.py's count. + result = _handle_patch( + { + "mode": "replace", + "path": str(b), + "old_string": "NONE_B_ZZZ", + "new_string": "x", + }, + task_id="esc_t4", + ) + d = json.loads(result) + hint = d.get("_hint", "") or "" + assert "failure #" not in hint, ( + f"b.py's hint inherited a.py's count: {hint!r}" + ) + + def test_different_tasks_have_independent_counters( + self, hermes_home, tmp_path, fresh_tracker + ): + from tools.file_tools import _handle_patch + + target = tmp_path / "shared.py" + target.write_text("z = 0\n") + + # Three failures under task A. + for _i in range(3): + _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": f"GHOST_A_{_i}_QWE", + "new_string": "x", + }, + task_id="task_A", + ) + + # First failure under task B — should NOT see escalation. + result = _handle_patch( + { + "mode": "replace", + "path": str(target), + "old_string": "GHOST_B_QWE", + "new_string": "x", + }, + task_id="task_B", + ) + d = json.loads(result) + hint = d.get("_hint", "") or "" + assert "failure #" not in hint, ( + f"task_B's hint cross-contaminated from task_A: {hint!r}" + ) diff --git a/tools/file_operations.py b/tools/file_operations.py index c25dc332cb0..72d9f06779f 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -74,6 +74,46 @@ def _strip_terminal_fence_leaks(text: str) -> str: return "".join(cleaned_lines) +def _detect_line_ending(sample: str) -> Optional[str]: + """Return the dominant line ending in ``sample`` or None if undetermined. + + Looks at the first few line breaks and picks ``\\r\\n`` if any are + present (Windows / DOS), otherwise ``\\n`` (Unix). Returns ``None`` + for empty / single-line content where we can't tell. Used to + preserve the file's original line endings across write_file and + patch operations — without this the agent's bare-LF tool args + silently normalize Windows-line-ending files, and patch produces + mixed endings when only a substituted region changes. + """ + if not sample: + return None + # Look at the first chunk — enough to tell, cheap to scan. + head = sample[:4096] + if "\r\n" in head: + return "\r\n" + if "\n" in head: + return "\n" + return None + + +def _normalize_line_endings(text: str, target: str) -> str: + """Convert all line endings in ``text`` to ``target`` (``\\n`` or ``\\r\\n``). + + Idempotent: ``_normalize_line_endings(_normalize_line_endings(x, "\\r\\n"), "\\r\\n") == _normalize_line_endings(x, "\\r\\n")``. + Strips lone ``\\r`` characters as well, so mixed-ending content is + homogenized in a single pass. + """ + # First collapse to LF (handle CRLF and lone CR), then expand if target + # is CRLF. Order matters: doing the replacements separately would + # double-convert a CRLF -> LFLF. + lf_normalized = text.replace("\r\n", "\n").replace("\r", "\n") + if target == "\n": + return lf_normalized + if target == "\r\n": + return lf_normalized.replace("\n", "\r\n") + return text + + def _get_safe_write_root() -> Optional[str]: """Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset. @@ -697,7 +737,29 @@ class ShellFileOperations(FileOperations): """Escape a string for safe use in shell commands.""" # Use single quotes and escape any single quotes in the string return "'" + arg.replace("'", "'\"'\"'") + "'" - + + def _detect_file_line_ending(self, path: str, pre_content: Optional[str] = None) -> Optional[str]: + """Detect the dominant line ending of a file on disk. + + If ``pre_content`` is already available (we just read the file + for lint/LSP purposes), inspect that — zero extra exec calls. + Otherwise issue a tiny ``head -c 4096`` to sample the first 4KB. + + Returns ``"\\r\\n"`` for CRLF (Windows), ``"\\n"`` for LF (Unix), + or ``None`` if undetermined (new file, empty file, single-line + file with no line break in the first chunk). + """ + if pre_content: + return _detect_line_ending(pre_content) + # File may not exist (new write) — `head` exits 0 with empty + # stdout in that case which yields None below. Cheap probe. + head_cmd = f"head -c 4096 {self._escape_shell_arg(path)} 2>/dev/null" + head_result = self._exec(head_cmd) + if head_result.exit_code != 0 or not head_result.stdout: + return None + return _detect_line_ending(head_result.stdout) + + def _unified_diff(self, old_content: str, new_content: str, filename: str) -> str: """Generate unified diff between old and new content.""" old_lines = old_content.splitlines(keepends=True) @@ -975,6 +1037,17 @@ class ShellFileOperations(FileOperations): if read_result.exit_code == 0 and read_result.stdout: pre_content = read_result.stdout + # ── Line-ending preservation (Roo Code pattern) ────────────── + # If the file existed with CRLF endings and the agent's content + # has bare LFs, convert to CRLF before writing. Otherwise the + # write silently normalizes a Windows-line-ending file (and patch + # produces mixed endings when only a substituted region changes). + # Detect from a small head sample to avoid reading the full file + # for line-ending purposes alone. + original_ending = self._detect_file_line_ending(path, pre_content) + if original_ending == "\r\n": + content = _normalize_line_endings(content, "\r\n") + # Snapshot LSP diagnostics for this file (best-effort) so the # post-write LSP layer can return only diagnostics introduced # by this specific edit. Mirrors claude-code's @@ -1082,6 +1155,19 @@ class ShellFileOperations(FileOperations): except Exception: pass return PatchResult(error=err_msg) + + # ── Line-ending preservation ────────────────────────────────── + # Models nearly always send old_string/new_string with bare LF + # in tool args (JSON-encoded), but the file may have CRLF on + # disk. After fuzzy_find_and_replace, ``new_content`` is a + # mixed-ending string: the substituted region is LF, surrounding + # text keeps the file's CRLF. Normalize the whole thing to the + # file's detected line ending so the on-disk file is consistent + # and the unified diff below reflects the actual change. + file_ending = _detect_line_ending(content) + if file_ending: + new_content = _normalize_line_endings(new_content, file_ending) + # Write back write_result = self.write_file(path, new_content) if write_result.error: diff --git a/tools/file_tools.py b/tools/file_tools.py index ef0921c1c9b..c65c6ef9b4b 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -254,6 +254,43 @@ _file_ops_cache: dict = {} _read_tracker_lock = threading.Lock() _read_tracker: dict = {} +# Track consecutive patch failures per (task_id, resolved_path). Used to +# escalate the hint when the model repeatedly fails to patch the same file +# (typical cause: stale view of file contents, ambiguous old_string, or +# the file was modified externally between the agent's read and patch +# attempt). Reset on a successful patch to that path. +_patch_failure_lock = threading.Lock() +_patch_failure_tracker: dict = {} # {task_id: {resolved_path: count}} + + +def _record_patch_failure(task_id: str, resolved_path: str) -> int: + """Increment and return the consecutive-failure count for this path.""" + with _patch_failure_lock: + task_failures = _patch_failure_tracker.setdefault(task_id, {}) + # Cap dict size per task to avoid unbounded growth in long sessions + # where the agent fails on many distinct files. 64 distinct + # failing files per task is generous; older entries get evicted. + if len(task_failures) >= 64 and resolved_path not in task_failures: + try: + first_key = next(iter(task_failures)) + del task_failures[first_key] + except StopIteration: + pass + task_failures[resolved_path] = task_failures.get(resolved_path, 0) + 1 + return task_failures[resolved_path] + + +def _reset_patch_failures(task_id: str, resolved_paths: list) -> None: + """Clear consecutive-failure counts for the given paths.""" + if not resolved_paths: + return + with _patch_failure_lock: + task_failures = _patch_failure_tracker.get(task_id) + if not task_failures: + return + for rp in resolved_paths: + task_failures.pop(rp, None) + # Per-task bounds for the containers inside each _read_tracker[task_id]. # A CLI session uses one stable task_id for its lifetime; without these # caps, a 10k-read session would accumulate ~1.5MB of dict/set state that @@ -1020,12 +1057,43 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, _r = _path_to_resolved.get(_p) if _r: file_state.note_write(task_id, _r) + # Successful patch: clear any prior consecutive-failure + # counters for the touched paths so a future failure on + # the same path starts the escalation cycle fresh. + _reset_patch_failures(task_id, [ + _r for _r in (_path_to_resolved.get(_p) for _p in _paths_to_check) if _r + ]) # Hint when old_string not found — saves iterations where the agent # retries with stale content instead of re-reading the file. # Suppressed when patch_replace already attached a rich "Did you mean?" # snippet (which is strictly more useful than the generic hint). if result_dict.get("error") and "Could not find" in str(result_dict["error"]): - if "Did you mean one of these sections?" not in str(result_dict["error"]): + # Track per-file consecutive failures for replace mode. The + # ``path`` arg only exists for replace mode; for V4A patches + # we'd need to walk the headers, but in practice V4A failures + # are far rarer and the existing _hint covers them adequately. + failure_count = 0 + if mode == "replace" and path: + resolved = _path_to_resolved.get(path) or path + failure_count = _record_patch_failure(task_id, resolved) + + if failure_count >= 3: + # Escalating hint after multiple consecutive failures on the + # same path. Most common cause is a stale view of the file — + # the model is retrying with the same old_string against + # content that has since changed. Surface the failure count + # so the model recognises it's in a loop and breaks out by + # re-reading or falling back to write_file. + result_dict["_hint"] = ( + f"This is failure #{failure_count} patching {path!r}. " + "Stop retrying with variations of the same old_string. " + "Either: (1) re-read the file fresh to verify current " + "content, (2) use a longer / more unique old_string with " + "surrounding context lines, or (3) use write_file to " + "replace the entire file if the targeted region is hard " + "to anchor." + ) + elif "Did you mean one of these sections?" not in str(result_dict["error"]): result_dict["_hint"] = ( "old_string not found. Use read_file to verify the current " "content, or search_files to locate the text." diff --git a/tools/fuzzy_match.py b/tools/fuzzy_match.py index 15cedd40e46..ef6248494a4 100644 --- a/tools/fuzzy_match.py +++ b/tools/fuzzy_match.py @@ -108,8 +108,15 @@ def fuzzy_find_and_replace(content: str, old_string: str, new_string: str, if drift_err: return content, 0, None, drift_err - # Perform replacement - new_content = _apply_replacements(content, matches, new_string) + # Perform replacement. When the matched strategy is NOT `exact`, + # the file's indentation may differ from what the LLM sent in + # old_string/new_string — e.g. LLM used 2-space indent but the + # file is 4-space. Shift new_string by the indentation delta so + # the replacement matches the file's actual indent pattern. + new_content = _apply_replacements( + content, matches, new_string, + old_string=old_string if strategy_name != "exact" else None, + ) return new_content, len(matches), strategy_name, None # No strategy found a match @@ -156,26 +163,119 @@ def _detect_escape_drift(content: str, matches: List[Tuple[int, int]], return None -def _apply_replacements(content: str, matches: List[Tuple[int, int]], new_string: str) -> str: +def _leading_whitespace(line: str) -> str: + """Return the leading whitespace prefix of a line (spaces/tabs).""" + i = 0 + while i < len(line) and line[i] in (" ", "\t"): + i += 1 + return line[:i] + + +def _first_meaningful_line(text: str) -> Optional[str]: + """Return the first line of ``text`` that has any non-whitespace content. + + Returns ``None`` if no such line exists (text is empty or all whitespace). + """ + for line in text.split("\n"): + if line.strip(): + return line + return None + + +def _reindent_replacement(file_region: str, old_string: str, new_string: str) -> str: + """Adjust ``new_string`` so its indentation matches ``file_region``. + + Used after a non-exact fuzzy match: the LLM may have sent old_string and + new_string with a different indent than the file actually has (e.g. + 2-space indent in tool args vs 4-space indent on disk). The fuzzy + strategy successfully matched anyway, but writing ``new_string`` verbatim + would corrupt the file's indentation. + + Approach: + + 1. For each non-blank line in ``new_string``, compute its indent + *relative* to the shallowest non-blank line of ``old_string`` (the + LLM's base indent). + 2. Anchor that relative indent onto the file's actual base indent (the + leading whitespace of the file_region's first non-blank line). + 3. Re-emit each non-blank line as ``file_base + (line_indent - llm_base)``. + + Blank lines and lines less-indented than the LLM's base are anchored + directly to the file's base indent. + + No-op cases (returns ``new_string`` unchanged): + - file_region or old_string has no meaningful line + - LLM base indent equals file base indent + - new_string is empty + """ + if not new_string: + return new_string + + old_first = _first_meaningful_line(old_string) + file_first = _first_meaningful_line(file_region) + if old_first is None or file_first is None: + return new_string + + old_indent = _leading_whitespace(old_first) + file_indent = _leading_whitespace(file_first) + + if old_indent == file_indent: + return new_string + + # Re-indent each line of new_string. Strategy: replace the LLM's base + # indent prefix with the file's base indent prefix, preserving any + # additional indent the LLM added on top. This is the same approach + # Roo Code uses (multi-search-replace.ts:466-500). It preserves the + # LLM's intended *relative* nesting between lines while anchoring to + # the file's actual indent style. + out_lines: List[str] = [] + for line in new_string.split("\n"): + if not line.strip(): + # Blank lines: leave whitespace untouched. + out_lines.append(line) + continue + line_indent = _leading_whitespace(line) + if line_indent.startswith(old_indent): + # Common case: line has the LLM's base indent (possibly plus + # extra). Swap base prefix for the file's base prefix. + remainder = line[len(old_indent):] + out_lines.append(file_indent + remainder) + else: + # Line is less-indented than the LLM's base — e.g. a dedent at + # the start of new_string. Anchor to the file's base. + out_lines.append(file_indent + line.lstrip(" \t")) + return "\n".join(out_lines) + + +def _apply_replacements(content: str, matches: List[Tuple[int, int]], + new_string: str, old_string: Optional[str] = None) -> str: """ Apply replacements at the given positions. - + Args: content: Original content matches: List of (start, end) positions to replace new_string: Replacement text - + old_string: When non-None, signals that the match came from a + non-exact fuzzy strategy; ``new_string`` is re-indented to + match the file's actual indentation before substitution. + Returns: Content with replacements applied """ # Sort matches by position (descending) to replace from end to start # This preserves positions of earlier matches sorted_matches = sorted(matches, key=lambda x: x[0], reverse=True) - + result = content for start, end in sorted_matches: - result = result[:start] + new_string + result[end:] - + if old_string is not None: + file_region = content[start:end] + adjusted = _reindent_replacement(file_region, old_string, new_string) + else: + adjusted = new_string + result = result[:start] + adjusted + result[end:] + return result