feat(patch): indentation preservation, CRLF preservation, per-file failure escalation (#507) (#32273)

Three granular patch-tool refinements from the Roo Code deep-dive (#507).

## Indentation preservation (fuzzy_match.py)

When fuzzy_find_and_replace matches via a non-exact strategy, the file's
indentation may differ from what the LLM sent in old_string/new_string
(common case: model sends zero-indent old/new for a method body that
lives inside an 8-space-indented class). Before this commit the
replacement was spliced in verbatim, producing a file with a broken
indent level that may still parse but is logically wrong.

The fix computes the indent delta between old_string's first meaningful
line and the matched region's first meaningful line, then re-indents
every line of new_string by that delta. Exact-strategy matches are
untouched (passthrough). Same approach as Roo Code's
multi-search-replace.ts:466-500.

## CRLF preservation (file_operations.py)

Models nearly always send tool args with bare LF endings (JSON-encoded),
but the file on disk may have CRLF (Windows-line-ending configs, .bat,
.cmd, .ini files). Before this commit:

- write_file silently normalized CRLF to LF on every overwrite
- patch produced mixed-ending files: the substituted region had LF,
  the surrounding context kept CRLF

The fix detects the file's existing line endings (via pre_content if
already read for lint/LSP, otherwise a tiny head -c 4096 probe), and
normalizes the entire write to that ending. New files are written
verbatim (no detection possible).

## Per-file failure escalation (file_tools.py)

When the agent fails to patch the same file 3+ times in a row, the
existing 'old_string not found' hint isn't strong enough — the model
keeps retrying with variations against a stale view of the file.

The fix tracks consecutive failures per (task_id, resolved_path) and
injects an escalating hint after 3 failures: 'This is failure #N
patching X. Stop retrying. Either re-read fresh, use longer context,
or fall back to write_file.' Counter resets on a successful patch to
the same path.

## Validation

- 22 new tests across tests/tools/test_fuzzy_match.py (5),
  test_line_ending_preservation.py (12), test_patch_failure_tracking.py (5)
- All existing tests pass (165/165 in the touched files)
- E2E verified with real _handle_patch / _handle_write_file calls
  against real CRLF files and real failure loops

Closes part of #507. The remaining open items in #507 (2b start_line
hint, behavioral rules) were declined after audit:
- 2b adds schema bloat for a problem the existing 'multiple matches'
  contract already handles
- Behavioral rules conflict with the personality system

Items 1, 2d, 2e, 3, 4 of #507 were already landed in earlier work.
This commit is contained in:
Teknium 2026-05-25 15:18:45 -07:00 committed by GitHub
parent c2aa235328
commit 6bd0be30be
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 824 additions and 10 deletions

View file

@ -52,6 +52,106 @@ class TestIndentDifference:
assert "bar" in new
class TestIndentationPreservation:
"""When a non-exact strategy matches, ``new_string`` should be re-indented
so it lands at the file's actual indent depth — not at whatever indent the
LLM happened to send in the tool args. Without this fix the file gets a
silently-broken indent level that may even still parse but is logically
wrong."""
def test_unindented_input_reindented_to_match_file(self):
# File: 8-space-indented method body inside a class.
content = (
"class Calculator:\n"
" def add(self, a, b):\n"
" result = a + b\n"
" return result\n"
)
# LLM sends zero-indent old/new — common bug from frontier models
# that "remember" code instead of reading it.
old = "result = a + b\nreturn result"
new = "result = a + b\nresult *= 2\nreturn result"
out, count, strategy, err = fuzzy_find_and_replace(content, old, new)
assert err is None and count == 1
assert strategy != "exact" # must have gone through a fuzzy strategy
# Every replaced line should be at 8-space indent.
for marker in ("result = a + b", "result *= 2", "return result"):
line = next(line for line in out.split("\n") if marker in line)
indent = len(line) - len(line.lstrip())
assert indent == 8, f"Expected 8-space indent for {marker!r}, got {indent}: {line!r}"
# Resulting file must still be valid Python.
import ast
ast.parse(out)
def test_dedent_at_start_anchors_to_file_base(self):
# File: 2-space-indented function body. LLM sends zero-indent
# old/new where new_string contains a dedent (the new structure
# adds a top-level class wrapper). After re-indent, every line
# of new_string should be anchored to the file's 2-space base.
content = " return 1\n return 2\n"
old = "return 1\nreturn 2" # zero-indent — forces line_trimmed
new = "class X:\n return 99\n return 100"
out, count, strategy, err = fuzzy_find_and_replace(content, old, new)
assert err is None and count == 1
assert strategy != "exact"
lines = out.split("\n")
# 'class X:' anchored to file's 2-space base.
assert lines[0] == " class X:", repr(lines[0])
# Indented body lines lift to 4-space (file base + LLM's +2).
assert lines[1] == " return 99", repr(lines[1])
assert lines[2] == " return 100", repr(lines[2])
def test_exact_match_no_reindent(self):
# Exact strategy should be a pure passthrough — no shift logic
# should touch the result.
content = " def foo():\n return 1\n"
old = " def foo():\n return 1"
new = " def foo():\n return 2"
out, count, strategy, err = fuzzy_find_and_replace(content, old, new)
assert err is None and strategy == "exact"
assert out == " def foo():\n return 2\n"
def test_llm_zero_indent_shifts_to_file_two_space(self):
# LLM sent zero-indent old/new; file has 2-space indent. The
# re-indent shifts the whole replacement so 'def x()' lands at
# 2-space and the body keeps its relative +2 from new_string.
content = " def x():\n return 1\n"
old = "def x():\n return 1"
new = "def x():\n return 99"
out, count, _, err = fuzzy_find_and_replace(content, old, new)
assert err is None and count == 1
lines = out.strip("\n").split("\n")
assert lines[0] == " def x():"
assert lines[1] == " return 99"
def test_indent_already_matches_passthrough(self):
# When old_string's base indent already equals file_region's base
# indent, _reindent_replacement returns new_string unchanged.
# Verify with whitespace_normalized strategy (collapsed spaces).
content = " def x( ):\n return 1\n"
old = " def x():\n return 1" # same base indent (2), different inner whitespace
new = " def x():\n return 42"
out, count, strategy, err = fuzzy_find_and_replace(content, old, new)
assert err is None and count == 1
assert strategy != "exact" # non-exact strategy matched
# Body retains its 4-space indent (passthrough — no shift).
assert " return 42" in out
def test_blank_lines_left_alone(self):
# Blank lines in new_string should keep whatever whitespace they
# had — we never strip or pad them.
content = " a = 1\n b = 2\n"
old = "a = 1\nb = 2"
new = "a = 1\n\nb = 99"
out, count, _, err = fuzzy_find_and_replace(content, old, new)
assert err is None and count == 1
# blank line is preserved (empty), indented lines anchored.
lines = out.split("\n")
assert lines[0] == " a = 1"
assert lines[1] == ""
assert lines[2] == " b = 99"
class TestReplaceAll:
def test_multiple_matches_without_flag_errors(self):
content = "aaa bbb aaa"

View file

@ -0,0 +1,238 @@
"""Tests for CRLF line-ending preservation in write_file and patch.
Without this, the agent silently normalizes Windows-line-ending files
to LF whenever it edits them and patch produces a mixed-ending file
when only a substituted region changes (the rest of the file keeps its
CRLF endings while the replacement is LF-only).
See issue #507 (Roo Code deep-dive, item 2c).
"""
import json
import os
import tempfile
import pytest
@pytest.fixture
def hermes_home(monkeypatch, tmp_path):
"""Isolate HERMES_HOME so the tests don't pollute the real config.
Also clears module-level caches (file_ops, active_environments,
file-staleness state) after the test so subsequent tests in the
same pytest process aren't affected by our shell-out side effects
(real file_ops and terminal environments get created under
task_id='default' via _resolve_container_task_id).
"""
home = tmp_path / "hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
yield home
# Cleanup: drop the cached file_ops and active environment so the
# next test sees a fresh state. Without this, _get_live_tracking_cwd
# returns the stale cwd from this test's ops and breaks tests like
# test_resolve_path that rely on TERMINAL_CWD env var.
try:
from tools.file_tools import clear_file_ops_cache, _read_tracker_lock, _read_tracker
clear_file_ops_cache()
with _read_tracker_lock:
_read_tracker.clear()
except Exception:
pass
try:
from tools.terminal_tool import _active_environments, _env_lock
with _env_lock:
_active_environments.clear()
except Exception:
pass
def _crlf_count(b: bytes) -> int:
return b.count(b"\r\n")
def _bare_lf_count(b: bytes) -> int:
return b.count(b"\n") - b.count(b"\r\n")
class TestPatchCRLFPreservation:
def test_patch_on_crlf_file_stays_pure_crlf(self, hermes_home, tmp_path):
"""LLM sends LF old/new; file has CRLF. Result must be all CRLF,
no mixed endings."""
from tools.file_tools import _handle_patch
target = tmp_path / "config.ini"
target.write_bytes(b"[a]\r\nkey=1\r\n\r\n[b]\r\nkey=2\r\n")
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": "key=1",
"new_string": "key=99",
},
task_id="crlf_patch_1",
)
d = json.loads(result)
assert not d.get("error"), d
raw = target.read_bytes()
assert _bare_lf_count(raw) == 0, (
f"Mixed line endings after patch: {raw!r}"
)
# Same number of line breaks as before; just the value swapped.
assert _crlf_count(raw) == 5
assert b"key=99\r\n" in raw
def test_patch_on_lf_file_stays_lf(self, hermes_home, tmp_path):
"""LF file with LF new_string stays LF — no spurious CRLF added."""
from tools.file_tools import _handle_patch
target = tmp_path / "config.ini"
target.write_bytes(b"[a]\nkey=1\n\n[b]\nkey=2\n")
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": "key=1",
"new_string": "key=99",
},
task_id="crlf_patch_2",
)
d = json.loads(result)
assert not d.get("error"), d
raw = target.read_bytes()
assert _crlf_count(raw) == 0, (
f"Spurious CRLF added to LF file: {raw!r}"
)
def test_patch_multiline_replacement_on_crlf(self, hermes_home, tmp_path):
"""Multi-line new_string with bare LFs should be CRLF-converted
before write."""
from tools.file_tools import _handle_patch
target = tmp_path / "f.py"
target.write_bytes(b"def foo():\r\n return 1\r\n")
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": "def foo():\n return 1",
"new_string": "def foo():\n x = 1\n return x",
},
task_id="crlf_patch_3",
)
d = json.loads(result)
assert not d.get("error"), d
raw = target.read_bytes()
assert _bare_lf_count(raw) == 0, (
f"Mixed endings after multi-line patch: {raw!r}"
)
assert raw == b"def foo():\r\n x = 1\r\n return x\r\n"
class TestWriteFileCRLFPreservation:
def test_overwrite_crlf_file_with_lf_content_preserves_crlf(
self, hermes_home, tmp_path
):
"""The agent typically sends bare-LF content; if the file existed
with CRLF, the write should convert to CRLF rather than silently
flipping the endings."""
from tools.file_tools import _handle_write_file
target = tmp_path / "config.bat"
target.write_bytes(b"@echo off\r\nset X=1\r\n")
result = _handle_write_file(
{
"path": str(target),
"content": "@echo off\nset X=99\nset Y=42\n",
},
task_id="crlf_write_1",
)
d = json.loads(result)
assert "error" not in d, d
raw = target.read_bytes()
assert _bare_lf_count(raw) == 0, (
f"CRLF file got normalized to LF: {raw!r}"
)
assert _crlf_count(raw) == 3
def test_new_file_written_as_is(self, hermes_home, tmp_path):
"""No pre-existing file → write content verbatim (LF by default)."""
from tools.file_tools import _handle_write_file
target = tmp_path / "new.txt"
result = _handle_write_file(
{"path": str(target), "content": "a\nb\nc\n"},
task_id="crlf_write_2",
)
d = json.loads(result)
assert "error" not in d, d
assert target.read_bytes() == b"a\nb\nc\n"
def test_overwrite_lf_file_stays_lf(self, hermes_home, tmp_path):
"""Pre-existing LF file should not get spurious CRLFs."""
from tools.file_tools import _handle_write_file
target = tmp_path / "lf.txt"
target.write_bytes(b"line1\nline2\n")
result = _handle_write_file(
{"path": str(target), "content": "X\nY\nZ\n"},
task_id="crlf_write_3",
)
d = json.loads(result)
assert "error" not in d, d
raw = target.read_bytes()
assert _crlf_count(raw) == 0
assert raw == b"X\nY\nZ\n"
class TestLineEndingHelpers:
"""Direct unit tests for the pure helpers — easier to debug than the
integration tests above."""
def test_detect_crlf(self):
from tools.file_operations import _detect_line_ending
assert _detect_line_ending("a\r\nb\r\n") == "\r\n"
def test_detect_lf(self):
from tools.file_operations import _detect_line_ending
assert _detect_line_ending("a\nb\n") == "\n"
def test_detect_empty(self):
from tools.file_operations import _detect_line_ending
assert _detect_line_ending("") is None
assert _detect_line_ending("no newline here") is None
def test_detect_mixed_picks_crlf(self):
"""Mixed-ending content (any CRLF in the head) returns CRLF —
we prefer to normalize TO CRLF rather than away from it, since
a single CRLF in the file is usually a Windows-origin marker."""
from tools.file_operations import _detect_line_ending
assert _detect_line_ending("a\nb\r\nc\n") == "\r\n"
def test_normalize_to_lf_strips_cr(self):
from tools.file_operations import _normalize_line_endings
assert _normalize_line_endings("a\r\nb\rc\n", "\n") == "a\nb\nc\n"
def test_normalize_to_crlf_idempotent(self):
from tools.file_operations import _normalize_line_endings
once = _normalize_line_endings("a\nb\n", "\r\n")
twice = _normalize_line_endings(once, "\r\n")
assert once == twice == "a\r\nb\r\n"

View file

@ -0,0 +1,222 @@
"""Tests for per-file consecutive patch-failure tracking.
When the agent repeatedly fails to patch the same file with similar but
non-matching old_strings, it's usually stuck in a loop with a stale view
of the file. After 3 consecutive failures on the same path, the patch
tool injects an escalating ``_hint`` that tells the model to break out
of the loop (re-read, use longer context, or fall back to write_file).
See issue #507 (Roo Code deep-dive, item 2f).
"""
import json
import pytest
@pytest.fixture
def hermes_home(monkeypatch, tmp_path):
"""Isolate HERMES_HOME and clear module-level caches afterward so the
real shell-out side effects from _handle_patch don't leak into
subsequent tests (see test_line_ending_preservation.py for details)."""
home = tmp_path / "hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
yield home
try:
from tools.file_tools import clear_file_ops_cache, _read_tracker_lock, _read_tracker
clear_file_ops_cache()
with _read_tracker_lock:
_read_tracker.clear()
except Exception:
pass
try:
from tools.terminal_tool import _active_environments, _env_lock
with _env_lock:
_active_environments.clear()
except Exception:
pass
@pytest.fixture
def fresh_tracker():
"""Reset the module-level tracker before each test so the count starts
at zero regardless of prior test order."""
from tools.file_tools import _patch_failure_tracker, _patch_failure_lock
with _patch_failure_lock:
_patch_failure_tracker.clear()
yield
with _patch_failure_lock:
_patch_failure_tracker.clear()
class TestPatchFailureEscalation:
def test_first_two_failures_use_normal_hint(self, hermes_home, tmp_path, fresh_tracker):
from tools.file_tools import _handle_patch
target = tmp_path / "f.py"
target.write_text("def foo():\n return 1\n")
for _i in range(2):
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": f"NONEXISTENT_{_i}_XYZQQQ",
"new_string": "x",
},
task_id="esc_t1",
)
d = json.loads(result)
hint = d.get("_hint", "") or ""
assert "failure #" not in hint, (
f"Escalating hint fired too early on attempt {_i + 1}: {hint!r}"
)
def test_third_consecutive_failure_escalates(self, hermes_home, tmp_path, fresh_tracker):
from tools.file_tools import _handle_patch
target = tmp_path / "f.py"
target.write_text("def foo():\n return 1\n")
last_hint = ""
for _i in range(3):
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": f"DOES_NOT_EXIST_{_i}_FOOFOOFOO",
"new_string": "x",
},
task_id="esc_t2",
)
d = json.loads(result)
last_hint = d.get("_hint", "") or ""
assert "failure #3" in last_hint, repr(last_hint)
assert "Stop retrying" in last_hint
assert "write_file" in last_hint, (
"Escalating hint should mention write_file fallback"
)
def test_success_clears_failure_counter(self, hermes_home, tmp_path, fresh_tracker):
from tools.file_tools import _handle_patch
target = tmp_path / "f.py"
target.write_text("def foo():\n return 1\n")
# Three failures: counter at 3.
for _i in range(3):
_handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": f"GHOST_{_i}_ABCABC",
"new_string": "x",
},
task_id="esc_t3",
)
# Successful patch: clears the counter.
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": "return 1",
"new_string": "return 99",
},
task_id="esc_t3",
)
d = json.loads(result)
assert not d.get("error"), d
# Next failure should be back to "attempt 1" — generic hint only.
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": "STILL_GHOST_XYZ",
"new_string": "x",
},
task_id="esc_t3",
)
d = json.loads(result)
hint = d.get("_hint", "") or ""
assert "failure #" not in hint, (
f"Counter should have been reset after success: {hint!r}"
)
def test_different_paths_have_independent_counters(
self, hermes_home, tmp_path, fresh_tracker
):
from tools.file_tools import _handle_patch
a = tmp_path / "a.py"
a.write_text("x = 1\n")
b = tmp_path / "b.py"
b.write_text("y = 2\n")
# Three failures on a.py.
for _i in range(3):
_handle_patch(
{
"mode": "replace",
"path": str(a),
"old_string": f"NONE_A_{_i}_ZZZ",
"new_string": "x",
},
task_id="esc_t4",
)
# One failure on b.py — should NOT inherit a.py's count.
result = _handle_patch(
{
"mode": "replace",
"path": str(b),
"old_string": "NONE_B_ZZZ",
"new_string": "x",
},
task_id="esc_t4",
)
d = json.loads(result)
hint = d.get("_hint", "") or ""
assert "failure #" not in hint, (
f"b.py's hint inherited a.py's count: {hint!r}"
)
def test_different_tasks_have_independent_counters(
self, hermes_home, tmp_path, fresh_tracker
):
from tools.file_tools import _handle_patch
target = tmp_path / "shared.py"
target.write_text("z = 0\n")
# Three failures under task A.
for _i in range(3):
_handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": f"GHOST_A_{_i}_QWE",
"new_string": "x",
},
task_id="task_A",
)
# First failure under task B — should NOT see escalation.
result = _handle_patch(
{
"mode": "replace",
"path": str(target),
"old_string": "GHOST_B_QWE",
"new_string": "x",
},
task_id="task_B",
)
d = json.loads(result)
hint = d.get("_hint", "") or ""
assert "failure #" not in hint, (
f"task_B's hint cross-contaminated from task_A: {hint!r}"
)