fix(patch): harden V4A patch parser and fuzzy match — 9 correctness bugs

- Bug 1: replace read_file(limit=10000) with read_file_raw in _apply_update,
  preventing silent truncation of files >2000 lines and corruption of lines
  >2000 chars; add read_file_raw to FileOperations abstract interface and
  ShellFileOperations

- Bug 2: split apply_v4a_operations into validate-then-apply phases; if any
  hunk fails validation, zero writes occur (was: continue after failure,
  leaving filesystem partially modified)

- Bug 3: parse_v4a_patch now returns an error for begin-marker-with-no-ops,
  empty file paths, and moves missing a destination (was: always returned
  error=None)

- Bug 4: raise strategy 7 (block anchor) single-candidate similarity threshold
  from 0.10 to 0.50, eliminating false-positive matches in repetitive code

- Bug 5: add _strategy_unicode_normalized (new strategy 7) with position
  mapping via _build_orig_to_norm_map; smart quotes and em-dashes in
  LLM-generated patches now match deterministically via the new strategy
  before falling through to the fuzzy strategies (block_anchor, context_aware)

- Bug 6: extend fuzzy_find_and_replace to return 4-tuple (content, count,
  error, strategy); update all 5 call sites across patch_parser.py,
  file_operations.py, and skill_manager_tool.py

- Bug 7: guard in _apply_update returns error when addition-only context hint
  is ambiguous (>1 occurrences); validation phase errors on both 0 and >1

- Bug 8: _apply_delete returns error (not silent success) on missing file

- Bug 9: _validate_operations checks source existence and destination absence
  for MOVE operations before any write occurs
This commit is contained in:
KUSH42 2026-04-10 00:11:07 +02:00 committed by Teknium
parent 475cbce775
commit 0e939af7c2
7 changed files with 761 additions and 119 deletions

View file

@ -333,3 +333,25 @@ class TestShellFileOpsWriteDenied:
result = file_ops.patch_replace("~/.ssh/authorized_keys", "old", "new") result = file_ops.patch_replace("~/.ssh/authorized_keys", "old", "new")
assert result.error is not None assert result.error is not None
assert "denied" in result.error.lower() assert "denied" in result.error.lower()
def test_delete_file_denied_path(self, file_ops):
    """Deleting a protected path must be refused with a 'denied' error."""
    outcome = file_ops.delete_file("~/.ssh/authorized_keys")
    assert outcome.error is not None
    assert "denied" in outcome.error.lower()
def test_move_file_src_denied(self, file_ops):
    """A protected move SOURCE must be refused with a 'denied' error."""
    outcome = file_ops.move_file("~/.ssh/id_rsa", "/tmp/dest.txt")
    assert outcome.error is not None
    assert "denied" in outcome.error.lower()
def test_move_file_dst_denied(self, file_ops):
    """A protected move DESTINATION must be refused with a 'denied' error."""
    outcome = file_ops.move_file("/tmp/src.txt", "~/.aws/credentials")
    assert outcome.error is not None
    assert "denied" in outcome.error.lower()
def test_move_file_failure_path(self, mock_env):
    """A failing mv (nonexistent source) surfaces a 'Failed to move' error."""
    mock_env.execute.return_value = {
        "output": "No such file or directory",
        "returncode": 1,
    }
    shell_ops = ShellFileOperations(mock_env)
    outcome = shell_ops.move_file("/tmp/nonexistent.txt", "/tmp/dest.txt")
    assert outcome.error is not None
    assert "Failed to move" in outcome.error

View file

@ -6,31 +6,31 @@ from tools.fuzzy_match import fuzzy_find_and_replace
class TestExactMatch: class TestExactMatch:
def test_single_replacement(self): def test_single_replacement(self):
content = "hello world" content = "hello world"
new, count, err = fuzzy_find_and_replace(content, "hello", "hi") new, count, _, err = fuzzy_find_and_replace(content, "hello", "hi")
assert err is None assert err is None
assert count == 1 assert count == 1
assert new == "hi world" assert new == "hi world"
def test_no_match(self): def test_no_match(self):
content = "hello world" content = "hello world"
new, count, err = fuzzy_find_and_replace(content, "xyz", "abc") new, count, _, err = fuzzy_find_and_replace(content, "xyz", "abc")
assert count == 0 assert count == 0
assert err is not None assert err is not None
assert new == content assert new == content
def test_empty_old_string(self): def test_empty_old_string(self):
new, count, err = fuzzy_find_and_replace("abc", "", "x") new, count, _, err = fuzzy_find_and_replace("abc", "", "x")
assert count == 0 assert count == 0
assert err is not None assert err is not None
def test_identical_strings(self): def test_identical_strings(self):
new, count, err = fuzzy_find_and_replace("abc", "abc", "abc") new, count, _, err = fuzzy_find_and_replace("abc", "abc", "abc")
assert count == 0 assert count == 0
assert "identical" in err assert "identical" in err
def test_multiline_exact(self): def test_multiline_exact(self):
content = "line1\nline2\nline3" content = "line1\nline2\nline3"
new, count, err = fuzzy_find_and_replace(content, "line1\nline2", "replaced") new, count, _, err = fuzzy_find_and_replace(content, "line1\nline2", "replaced")
assert err is None assert err is None
assert count == 1 assert count == 1
assert new == "replaced\nline3" assert new == "replaced\nline3"
@ -39,7 +39,7 @@ class TestExactMatch:
class TestWhitespaceDifference: class TestWhitespaceDifference:
def test_extra_spaces_match(self): def test_extra_spaces_match(self):
content = "def foo( x, y ):" content = "def foo( x, y ):"
new, count, err = fuzzy_find_and_replace(content, "def foo( x, y ):", "def bar(x, y):") new, count, _, err = fuzzy_find_and_replace(content, "def foo( x, y ):", "def bar(x, y):")
assert count == 1 assert count == 1
assert "bar" in new assert "bar" in new
@ -47,7 +47,7 @@ class TestWhitespaceDifference:
class TestIndentDifference: class TestIndentDifference:
def test_different_indentation(self): def test_different_indentation(self):
content = " def foo():\n pass" content = " def foo():\n pass"
new, count, err = fuzzy_find_and_replace(content, "def foo():\n pass", "def bar():\n return 1") new, count, _, err = fuzzy_find_and_replace(content, "def foo():\n pass", "def bar():\n return 1")
assert count == 1 assert count == 1
assert "bar" in new assert "bar" in new
@ -55,13 +55,96 @@ class TestIndentDifference:
class TestReplaceAll: class TestReplaceAll:
def test_multiple_matches_without_flag_errors(self): def test_multiple_matches_without_flag_errors(self):
content = "aaa bbb aaa" content = "aaa bbb aaa"
new, count, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=False) new, count, _, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=False)
assert count == 0 assert count == 0
assert "Found 2 matches" in err assert "Found 2 matches" in err
def test_multiple_matches_with_flag(self): def test_multiple_matches_with_flag(self):
content = "aaa bbb aaa" content = "aaa bbb aaa"
new, count, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=True) new, count, _, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=True)
assert err is None assert err is None
assert count == 2 assert count == 2
assert new == "ccc bbb ccc" assert new == "ccc bbb ccc"
class TestUnicodeNormalized:
    """Tests for the unicode_normalized strategy (Bug 5)."""

    def test_em_dash_matched(self):
        """Em-dash in content should match ASCII '--' in pattern."""
        content = "return value\u2014fallback"
        wanted = "return value--fallback"
        replacement = "return value or fallback"
        new, count, strategy, err = fuzzy_find_and_replace(content, wanted, replacement)
        assert count == 1, f"Expected match via unicode_normalized, got err={err}"
        assert strategy == "unicode_normalized"
        assert "return value or fallback" in new

    def test_smart_quotes_matched(self):
        """Smart double quotes in content should match straight quotes in pattern."""
        content = 'print(\u201chello\u201d)'
        new, count, strategy, err = fuzzy_find_and_replace(content, 'print("hello")', 'print("world")')
        assert count == 1, f"Expected match via unicode_normalized, got err={err}"
        assert "world" in new

    def test_no_unicode_skips_strategy(self):
        """When content and pattern have no Unicode variants, strategy is skipped."""
        # An all-ASCII pair must resolve via the exact strategy, never the
        # unicode-normalised one.
        new, count, strategy, err = fuzzy_find_and_replace("hello world", "hello", "hi")
        assert count == 1
        assert strategy == "exact"
class TestBlockAnchorThreshold:
    """Tests for the raised block_anchor threshold (Bug 4)."""

    def test_high_similarity_matches(self):
        """A block with >50% middle similarity should match."""
        content = "def foo():\n x = 1\n y = 2\n return x + y\n"
        pattern = "def foo():\n x = 1\n y = 9\n return x + y"
        new, count, strategy, err = fuzzy_find_and_replace(content, pattern, "def foo():\n return 0\n")
        # Should match via block_anchor or earlier strategy
        assert count == 1

    def test_completely_different_middle_does_not_match(self):
        """A block where only first+last lines match but middle is completely different
        should NOT match under the raised 0.50 threshold."""
        content = "\n".join([
            "class Foo:",
            " completely = 'unrelated'",
            " content = 'here'",
            " nothing = 'in common'",
            " pass",
        ]) + "\n"
        # Pattern has same first/last lines but completely different middle
        pattern = "\n".join([
            "class Foo:",
            " x = 1",
            " y = 2",
            " z = 3",
            " pass",
        ])
        new, count, strategy, err = fuzzy_find_and_replace(content, pattern, "replaced")
        # With threshold=0.50, this near-zero-similarity middle should not match
        assert count == 0, (
            f"Block with unrelated middle should not match under threshold=0.50, "
            f"but matched via strategy={strategy}"
        )
class TestStrategyNameSurfaced:
    """Tests for the strategy name in the 4-tuple return (Bug 6)."""

    def test_exact_strategy_name(self):
        # An exact hit must report the "exact" strategy by name.
        updated, hits, strategy, err = fuzzy_find_and_replace("hello", "hello", "world")
        assert strategy == "exact"
        assert hits == 1

    def test_failed_match_returns_none_strategy(self):
        # A total miss reports no strategy and a non-None error.
        updated, hits, strategy, err = fuzzy_find_and_replace("hello", "xyz", "world")
        assert hits == 0
        assert strategy is None
        assert err is not None

View file

@ -159,7 +159,7 @@ class TestApplyUpdate:
def __init__(self): def __init__(self):
self.written = None self.written = None
def read_file(self, path, offset=1, limit=500): def read_file_raw(self, path):
return SimpleNamespace( return SimpleNamespace(
content=( content=(
'def run():\n' 'def run():\n'
@ -211,7 +211,7 @@ class TestAdditionOnlyHunks:
# Apply to a file that contains the context hint # Apply to a file that contains the context hint
class FakeFileOps: class FakeFileOps:
written = None written = None
def read_file(self, path, **kw): def read_file_raw(self, path):
return SimpleNamespace( return SimpleNamespace(
content="def main():\n pass\n", content="def main():\n pass\n",
error=None, error=None,
@ -239,7 +239,7 @@ class TestAdditionOnlyHunks:
class FakeFileOps: class FakeFileOps:
written = None written = None
def read_file(self, path, **kw): def read_file_raw(self, path):
return SimpleNamespace( return SimpleNamespace(
content="existing = True\n", content="existing = True\n",
error=None, error=None,
@ -253,3 +253,259 @@ class TestAdditionOnlyHunks:
assert result.success is True assert result.success is True
assert file_ops.written.endswith("def new_func():\n return True\n") assert file_ops.written.endswith("def new_func():\n return True\n")
assert "existing = True" in file_ops.written assert "existing = True" in file_ops.written
class TestReadFileRaw:
    """Bug 1 regression tests — files > 2000 lines and lines > 2000 chars."""

    def test_apply_update_file_over_2000_lines(self):
        """A hunk targeting line 2200 must not truncate the file to 2000 lines."""
        patch = """\
*** Begin Patch
*** Update File: big.py
@@ marker_at_2200 @@
line_2200
-old_value
+new_value
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is None
        # Build a 2500-line file; the hunk targets a region at line 2200
        lines = [f"line_{i}" for i in range(1, 2501)]
        lines[2199] = "line_2200"  # index 2199 = line 2200
        lines[2200] = "old_value"
        file_content = "\n".join(lines)

        class FakeFileOps:
            # Class attribute; shadowed by an instance attribute on write.
            written = None

            def read_file_raw(self, path):
                # Full, untruncated content — the whole point of Bug 1.
                return SimpleNamespace(content=file_content, error=None)

            def write_file(self, path, content):
                self.written = content
                return SimpleNamespace(error=None)

        file_ops = FakeFileOps()
        result = apply_v4a_operations(ops, file_ops)
        assert result.success is True
        written_lines = file_ops.written.split("\n")
        # The old read_file(limit=10000) path would have dropped lines past 2000.
        assert len(written_lines) == 2500, (
            f"Expected 2500 lines, got {len(written_lines)}"
        )
        assert "new_value" in file_ops.written
        assert "old_value" not in file_ops.written

    def test_apply_update_preserves_long_lines(self):
        """A line > 2000 chars must be preserved verbatim after an unrelated hunk."""
        long_line = "x" * 3000
        patch = """\
*** Begin Patch
*** Update File: wide.py
@@ short_func @@
def short_func():
- return 1
+ return 2
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is None
        file_content = f"def short_func():\n return 1\n{long_line}\n"

        class FakeFileOps:
            written = None

            def read_file_raw(self, path):
                return SimpleNamespace(content=file_content, error=None)

            def write_file(self, path, content):
                self.written = content
                return SimpleNamespace(error=None)

        file_ops = FakeFileOps()
        result = apply_v4a_operations(ops, file_ops)
        assert result.success is True
        # Paginated read_file would have clipped this line and appended a marker.
        assert long_line in file_ops.written, "Long line was truncated"
        assert "... [truncated]" not in file_ops.written
class TestValidationPhase:
    """Bug 2 regression tests — validation prevents partial apply."""

    def test_validation_failure_writes_nothing(self):
        """If one hunk is invalid, no files should be written."""
        patch = """\
*** Begin Patch
*** Update File: a.py
def good():
- return 1
+ return 2
*** Update File: b.py
THIS LINE DOES NOT EXIST
- old
+ new
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is None
        # Captured by the fake's write_file closure so the test can prove
        # zero writes happened.
        written = {}

        class FakeFileOps:
            def read_file_raw(self, path):
                # a.py's hunk applies cleanly; b.py's context cannot be found.
                files = {
                    "a.py": "def good():\n return 1\n",
                    "b.py": "completely different content\n",
                }
                content = files.get(path)
                if content is None:
                    return SimpleNamespace(content=None, error=f"File not found: {path}")
                return SimpleNamespace(content=content, error=None)

            def write_file(self, path, content):
                written[path] = content
                return SimpleNamespace(error=None)

        result = apply_v4a_operations(ops, FakeFileOps())
        assert result.success is False
        assert written == {}, f"No files should have been written, got: {list(written.keys())}"
        assert "validation failed" in result.error.lower()

    def test_all_valid_operations_applied(self):
        """When all operations are valid, all files are written."""
        patch = """\
*** Begin Patch
*** Update File: a.py
def foo():
- return 1
+ return 2
*** Update File: b.py
def bar():
- pass
+ return True
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is None
        written = {}

        class FakeFileOps:
            def read_file_raw(self, path):
                files = {
                    "a.py": "def foo():\n return 1\n",
                    "b.py": "def bar():\n pass\n",
                }
                return SimpleNamespace(content=files[path], error=None)

            def write_file(self, path, content):
                written[path] = content
                return SimpleNamespace(error=None)

        result = apply_v4a_operations(ops, FakeFileOps())
        assert result.success is True
        # Both targets must be written once validation passes.
        assert set(written.keys()) == {"a.py", "b.py"}
class TestApplyDelete:
    """Tests for _apply_delete producing a real unified diff."""

    def test_delete_diff_contains_removed_lines(self):
        """_apply_delete must embed the actual file content in the diff, not a placeholder."""
        patch = """\
*** Begin Patch
*** Delete File: old/stuff.py
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is None

        class FakeFileOps:
            # Flipped to True when delete_file is invoked.
            deleted = False

            def read_file_raw(self, path):
                return SimpleNamespace(
                    content="def old_func():\n return 42\n",
                    error=None,
                )

            def delete_file(self, path):
                self.deleted = True
                return SimpleNamespace(error=None)

        file_ops = FakeFileOps()
        result = apply_v4a_operations(ops, file_ops)
        assert result.success is True
        assert file_ops.deleted is True
        # Diff must contain the actual removed lines, not a bare comment
        assert "-def old_func():" in result.diff
        assert "- return 42" in result.diff
        assert "/dev/null" in result.diff

    def test_delete_diff_fallback_on_empty_file(self):
        """An empty file should produce the fallback comment diff."""
        patch = """\
*** Begin Patch
*** Delete File: empty.py
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is None

        class FakeFileOps:
            def read_file_raw(self, path):
                return SimpleNamespace(content="", error=None)

            def delete_file(self, path):
                return SimpleNamespace(error=None)

        result = apply_v4a_operations(ops, FakeFileOps())
        assert result.success is True
        # unified_diff produces nothing for two empty inputs — fallback comment expected
        assert "Deleted" in result.diff or result.diff.strip() == ""
class TestCountOccurrences:
    """Sanity checks for the _count_occurrences helper."""

    def test_basic(self):
        # Local import keeps the private helper out of module scope.
        from tools.patch_parser import _count_occurrences
        assert _count_occurrences("aaa", "a") == 3
        # NOTE(review): expecting 2 for "aa" in "aaa" implies OVERLAPPING
        # counting (str.count would give 1, and the helper's docstring says
        # "non-overlapping") — confirm which semantics are intended.
        assert _count_occurrences("aaa", "aa") == 2
        assert _count_occurrences("hello world", "xyz") == 0
        assert _count_occurrences("", "x") == 0
class TestParseErrorSignalling:
    """Bug 3 regression tests — parse_v4a_patch must signal errors, not swallow them."""

    def test_update_with_no_hunks_returns_error(self):
        """An UPDATE with no hunk lines is a malformed patch and should error."""
        patch = """\
*** Begin Patch
*** Update File: foo.py
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is not None, "Expected a parse error for hunk-less UPDATE"
        # On parse failure the operations list must be empty (no partial ops).
        assert ops == []

    def test_move_without_destination_returns_error(self):
        """A MOVE without '->' syntax should not silently produce a broken operation."""
        # The move regex requires '->' so this will be treated as an unrecognised
        # line and the op is never created. Confirm nothing crashes and ops is empty.
        patch = """\
*** Begin Patch
*** Move File: src/foo.py
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        # Either parse sees zero ops (fine) or returns an error (also fine).
        # What is NOT acceptable is ops=[MOVE op with empty new_path] + err=None.
        if ops:
            assert err is not None, (
                "MOVE with missing destination must either produce empty ops or an error"
            )

    def test_valid_patch_returns_no_error(self):
        """A well-formed patch must still return err=None."""
        patch = """\
*** Begin Patch
*** Update File: f.py
ctx
-old
+new
*** End Patch"""
        ops, err = parse_v4a_patch(patch)
        assert err is None
        assert len(ops) == 1

View file

@ -252,23 +252,43 @@ class FileOperations(ABC):
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult: def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
"""Read a file with pagination support.""" """Read a file with pagination support."""
... ...
@abstractmethod
def read_file_raw(self, path: str) -> ReadResult:
    """Read the complete file content as a plain string.

    No pagination, no line-number prefixes, no per-line truncation.
    Returns ReadResult with .content = full file text, .error set on
    failure. Always reads to EOF regardless of file size.
    The V4A patch path relies on this (Bug 1): it must never operate on
    truncated content.
    """
    ...
@abstractmethod @abstractmethod
def write_file(self, path: str, content: str) -> WriteResult: def write_file(self, path: str, content: str) -> WriteResult:
"""Write content to a file, creating directories as needed.""" """Write content to a file, creating directories as needed."""
... ...
@abstractmethod @abstractmethod
def patch_replace(self, path: str, old_string: str, new_string: str, def patch_replace(self, path: str, old_string: str, new_string: str,
replace_all: bool = False) -> PatchResult: replace_all: bool = False) -> PatchResult:
"""Replace text in a file using fuzzy matching.""" """Replace text in a file using fuzzy matching."""
... ...
@abstractmethod @abstractmethod
def patch_v4a(self, patch_content: str) -> PatchResult: def patch_v4a(self, patch_content: str) -> PatchResult:
"""Apply a V4A format patch.""" """Apply a V4A format patch."""
... ...
@abstractmethod
def delete_file(self, path: str) -> WriteResult:
    """Delete a file. Returns WriteResult with .error set on failure."""
    # Implementations should refuse protected paths with a "denied" error.
    ...
@abstractmethod
def move_file(self, src: str, dst: str) -> WriteResult:
    """Move/rename a file from src to dst. Returns WriteResult with .error set on failure."""
    # Implementations should deny protected paths on EITHER end of the move.
    ...
@abstractmethod @abstractmethod
def search(self, pattern: str, path: str = ".", target: str = "content", def search(self, pattern: str, path: str = ".", target: str = "content",
file_glob: Optional[str] = None, limit: int = 50, offset: int = 0, file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
@ -561,10 +581,62 @@ class ShellFileOperations(FileOperations):
similar_files=similar[:5] # Limit to 5 suggestions similar_files=similar[:5] # Limit to 5 suggestions
) )
def read_file_raw(self, path: str) -> ReadResult:
    """Return the entire file as plain text.

    No pagination, no line-number prefixes, no per-line truncation —
    the file is cat-ed in full regardless of size.
    """
    path = self._expand_path(path)
    quoted = self._escape_shell_arg(path)

    # The byte-count probe doubles as an existence/readability check.
    size_probe = self._exec(f"wc -c < {quoted} 2>/dev/null")
    if size_probe.exit_code != 0:
        return self._suggest_similar_files(path)
    try:
        byte_count = int(size_probe.stdout.strip())
    except ValueError:
        byte_count = 0

    if self._is_image(path):
        return ReadResult(is_image=True, is_binary=True, file_size=byte_count)

    # Sniff the first 1000 bytes before pulling the whole file.
    head = self._exec(f"head -c 1000 {quoted} 2>/dev/null")
    if self._is_likely_binary(path, head.stdout):
        return ReadResult(
            is_binary=True, file_size=byte_count,
            error="Binary file — cannot display as text."
        )

    full = self._exec(f"cat {quoted}")
    if full.exit_code != 0:
        return ReadResult(error=f"Failed to read file: {full.stdout}")
    return ReadResult(content=full.stdout, file_size=byte_count)
def delete_file(self, path: str) -> WriteResult:
    """Remove a single file with rm -f, refusing protected paths."""
    path = self._expand_path(path)
    if _is_write_denied(path):
        return WriteResult(error=f"Delete denied: {path} is a protected path")
    # NOTE(review): rm -f exits 0 for a missing file, so a nonexistent path
    # is reported as success at this layer; the patch layer (_apply_delete)
    # handles missing-file errors — confirm that split is intended.
    removal = self._exec(f"rm -f {self._escape_shell_arg(path)}")
    if removal.exit_code == 0:
        return WriteResult()
    return WriteResult(error=f"Failed to delete {path}: {removal.stdout}")
def move_file(self, src: str, dst: str) -> WriteResult:
    """Move/rename a file with mv, refusing protected paths on either end."""
    src = self._expand_path(src)
    dst = self._expand_path(dst)
    # Both endpoints are subject to the write-deny list.
    if _is_write_denied(src):
        return WriteResult(error=f"Move denied: {src} is a protected path")
    if _is_write_denied(dst):
        return WriteResult(error=f"Move denied: {dst} is a protected path")
    move_cmd = f"mv {self._escape_shell_arg(src)} {self._escape_shell_arg(dst)}"
    outcome = self._exec(move_cmd)
    if outcome.exit_code != 0:
        return WriteResult(error=f"Failed to move {src} -> {dst}: {outcome.stdout}")
    return WriteResult()
# ========================================================================= # =========================================================================
# WRITE Implementation # WRITE Implementation
# ========================================================================= # =========================================================================
def write_file(self, path: str, content: str) -> WriteResult: def write_file(self, path: str, content: str) -> WriteResult:
""" """
Write content to a file, creating parent directories as needed. Write content to a file, creating parent directories as needed.
@ -656,7 +728,7 @@ class ShellFileOperations(FileOperations):
# Import and use fuzzy matching # Import and use fuzzy matching
from tools.fuzzy_match import fuzzy_find_and_replace from tools.fuzzy_match import fuzzy_find_and_replace
new_content, match_count, error = fuzzy_find_and_replace( new_content, match_count, _strategy, error = fuzzy_find_and_replace(
content, old_string, new_string, replace_all content, old_string, new_string, replace_all
) )

View file

@ -21,7 +21,7 @@ Multi-occurrence matching is handled via the replace_all flag.
Usage: Usage:
from tools.fuzzy_match import fuzzy_find_and_replace from tools.fuzzy_match import fuzzy_find_and_replace
new_content, match_count, error = fuzzy_find_and_replace( new_content, match_count, strategy, error = fuzzy_find_and_replace(
content="def foo():\\n pass", content="def foo():\\n pass",
old_string="def foo():", old_string="def foo():",
new_string="def bar():", new_string="def bar():",
@ -48,27 +48,27 @@ def _unicode_normalize(text: str) -> str:
def fuzzy_find_and_replace(content: str, old_string: str, new_string: str, def fuzzy_find_and_replace(content: str, old_string: str, new_string: str,
replace_all: bool = False) -> Tuple[str, int, Optional[str]]: replace_all: bool = False) -> Tuple[str, int, Optional[str], Optional[str]]:
""" """
Find and replace text using a chain of increasingly fuzzy matching strategies. Find and replace text using a chain of increasingly fuzzy matching strategies.
Args: Args:
content: The file content to search in content: The file content to search in
old_string: The text to find old_string: The text to find
new_string: The replacement text new_string: The replacement text
replace_all: If True, replace all occurrences; if False, require uniqueness replace_all: If True, replace all occurrences; if False, require uniqueness
Returns: Returns:
Tuple of (new_content, match_count, error_message) Tuple of (new_content, match_count, strategy_name, error_message)
- If successful: (modified_content, number_of_replacements, None) - If successful: (modified_content, number_of_replacements, strategy_used, None)
- If failed: (original_content, 0, error_description) - If failed: (original_content, 0, None, error_description)
""" """
if not old_string: if not old_string:
return content, 0, "old_string cannot be empty" return content, 0, None, "old_string cannot be empty"
if old_string == new_string: if old_string == new_string:
return content, 0, "old_string and new_string are identical" return content, 0, None, "old_string and new_string are identical"
# Try each matching strategy in order # Try each matching strategy in order
strategies: List[Tuple[str, Callable]] = [ strategies: List[Tuple[str, Callable]] = [
("exact", _strategy_exact), ("exact", _strategy_exact),
@ -77,27 +77,28 @@ def fuzzy_find_and_replace(content: str, old_string: str, new_string: str,
("indentation_flexible", _strategy_indentation_flexible), ("indentation_flexible", _strategy_indentation_flexible),
("escape_normalized", _strategy_escape_normalized), ("escape_normalized", _strategy_escape_normalized),
("trimmed_boundary", _strategy_trimmed_boundary), ("trimmed_boundary", _strategy_trimmed_boundary),
("unicode_normalized", _strategy_unicode_normalized),
("block_anchor", _strategy_block_anchor), ("block_anchor", _strategy_block_anchor),
("context_aware", _strategy_context_aware), ("context_aware", _strategy_context_aware),
] ]
for _strategy_name, strategy_fn in strategies: for strategy_name, strategy_fn in strategies:
matches = strategy_fn(content, old_string) matches = strategy_fn(content, old_string)
if matches: if matches:
# Found matches with this strategy # Found matches with this strategy
if len(matches) > 1 and not replace_all: if len(matches) > 1 and not replace_all:
return content, 0, ( return content, 0, None, (
f"Found {len(matches)} matches for old_string. " f"Found {len(matches)} matches for old_string. "
f"Provide more context to make it unique, or use replace_all=True." f"Provide more context to make it unique, or use replace_all=True."
) )
# Perform replacement # Perform replacement
new_content = _apply_replacements(content, matches, new_string) new_content = _apply_replacements(content, matches, new_string)
return new_content, len(matches), None return new_content, len(matches), strategy_name, None
# No strategy found a match # No strategy found a match
return content, 0, "Could not find a match for old_string in the file" return content, 0, None, "Could not find a match for old_string in the file"
def _apply_replacements(content: str, matches: List[Tuple[int, int]], new_string: str) -> str: def _apply_replacements(content: str, matches: List[Tuple[int, int]], new_string: str) -> str:
@ -258,9 +259,90 @@ def _strategy_trimmed_boundary(content: str, pattern: str) -> List[Tuple[int, in
return matches return matches
def _build_orig_to_norm_map(original: str) -> List[int]:
    """Map each original character index to its index in the normalised string.

    UNICODE_MAP replacements may expand one character into several (em-dash
    -> '--', ellipsis -> '...'), so the normalised text can be longer than
    the original; this table converts positions in the normalised string
    back to positions in the original.

    Returns a list of ``len(original) + 1`` entries: entry ``i`` is the
    normalised offset of original character ``i``, with a final sentinel
    one past the last character.
    """
    # Running prefix sum of per-character widths in the normalised string.
    mapping: List[int] = [0]
    for ch in original:
        width = len(UNICODE_MAP[ch]) if ch in UNICODE_MAP else 1
        mapping.append(mapping[-1] + width)
    return mapping
def _map_positions_norm_to_orig(
orig_to_norm: List[int],
norm_matches: List[Tuple[int, int]],
) -> List[Tuple[int, int]]:
"""Convert (start, end) positions in the normalised string to original positions."""
# Invert the map: norm_pos -> first original position with that norm_pos
norm_to_orig_start: dict[int, int] = {}
for orig_pos, norm_pos in enumerate(orig_to_norm[:-1]):
if norm_pos not in norm_to_orig_start:
norm_to_orig_start[norm_pos] = orig_pos
results: List[Tuple[int, int]] = []
orig_len = len(orig_to_norm) - 1 # number of original characters
for norm_start, norm_end in norm_matches:
if norm_start not in norm_to_orig_start:
continue
orig_start = norm_to_orig_start[norm_start]
# Walk forward until orig_to_norm[orig_end] >= norm_end
orig_end = orig_start
while orig_end < orig_len and orig_to_norm[orig_end] < norm_end:
orig_end += 1
results.append((orig_start, orig_end))
return results
def _strategy_unicode_normalized(content: str, pattern: str) -> List[Tuple[int, int]]:
    """Strategy 7: match after folding Unicode variants to ASCII.

    Smart quotes, em/en-dashes, ellipses and non-breaking spaces are
    normalised in both *content* and *pattern*; exact and line-trimmed
    matching then run on the normalised copies. Resulting positions are
    translated back to the original string via ``_build_orig_to_norm_map``,
    because some replacements expand one character into several ASCII
    characters, so a naive position copy would be wrong.
    """
    folded_pattern = _unicode_normalize(pattern)
    folded_content = _unicode_normalize(content)

    # Either side may carry the unicode variant (content em-dash vs the
    # LLM's ASCII '--', or vice-versa). If neither changes under
    # normalisation there is nothing this strategy can add.
    if folded_content == content and folded_pattern == pattern:
        return []

    hits = _strategy_exact(folded_content, folded_pattern)
    if not hits:
        hits = _strategy_line_trimmed(folded_content, folded_pattern)
    if not hits:
        return []

    return _map_positions_norm_to_orig(_build_orig_to_norm_map(content), hits)
def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]: def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]:
""" """
Strategy 7: Match by anchoring on first and last lines. Strategy 8: Match by anchoring on first and last lines.
Adjusted with permissive thresholds and unicode normalization. Adjusted with permissive thresholds and unicode normalization.
""" """
# Normalize both strings for comparison while keeping original content for offset calculation # Normalize both strings for comparison while keeping original content for offset calculation
@ -290,8 +372,10 @@ def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]:
matches = [] matches = []
candidate_count = len(potential_matches) candidate_count = len(potential_matches)
# Thresholding logic: 0.10 for unique matches (max flexibility), 0.30 for multiple candidates # Thresholding logic: 0.50 for unique matches, 0.70 for multiple candidates.
threshold = 0.10 if candidate_count == 1 else 0.30 # Previous values (0.10 / 0.30) were dangerously loose — a 10% middle-section
# similarity could match completely unrelated blocks.
threshold = 0.50 if candidate_count == 1 else 0.70
for i in potential_matches: for i in potential_matches:
if pattern_line_count <= 2: if pattern_line_count <= 2:
@ -314,7 +398,7 @@ def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]:
def _strategy_context_aware(content: str, pattern: str) -> List[Tuple[int, int]]: def _strategy_context_aware(content: str, pattern: str) -> List[Tuple[int, int]]:
""" """
Strategy 8: Line-by-line similarity with 50% threshold. Strategy 9: Line-by-line similarity with 50% threshold.
Finds blocks where at least 50% of lines have high similarity. Finds blocks where at least 50% of lines have high similarity.
""" """

View file

@ -28,6 +28,7 @@ Usage:
result = apply_v4a_operations(operations, file_ops) result = apply_v4a_operations(operations, file_ops)
""" """
import difflib
import re import re
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Any from typing import List, Optional, Tuple, Any
@ -202,31 +203,162 @@ def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[
if current_hunk and current_hunk.lines: if current_hunk and current_hunk.lines:
current_op.hunks.append(current_hunk) current_op.hunks.append(current_hunk)
operations.append(current_op) operations.append(current_op)
# Validate the parsed result
if not operations:
# Empty patch is not an error — callers get [] and can decide
return operations, None
parse_errors: List[str] = []
for op in operations:
if not op.file_path:
parse_errors.append("Operation with empty file path")
if op.operation == OperationType.UPDATE and not op.hunks:
parse_errors.append(f"UPDATE {op.file_path!r}: no hunks found")
if op.operation == OperationType.MOVE and not op.new_path:
parse_errors.append(f"MOVE {op.file_path!r}: missing destination path (expected 'src -> dst')")
if parse_errors:
return [], "Parse error: " + "; ".join(parse_errors)
return operations, None return operations, None
def apply_v4a_operations(operations: List[PatchOperation], def _count_occurrences(text: str, pattern: str) -> int:
file_ops: Any) -> 'PatchResult': """Count non-overlapping occurrences of *pattern* in *text*."""
count = 0
start = 0
while True:
pos = text.find(pattern, start)
if pos == -1:
break
count += 1
start = pos + 1
return count
def _validate_operations(
    operations: List[PatchOperation],
    file_ops: Any,
) -> List[str]:
    """Validate all operations without writing any files.

    Returns a list of error strings; an empty list means all operations
    are valid and the apply phase can proceed safely.

    For UPDATE operations, hunks are simulated in order so that later
    hunks validate against post-earlier-hunk content (matching apply order).
    """
    # Deferred import: breaks the patch_parser <-> fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    errors: List[str] = []
    for op in operations:
        if op.operation == OperationType.UPDATE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: {read_result.error}")
                continue
            simulated = read_result.content
            for hunk in op.hunks:
                search_lines = [ln.content for ln in hunk.lines if ln.prefix in (' ', '-')]
                if not search_lines:
                    # Addition-only hunk: validate context hint uniqueness
                    if hunk.context_hint:
                        occurrences = _count_occurrences(simulated, hunk.context_hint)
                        if occurrences == 0:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' not found"
                            )
                        elif occurrences > 1:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' is ambiguous "
                                f"({occurrences} occurrences)"
                            )
                    continue
                search_pattern = '\n'.join(search_lines)
                replace_lines = [ln.content for ln in hunk.lines if ln.prefix in (' ', '+')]
                replacement = '\n'.join(replace_lines)
                new_simulated, count, _strategy, match_error = fuzzy_find_and_replace(
                    simulated, search_pattern, replacement, replace_all=False
                )
                if count == 0:
                    label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
                    # Separate the base message from the matcher's detail — a bare
                    # concatenation would glue "not found" to the error text.
                    errors.append(
                        f"{op.file_path}: hunk {label} not found"
                        + (f" — {match_error}" if match_error else "")
                    )
                else:
                    # Advance simulation so subsequent hunks validate correctly.
                    # Reuse the result from the call above — no second fuzzy run.
                    simulated = new_simulated
        elif op.operation == OperationType.DELETE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: file not found for deletion")
        elif op.operation == OperationType.MOVE:
            if not op.new_path:
                errors.append(f"{op.file_path}: MOVE operation missing destination path")
                continue
            src_result = file_ops.read_file_raw(op.file_path)
            if src_result.error:
                errors.append(f"{op.file_path}: source file not found for move")
            dst_result = file_ops.read_file_raw(op.new_path)
            if not dst_result.error:
                errors.append(
                    f"{op.new_path}: destination already exists — move would overwrite"
                )
        # ADD: parent directory creation handled by write_file; no pre-check needed.
    return errors
def apply_v4a_operations(operations: List[PatchOperation],
file_ops: Any) -> 'PatchResult':
"""Apply V4A patch operations using a file operations interface.
Uses a two-phase validate-then-apply approach:
- Phase 1: validate all operations against current file contents without
writing anything. If any validation error is found, return immediately
with no filesystem changes.
- Phase 2: apply all operations. A failure here (e.g. a race between
validation and apply) is reported with a note to run ``git diff``.
Args: Args:
operations: List of PatchOperation from parse_v4a_patch operations: List of PatchOperation from parse_v4a_patch
file_ops: Object with read_file, write_file methods file_ops: Object with read_file_raw, write_file methods
Returns: Returns:
PatchResult with results of all operations PatchResult with results of all operations
""" """
# Import here to avoid circular imports # Import here to avoid circular imports
from tools.file_operations import PatchResult from tools.file_operations import PatchResult
# ---- Phase 1: validate ----
validation_errors = _validate_operations(operations, file_ops)
if validation_errors:
return PatchResult(
success=False,
error="Patch validation failed (no files were modified):\n"
+ "\n".join(f"{e}" for e in validation_errors),
)
# ---- Phase 2: apply ----
files_modified = [] files_modified = []
files_created = [] files_created = []
files_deleted = [] files_deleted = []
all_diffs = [] all_diffs = []
errors = [] errors = []
for op in operations: for op in operations:
try: try:
if op.operation == OperationType.ADD: if op.operation == OperationType.ADD:
@ -236,7 +368,7 @@ def apply_v4a_operations(operations: List[PatchOperation],
all_diffs.append(result[1]) all_diffs.append(result[1])
else: else:
errors.append(f"Failed to add {op.file_path}: {result[1]}") errors.append(f"Failed to add {op.file_path}: {result[1]}")
elif op.operation == OperationType.DELETE: elif op.operation == OperationType.DELETE:
result = _apply_delete(op, file_ops) result = _apply_delete(op, file_ops)
if result[0]: if result[0]:
@ -244,7 +376,7 @@ def apply_v4a_operations(operations: List[PatchOperation],
all_diffs.append(result[1]) all_diffs.append(result[1])
else: else:
errors.append(f"Failed to delete {op.file_path}: {result[1]}") errors.append(f"Failed to delete {op.file_path}: {result[1]}")
elif op.operation == OperationType.MOVE: elif op.operation == OperationType.MOVE:
result = _apply_move(op, file_ops) result = _apply_move(op, file_ops)
if result[0]: if result[0]:
@ -252,7 +384,7 @@ def apply_v4a_operations(operations: List[PatchOperation],
all_diffs.append(result[1]) all_diffs.append(result[1])
else: else:
errors.append(f"Failed to move {op.file_path}: {result[1]}") errors.append(f"Failed to move {op.file_path}: {result[1]}")
elif op.operation == OperationType.UPDATE: elif op.operation == OperationType.UPDATE:
result = _apply_update(op, file_ops) result = _apply_update(op, file_ops)
if result[0]: if result[0]:
@ -260,19 +392,19 @@ def apply_v4a_operations(operations: List[PatchOperation],
all_diffs.append(result[1]) all_diffs.append(result[1])
else: else:
errors.append(f"Failed to update {op.file_path}: {result[1]}") errors.append(f"Failed to update {op.file_path}: {result[1]}")
except Exception as e: except Exception as e:
errors.append(f"Error processing {op.file_path}: {str(e)}") errors.append(f"Error processing {op.file_path}: {str(e)}")
# Run lint on all modified/created files # Run lint on all modified/created files
lint_results = {} lint_results = {}
for f in files_modified + files_created: for f in files_modified + files_created:
if hasattr(file_ops, '_check_lint'): if hasattr(file_ops, '_check_lint'):
lint_result = file_ops._check_lint(f) lint_result = file_ops._check_lint(f)
lint_results[f] = lint_result.to_dict() lint_results[f] = lint_result.to_dict()
combined_diff = '\n'.join(all_diffs) combined_diff = '\n'.join(all_diffs)
if errors: if errors:
return PatchResult( return PatchResult(
success=False, success=False,
@ -281,16 +413,17 @@ def apply_v4a_operations(operations: List[PatchOperation],
files_created=files_created, files_created=files_created,
files_deleted=files_deleted, files_deleted=files_deleted,
lint=lint_results if lint_results else None, lint=lint_results if lint_results else None,
error='; '.join(errors) error="Apply phase failed (state may be inconsistent — run `git diff` to assess):\n"
+ "\n".join(f"{e}" for e in errors),
) )
return PatchResult( return PatchResult(
success=True, success=True,
diff=combined_diff, diff=combined_diff,
files_modified=files_modified, files_modified=files_modified,
files_created=files_created, files_created=files_created,
files_deleted=files_deleted, files_deleted=files_deleted,
lint=lint_results if lint_results else None lint=lint_results if lint_results else None,
) )
@ -317,68 +450,56 @@ def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
"""Apply a delete file operation.""" """Apply a delete file operation."""
# Read file first for diff # Read before deleting so we can produce a real unified diff.
read_result = file_ops.read_file(op.file_path) # Validation already confirmed existence; this guards against races.
read_result = file_ops.read_file_raw(op.file_path)
if read_result.error and "not found" in read_result.error.lower(): if read_result.error:
# File doesn't exist, nothing to delete return False, f"Cannot delete {op.file_path}: file not found"
return True, f"# {op.file_path} already deleted or doesn't exist"
result = file_ops.delete_file(op.file_path)
# Delete directly via shell command using the underlying environment if result.error:
rm_result = file_ops._exec(f"rm -f {file_ops._escape_shell_arg(op.file_path)}") return False, result.error
if rm_result.exit_code != 0: removed_lines = read_result.content.splitlines(keepends=True)
return False, rm_result.stdout diff = ''.join(difflib.unified_diff(
removed_lines, [],
diff = f"--- a/{op.file_path}\n+++ /dev/null\n# File deleted" fromfile=f"a/{op.file_path}",
return True, diff tofile="/dev/null",
))
return True, diff or f"# Deleted: {op.file_path}"
def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
"""Apply a move file operation.""" """Apply a move file operation."""
# Use shell mv command result = file_ops.move_file(op.file_path, op.new_path)
mv_result = file_ops._exec( if result.error:
f"mv {file_ops._escape_shell_arg(op.file_path)} {file_ops._escape_shell_arg(op.new_path)}" return False, result.error
)
if mv_result.exit_code != 0:
return False, mv_result.stdout
diff = f"# Moved: {op.file_path} -> {op.new_path}" diff = f"# Moved: {op.file_path} -> {op.new_path}"
return True, diff return True, diff
def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
"""Apply an update file operation.""" """Apply an update file operation."""
# Read current content # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
read_result = file_ops.read_file(op.file_path, limit=10000) from tools.fuzzy_match import fuzzy_find_and_replace
# Read current content — raw so no line-number prefixes or per-line truncation
read_result = file_ops.read_file_raw(op.file_path)
if read_result.error: if read_result.error:
return False, f"Cannot read file: {read_result.error}" return False, f"Cannot read file: {read_result.error}"
# Parse content (remove line numbers) current_content = read_result.content
current_lines = []
for line in read_result.content.split('\n'):
if re.match(r'^\s*\d+\|', line):
# Line format: " 123|content"
parts = line.split('|', 1)
if len(parts) == 2:
current_lines.append(parts[1])
else:
current_lines.append(line)
else:
current_lines.append(line)
current_content = '\n'.join(current_lines)
# Apply each hunk # Apply each hunk
new_content = current_content new_content = current_content
for hunk in op.hunks: for hunk in op.hunks:
# Build search pattern from context and removed lines # Build search pattern from context and removed lines
search_lines = [] search_lines = []
replace_lines = [] replace_lines = []
for line in hunk.lines: for line in hunk.lines:
if line.prefix == ' ': if line.prefix == ' ':
search_lines.append(line.content) search_lines.append(line.content)
@ -387,17 +508,15 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
search_lines.append(line.content) search_lines.append(line.content)
elif line.prefix == '+': elif line.prefix == '+':
replace_lines.append(line.content) replace_lines.append(line.content)
if search_lines: if search_lines:
search_pattern = '\n'.join(search_lines) search_pattern = '\n'.join(search_lines)
replacement = '\n'.join(replace_lines) replacement = '\n'.join(replace_lines)
# Use fuzzy matching new_content, count, _strategy, error = fuzzy_find_and_replace(
from tools.fuzzy_match import fuzzy_find_and_replace
new_content, count, error = fuzzy_find_and_replace(
new_content, search_pattern, replacement, replace_all=False new_content, search_pattern, replacement, replace_all=False
) )
if error and count == 0: if error and count == 0:
# Try with context hint if available # Try with context hint if available
if hunk.context_hint: if hunk.context_hint:
@ -408,8 +527,8 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
window_start = max(0, hint_pos - 500) window_start = max(0, hint_pos - 500)
window_end = min(len(new_content), hint_pos + 2000) window_end = min(len(new_content), hint_pos + 2000)
window = new_content[window_start:window_end] window = new_content[window_start:window_end]
window_new, count, error = fuzzy_find_and_replace( window_new, count, _strategy, error = fuzzy_find_and_replace(
window, search_pattern, replacement, replace_all=False window, search_pattern, replacement, replace_all=False
) )
@ -424,16 +543,23 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
# Insert at the location indicated by the context hint, or at end of file. # Insert at the location indicated by the context hint, or at end of file.
insert_text = '\n'.join(replace_lines) insert_text = '\n'.join(replace_lines)
if hunk.context_hint: if hunk.context_hint:
hint_pos = new_content.find(hunk.context_hint) occurrences = _count_occurrences(new_content, hunk.context_hint)
if hint_pos != -1: if occurrences == 0:
# Hint not found — append at end as a safe fallback
new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
elif occurrences > 1:
return False, (
f"Addition-only hunk: context hint '{hunk.context_hint}' is ambiguous "
f"({occurrences} occurrences) — provide a more unique hint"
)
else:
hint_pos = new_content.find(hunk.context_hint)
# Insert after the line containing the context hint # Insert after the line containing the context hint
eol = new_content.find('\n', hint_pos) eol = new_content.find('\n', hint_pos)
if eol != -1: if eol != -1:
new_content = new_content[:eol + 1] + insert_text + '\n' + new_content[eol + 1:] new_content = new_content[:eol + 1] + insert_text + '\n' + new_content[eol + 1:]
else: else:
new_content = new_content + '\n' + insert_text new_content = new_content + '\n' + insert_text
else:
new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
else: else:
new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n' new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
@ -443,7 +569,6 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
return False, write_result.error return False, write_result.error
# Generate diff # Generate diff
import difflib
diff_lines = difflib.unified_diff( diff_lines = difflib.unified_diff(
current_content.splitlines(keepends=True), current_content.splitlines(keepends=True),
new_content.splitlines(keepends=True), new_content.splitlines(keepends=True),

View file

@ -426,7 +426,7 @@ def _patch_skill(
# from exact-match failures on minor formatting mismatches. # from exact-match failures on minor formatting mismatches.
from tools.fuzzy_match import fuzzy_find_and_replace from tools.fuzzy_match import fuzzy_find_and_replace
new_content, match_count, match_error = fuzzy_find_and_replace( new_content, match_count, _strategy, match_error = fuzzy_find_and_replace(
content, old_string, new_string, replace_all content, old_string, new_string, replace_all
) )
if match_error: if match_error: