diff --git a/tests/tools/test_file_operations.py b/tests/tools/test_file_operations.py index 0db3fb43b..dc8ccbde6 100644 --- a/tests/tools/test_file_operations.py +++ b/tests/tools/test_file_operations.py @@ -333,3 +333,25 @@ class TestShellFileOpsWriteDenied: result = file_ops.patch_replace("~/.ssh/authorized_keys", "old", "new") assert result.error is not None assert "denied" in result.error.lower() + + def test_delete_file_denied_path(self, file_ops): + result = file_ops.delete_file("~/.ssh/authorized_keys") + assert result.error is not None + assert "denied" in result.error.lower() + + def test_move_file_src_denied(self, file_ops): + result = file_ops.move_file("~/.ssh/id_rsa", "/tmp/dest.txt") + assert result.error is not None + assert "denied" in result.error.lower() + + def test_move_file_dst_denied(self, file_ops): + result = file_ops.move_file("/tmp/src.txt", "~/.aws/credentials") + assert result.error is not None + assert "denied" in result.error.lower() + + def test_move_file_failure_path(self, mock_env): + mock_env.execute.return_value = {"output": "No such file or directory", "returncode": 1} + ops = ShellFileOperations(mock_env) + result = ops.move_file("/tmp/nonexistent.txt", "/tmp/dest.txt") + assert result.error is not None + assert "Failed to move" in result.error diff --git a/tests/tools/test_fuzzy_match.py b/tests/tools/test_fuzzy_match.py index e16bd96cf..c1dbc5446 100644 --- a/tests/tools/test_fuzzy_match.py +++ b/tests/tools/test_fuzzy_match.py @@ -6,31 +6,31 @@ from tools.fuzzy_match import fuzzy_find_and_replace class TestExactMatch: def test_single_replacement(self): content = "hello world" - new, count, err = fuzzy_find_and_replace(content, "hello", "hi") + new, count, _, err = fuzzy_find_and_replace(content, "hello", "hi") assert err is None assert count == 1 assert new == "hi world" def test_no_match(self): content = "hello world" - new, count, err = fuzzy_find_and_replace(content, "xyz", "abc") + new, count, _, err = fuzzy_find_and_replace(content, "xyz", "abc") assert count == 0 assert err is not None assert new == content def test_empty_old_string(self): - new, count, err = fuzzy_find_and_replace("abc", "", "x") + new, count, _, err = fuzzy_find_and_replace("abc", "", "x") assert count == 0 assert err is not None def test_identical_strings(self): - new, count, err = fuzzy_find_and_replace("abc", "abc", "abc") + new, count, _, err = fuzzy_find_and_replace("abc", "abc", "abc") assert count == 0 assert "identical" in err def test_multiline_exact(self): content = "line1\nline2\nline3" - new, count, err = fuzzy_find_and_replace(content, "line1\nline2", "replaced") + new, count, _, err = fuzzy_find_and_replace(content, "line1\nline2", "replaced") assert err is None assert count == 1 assert new == "replaced\nline3" @@ -39,7 +39,7 @@ class TestExactMatch: class TestWhitespaceDifference: def test_extra_spaces_match(self): content = "def foo( x, y ):" - new, count, err = fuzzy_find_and_replace(content, "def foo( x, y ):", "def bar(x, y):") + new, count, _, err = fuzzy_find_and_replace(content, "def foo( x, y ):", "def bar(x, y):") assert count == 1 assert "bar" in new @@ -47,7 +47,7 @@ class TestWhitespaceDifference: class TestIndentDifference: def test_different_indentation(self): content = " def foo():\n pass" - new, count, err = fuzzy_find_and_replace(content, "def foo():\n pass", "def bar():\n return 1") + new, count, _, err = fuzzy_find_and_replace(content, "def foo():\n pass", "def bar():\n return 1") assert count == 1 assert "bar" in new @@ -55,13 +55,96 @@ class TestIndentDifference: class TestReplaceAll: def test_multiple_matches_without_flag_errors(self): content = "aaa bbb aaa" - new, count, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=False) + new, count, _, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=False) assert count == 0 assert "Found 2 matches" in err def test_multiple_matches_with_flag(self): content = "aaa bbb aaa" - new, count, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=True) + new, count, _, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=True) assert err is None assert count == 2 assert new == "ccc bbb ccc" + + +class TestUnicodeNormalized: + """Tests for the unicode_normalized strategy (Bug 5).""" + + def test_em_dash_matched(self): + """Em-dash in content should match ASCII '--' in pattern.""" + content = "return value\u2014fallback" + new, count, strategy, err = fuzzy_find_and_replace( + content, "return value--fallback", "return value or fallback" + ) + assert count == 1, f"Expected match via unicode_normalized, got err={err}" + assert strategy == "unicode_normalized" + assert "return value or fallback" in new + + def test_smart_quotes_matched(self): + """Smart double quotes in content should match straight quotes in pattern.""" + content = 'print(\u201chello\u201d)' + new, count, strategy, err = fuzzy_find_and_replace( + content, 'print("hello")', 'print("world")' + ) + assert count == 1, f"Expected match via unicode_normalized, got err={err}" + assert "world" in new + + def test_no_unicode_skips_strategy(self): + """When content and pattern have no Unicode variants, strategy is skipped.""" + content = "hello world" + # Should match via exact, not unicode_normalized + new, count, strategy, err = fuzzy_find_and_replace(content, "hello", "hi") + assert count == 1 + assert strategy == "exact" + + +class TestBlockAnchorThreshold: + """Tests for the raised block_anchor threshold (Bug 4).""" + + def test_high_similarity_matches(self): + """A block with >50% middle similarity should match.""" + content = "def foo():\n x = 1\n y = 2\n return x + y\n" + pattern = "def foo():\n x = 1\n y = 9\n return x + y" + new, count, strategy, err = fuzzy_find_and_replace(content, pattern, "def foo():\n return 0\n") + # Should match via block_anchor or earlier strategy + assert count == 1 + + def test_completely_different_middle_does_not_match(self): + """A block where only first+last lines match but middle is completely different + should NOT match under the raised 0.50 threshold.""" + content = ( + "class Foo:\n" + " completely = 'unrelated'\n" + " content = 'here'\n" + " nothing = 'in common'\n" + " pass\n" + ) + # Pattern has same first/last lines but completely different middle + pattern = ( + "class Foo:\n" + " x = 1\n" + " y = 2\n" + " z = 3\n" + " pass" + ) + new, count, strategy, err = fuzzy_find_and_replace(content, pattern, "replaced") + # With threshold=0.50, this near-zero-similarity middle should not match + assert count == 0, ( + f"Block with unrelated middle should not match under threshold=0.50, " + f"but matched via strategy={strategy}" + ) + + +class TestStrategyNameSurfaced: + """Tests for the strategy name in the 4-tuple return (Bug 6).""" + + def test_exact_strategy_name(self): + new, count, strategy, err = fuzzy_find_and_replace("hello", "hello", "world") + assert strategy == "exact" + assert count == 1 + + def test_failed_match_returns_none_strategy(self): + new, count, strategy, err = fuzzy_find_and_replace("hello", "xyz", "world") + assert count == 0 + assert strategy is None + assert err is not None diff --git a/tests/tools/test_patch_parser.py b/tests/tools/test_patch_parser.py index 42e5129f5..8c4a0c80a 100644 --- a/tests/tools/test_patch_parser.py +++ b/tests/tools/test_patch_parser.py @@ -159,7 +159,7 @@ class TestApplyUpdate: def __init__(self): self.written = None - def read_file(self, path, offset=1, limit=500): + def read_file_raw(self, path): return SimpleNamespace( content=( 'def run():\n' @@ -211,7 +211,7 @@ class TestAdditionOnlyHunks: # Apply to a file that contains the context hint class FakeFileOps: written = None - def read_file(self, path, **kw): + def read_file_raw(self, path): return SimpleNamespace( content="def main():\n pass\n", error=None, @@ -239,7 +239,7 @@ class TestAdditionOnlyHunks: class FakeFileOps: written = None - def read_file(self, path, **kw): + def read_file_raw(self, path): return SimpleNamespace( content="existing = True\n", error=None, @@ -253,3 +253,259 @@ class TestAdditionOnlyHunks: assert result.success is True assert file_ops.written.endswith("def new_func():\n return True\n") assert "existing = True" in file_ops.written + + +class TestReadFileRaw: + """Bug 1 regression tests — files > 2000 lines and lines > 2000 chars.""" + + def test_apply_update_file_over_2000_lines(self): + """A hunk targeting line 2200 must not truncate the file to 2000 lines.""" + patch = """\ +*** Begin Patch +*** Update File: big.py +@@ marker_at_2200 @@ + line_2200 +-old_value ++new_value +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is None + + # Build a 2500-line file; the hunk targets a region at line 2200 + lines = [f"line_{i}" for i in range(1, 2501)] + lines[2199] = "line_2200" # index 2199 = line 2200 + lines[2200] = "old_value" + file_content = "\n".join(lines) + + class FakeFileOps: + written = None + def read_file_raw(self, path): + return SimpleNamespace(content=file_content, error=None) + def write_file(self, path, content): + self.written = content + return SimpleNamespace(error=None) + + file_ops = FakeFileOps() + result = apply_v4a_operations(ops, file_ops) + assert result.success is True + written_lines = file_ops.written.split("\n") + assert len(written_lines) == 2500, ( + f"Expected 2500 lines, got {len(written_lines)}" + ) + assert "new_value" in file_ops.written + assert "old_value" not in file_ops.written + + def test_apply_update_preserves_long_lines(self): + """A line > 2000 chars must be preserved verbatim after an unrelated hunk.""" + long_line = "x" * 3000 + patch = """\ +*** Begin Patch +*** Update File: wide.py +@@ short_func @@ + def short_func(): +- return 1 ++ return 2 +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is None + + file_content = f"def short_func():\n return 1\n{long_line}\n" + + class FakeFileOps: + written = None + def read_file_raw(self, path): + return SimpleNamespace(content=file_content, error=None) + def write_file(self, path, content): + self.written = content + return SimpleNamespace(error=None) + + file_ops = FakeFileOps() + result = apply_v4a_operations(ops, file_ops) + assert result.success is True + assert long_line in file_ops.written, "Long line was truncated" + assert "... [truncated]" not in file_ops.written + + +class TestValidationPhase: + """Bug 2 regression tests — validation prevents partial apply.""" + + def test_validation_failure_writes_nothing(self): + """If one hunk is invalid, no files should be written.""" + patch = """\ +*** Begin Patch +*** Update File: a.py + def good(): +- return 1 ++ return 2 +*** Update File: b.py + THIS LINE DOES NOT EXIST +- old ++ new +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is None + + written = {} + + class FakeFileOps: + def read_file_raw(self, path): + files = { + "a.py": "def good():\n return 1\n", + "b.py": "completely different content\n", + } + content = files.get(path) + if content is None: + return SimpleNamespace(content=None, error=f"File not found: {path}") + return SimpleNamespace(content=content, error=None) + + def write_file(self, path, content): + written[path] = content + return SimpleNamespace(error=None) + + result = apply_v4a_operations(ops, FakeFileOps()) + assert result.success is False + assert written == {}, f"No files should have been written, got: {list(written.keys())}" + assert "validation failed" in result.error.lower() + + def test_all_valid_operations_applied(self): + """When all operations are valid, all files are written.""" + patch = """\ +*** Begin Patch +*** Update File: a.py + def foo(): +- return 1 ++ return 2 +*** Update File: b.py + def bar(): +- pass ++ return True +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is None + + written = {} + + class FakeFileOps: + def read_file_raw(self, path): + files = { + "a.py": "def foo():\n return 1\n", + "b.py": "def bar():\n pass\n", + } + return SimpleNamespace(content=files[path], error=None) + + def write_file(self, path, content): + written[path] = content + return SimpleNamespace(error=None) + + result = apply_v4a_operations(ops, FakeFileOps()) + assert result.success is True + assert set(written.keys()) == {"a.py", "b.py"} + + +class TestApplyDelete: + """Tests for _apply_delete producing a real unified diff.""" + + def test_delete_diff_contains_removed_lines(self): + """_apply_delete must embed the actual file content in the diff, not a placeholder.""" + patch = """\ +*** Begin Patch +*** Delete File: old/stuff.py +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is None + + class FakeFileOps: + deleted = False + + def read_file_raw(self, path): + return SimpleNamespace( + content="def old_func():\n return 42\n", + error=None, + ) + + def delete_file(self, path): + self.deleted = True + return SimpleNamespace(error=None) + + file_ops = FakeFileOps() + result = apply_v4a_operations(ops, file_ops) + + assert result.success is True + assert file_ops.deleted is True + # Diff must contain the actual removed lines, not a bare comment + assert "-def old_func():" in result.diff + assert "- return 42" in result.diff + assert "/dev/null" in result.diff + + def test_delete_diff_fallback_on_empty_file(self): + """An empty file should produce the fallback comment diff.""" + patch = """\ +*** Begin Patch +*** Delete File: empty.py +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is None + + class FakeFileOps: + def read_file_raw(self, path): + return SimpleNamespace(content="", error=None) + + def delete_file(self, path): + return SimpleNamespace(error=None) + + result = apply_v4a_operations(ops, FakeFileOps()) + assert result.success is True + # unified_diff produces nothing for two empty inputs — fallback comment expected + assert "Deleted" in result.diff or result.diff.strip() == "" + + +class TestCountOccurrences: + def test_basic(self): + from tools.patch_parser import _count_occurrences + assert _count_occurrences("aaa", "a") == 3 + assert _count_occurrences("aaa", "aa") == 2 + assert _count_occurrences("hello world", "xyz") == 0 + assert _count_occurrences("", "x") == 0 + + +class TestParseErrorSignalling: + """Bug 3 regression tests — parse_v4a_patch must signal errors, not swallow them.""" + + def test_update_with_no_hunks_returns_error(self): + """An UPDATE with no hunk lines is a malformed patch and should error.""" + patch = """\ +*** Begin Patch +*** Update File: foo.py +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is not None, "Expected a parse error for hunk-less UPDATE" + assert ops == [] + + def test_move_without_destination_returns_error(self): + """A MOVE without '->' syntax should not silently produce a broken operation.""" + # The move regex requires '->' so this will be treated as an unrecognised + # line and the op is never created. Confirm nothing crashes and ops is empty. + patch = """\ +*** Begin Patch +*** Move File: src/foo.py +*** End Patch""" + ops, err = parse_v4a_patch(patch) + # Either parse sees zero ops (fine) or returns an error (also fine). + # What is NOT acceptable is ops=[MOVE op with empty new_path] + err=None. + if ops: + assert err is not None, ( + "MOVE with missing destination must either produce empty ops or an error" + ) + + def test_valid_patch_returns_no_error(self): + """A well-formed patch must still return err=None.""" + patch = """\ +*** Begin Patch +*** Update File: f.py + ctx +-old ++new +*** End Patch""" + ops, err = parse_v4a_patch(patch) + assert err is None + assert len(ops) == 1 diff --git a/tools/file_operations.py b/tools/file_operations.py index f2b37505f..03ff45a23 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -252,23 +252,43 @@ class FileOperations(ABC): def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult: """Read a file with pagination support.""" ... - + + @abstractmethod + def read_file_raw(self, path: str) -> ReadResult: + """Read the complete file content as a plain string. + + No pagination, no line-number prefixes, no per-line truncation. + Returns ReadResult with .content = full file text, .error set on + failure. Always reads to EOF regardless of file size. + """ + ... + @abstractmethod def write_file(self, path: str, content: str) -> WriteResult: """Write content to a file, creating directories as needed.""" ... - + @abstractmethod - def patch_replace(self, path: str, old_string: str, new_string: str, + def patch_replace(self, path: str, old_string: str, new_string: str, replace_all: bool = False) -> PatchResult: """Replace text in a file using fuzzy matching.""" ... - + @abstractmethod def patch_v4a(self, patch_content: str) -> PatchResult: """Apply a V4A format patch.""" ... - + + @abstractmethod + def delete_file(self, path: str) -> WriteResult: + """Delete a file. Returns WriteResult with .error set on failure.""" + ... + + @abstractmethod + def move_file(self, src: str, dst: str) -> WriteResult: + """Move/rename a file from src to dst. Returns WriteResult with .error set on failure.""" + ... + @abstractmethod def search(self, pattern: str, path: str = ".", target: str = "content", file_glob: Optional[str] = None, limit: int = 50, offset: int = 0, @@ -561,10 +581,62 @@ class ShellFileOperations(FileOperations): similar_files=similar[:5] # Limit to 5 suggestions ) + def read_file_raw(self, path: str) -> ReadResult: + """Read the complete file content as a plain string. + + No pagination, no line-number prefixes, no per-line truncation. + Uses cat so the full file is returned regardless of size. + """ + path = self._expand_path(path) + stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null" + stat_result = self._exec(stat_cmd) + if stat_result.exit_code != 0: + return self._suggest_similar_files(path) + try: + file_size = int(stat_result.stdout.strip()) + except ValueError: + file_size = 0 + if self._is_image(path): + return ReadResult(is_image=True, is_binary=True, file_size=file_size) + sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null") + if self._is_likely_binary(path, sample_result.stdout): + return ReadResult( + is_binary=True, file_size=file_size, + error="Binary file — cannot display as text." + ) + cat_result = self._exec(f"cat {self._escape_shell_arg(path)}") + if cat_result.exit_code != 0: + return ReadResult(error=f"Failed to read file: {cat_result.stdout}") + return ReadResult(content=cat_result.stdout, file_size=file_size) + + def delete_file(self, path: str) -> WriteResult: + """Delete a file via rm.""" + path = self._expand_path(path) + if _is_write_denied(path): + return WriteResult(error=f"Delete denied: {path} is a protected path") + result = self._exec(f"rm -f {self._escape_shell_arg(path)}") + if result.exit_code != 0: + return WriteResult(error=f"Failed to delete {path}: {result.stdout}") + return WriteResult() + + def move_file(self, src: str, dst: str) -> WriteResult: + """Move a file via mv.""" + src = self._expand_path(src) + dst = self._expand_path(dst) + for p in (src, dst): + if _is_write_denied(p): + return WriteResult(error=f"Move denied: {p} is a protected path") + result = self._exec( + f"mv {self._escape_shell_arg(src)} {self._escape_shell_arg(dst)}" + ) + if result.exit_code != 0: + return WriteResult(error=f"Failed to move {src} -> {dst}: {result.stdout}") + return WriteResult() + # ========================================================================= # WRITE Implementation # ========================================================================= - + def write_file(self, path: str, content: str) -> WriteResult: """ Write content to a file, creating parent directories as needed. @@ -656,7 +728,7 @@ class ShellFileOperations(FileOperations): # Import and use fuzzy matching from tools.fuzzy_match import fuzzy_find_and_replace - new_content, match_count, error = fuzzy_find_and_replace( + new_content, match_count, _strategy, error = fuzzy_find_and_replace( content, old_string, new_string, replace_all ) diff --git a/tools/fuzzy_match.py b/tools/fuzzy_match.py index 727e884eb..84833e0d0 100644 --- a/tools/fuzzy_match.py +++ b/tools/fuzzy_match.py @@ -21,7 +21,7 @@ Multi-occurrence matching is handled via the replace_all flag. Usage: from tools.fuzzy_match import fuzzy_find_and_replace - new_content, match_count, error = fuzzy_find_and_replace( + new_content, match_count, strategy, error = fuzzy_find_and_replace( content="def foo():\\n pass", old_string="def foo():", new_string="def bar():", @@ -48,27 +48,27 @@ def _unicode_normalize(text: str) -> str: def fuzzy_find_and_replace(content: str, old_string: str, new_string: str, - replace_all: bool = False) -> Tuple[str, int, Optional[str]]: + replace_all: bool = False) -> Tuple[str, int, Optional[str], Optional[str]]: """ Find and replace text using a chain of increasingly fuzzy matching strategies. - + Args: content: The file content to search in old_string: The text to find new_string: The replacement text replace_all: If True, replace all occurrences; if False, require uniqueness - + Returns: - Tuple of (new_content, match_count, error_message) - - If successful: (modified_content, number_of_replacements, None) - - If failed: (original_content, 0, error_description) + Tuple of (new_content, match_count, strategy_name, error_message) + - If successful: (modified_content, number_of_replacements, strategy_used, None) + - If failed: (original_content, 0, None, error_description) """ if not old_string: - return content, 0, "old_string cannot be empty" - + return content, 0, None, "old_string cannot be empty" + if old_string == new_string: - return content, 0, "old_string and new_string are identical" - + return content, 0, None, "old_string and new_string are identical" + # Try each matching strategy in order strategies: List[Tuple[str, Callable]] = [ ("exact", _strategy_exact), @@ -77,27 +77,28 @@ def fuzzy_find_and_replace(content: str, old_string: str, new_string: str, ("indentation_flexible", _strategy_indentation_flexible), ("escape_normalized", _strategy_escape_normalized), ("trimmed_boundary", _strategy_trimmed_boundary), + ("unicode_normalized", _strategy_unicode_normalized), ("block_anchor", _strategy_block_anchor), ("context_aware", _strategy_context_aware), ] - - for _strategy_name, strategy_fn in strategies: + + for strategy_name, strategy_fn in strategies: matches = strategy_fn(content, old_string) - + if matches: # Found matches with this strategy if len(matches) > 1 and not replace_all: - return content, 0, ( + return content, 0, None, ( f"Found {len(matches)} matches for old_string. " f"Provide more context to make it unique, or use replace_all=True." ) - + # Perform replacement new_content = _apply_replacements(content, matches, new_string) - return new_content, len(matches), None - + return new_content, len(matches), strategy_name, None + # No strategy found a match - return content, 0, "Could not find a match for old_string in the file" + return content, 0, None, "Could not find a match for old_string in the file" def _apply_replacements(content: str, matches: List[Tuple[int, int]], new_string: str) -> str: @@ -258,9 +259,90 @@ def _strategy_trimmed_boundary(content: str, pattern: str) -> List[Tuple[int, in return matches +def _build_orig_to_norm_map(original: str) -> List[int]: + """Build a list mapping each original character index to its normalized index. + + Because UNICODE_MAP replacements may expand characters (e.g. em-dash → '--', + ellipsis → '...'), the normalised string can be longer than the original. + This map lets us convert positions in the normalised string back to the + corresponding positions in the original string. + + Returns a list of length ``len(original) + 1``; entry ``i`` is the + normalised index that character ``i`` maps to. + """ + result: List[int] = [] + norm_pos = 0 + for char in original: + result.append(norm_pos) + repl = UNICODE_MAP.get(char) + norm_pos += len(repl) if repl is not None else 1 + result.append(norm_pos) # sentinel: one past the last character + return result + + +def _map_positions_norm_to_orig( + orig_to_norm: List[int], + norm_matches: List[Tuple[int, int]], +) -> List[Tuple[int, int]]: + """Convert (start, end) positions in the normalised string to original positions.""" + # Invert the map: norm_pos -> first original position with that norm_pos + norm_to_orig_start: dict[int, int] = {} + for orig_pos, norm_pos in enumerate(orig_to_norm[:-1]): + if norm_pos not in norm_to_orig_start: + norm_to_orig_start[norm_pos] = orig_pos + + results: List[Tuple[int, int]] = [] + orig_len = len(orig_to_norm) - 1 # number of original characters + + for norm_start, norm_end in norm_matches: + if norm_start not in norm_to_orig_start: + continue + orig_start = norm_to_orig_start[norm_start] + + # Walk forward until orig_to_norm[orig_end] >= norm_end + orig_end = orig_start + while orig_end < orig_len and orig_to_norm[orig_end] < norm_end: + orig_end += 1 + + results.append((orig_start, orig_end)) + + return results + + +def _strategy_unicode_normalized(content: str, pattern: str) -> List[Tuple[int, int]]: + """Strategy 7: Unicode normalisation. + + Normalises smart quotes, em/en-dashes, ellipsis, and non-breaking spaces + to their ASCII equivalents in both *content* and *pattern*, then runs + exact and line_trimmed matching on the normalised copies. + + Positions are mapped back to the *original* string via + ``_build_orig_to_norm_map`` — necessary because some UNICODE_MAP + replacements expand a single character into multiple ASCII characters, + making a naïve position copy incorrect. + """ + # Normalize both sides. Either the content or the pattern (or both) may + # carry unicode variants — e.g. content has an em-dash that should match + # the LLM's ASCII '--', or vice-versa. Skip only when neither changes. + norm_pattern = _unicode_normalize(pattern) + norm_content = _unicode_normalize(content) + if norm_content == content and norm_pattern == pattern: + return [] + + norm_matches = _strategy_exact(norm_content, norm_pattern) + if not norm_matches: + norm_matches = _strategy_line_trimmed(norm_content, norm_pattern) + + if not norm_matches: + return [] + + orig_to_norm = _build_orig_to_norm_map(content) + return _map_positions_norm_to_orig(orig_to_norm, norm_matches) + + def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]: """ - Strategy 7: Match by anchoring on first and last lines. + Strategy 8: Match by anchoring on first and last lines. Adjusted with permissive thresholds and unicode normalization. """ # Normalize both strings for comparison while keeping original content for offset calculation @@ -290,8 +372,10 @@ def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]: matches = [] candidate_count = len(potential_matches) - # Thresholding logic: 0.10 for unique matches (max flexibility), 0.30 for multiple candidates - threshold = 0.10 if candidate_count == 1 else 0.30 + # Thresholding logic: 0.50 for unique matches, 0.70 for multiple candidates. + # Previous values (0.10 / 0.30) were dangerously loose — a 10% middle-section + # similarity could match completely unrelated blocks. + threshold = 0.50 if candidate_count == 1 else 0.70 for i in potential_matches: if pattern_line_count <= 2: @@ -314,7 +398,7 @@ def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]: def _strategy_context_aware(content: str, pattern: str) -> List[Tuple[int, int]]: """ - Strategy 8: Line-by-line similarity with 50% threshold. + Strategy 9: Line-by-line similarity with 50% threshold. Finds blocks where at least 50% of lines have high similarity. """ diff --git a/tools/patch_parser.py b/tools/patch_parser.py index 1a11f1413..0c961083c 100644 --- a/tools/patch_parser.py +++ b/tools/patch_parser.py @@ -28,6 +28,7 @@ Usage: result = apply_v4a_operations(operations, file_ops) """ +import difflib import re from dataclasses import dataclass, field from typing import List, Optional, Tuple, Any @@ -202,31 +203,162 @@ def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[ if current_hunk and current_hunk.lines: current_op.hunks.append(current_hunk) operations.append(current_op) - + + # Validate the parsed result + if not operations: + # Empty patch is not an error — callers get [] and can decide + return operations, None + + parse_errors: List[str] = [] + for op in operations: + if not op.file_path: + parse_errors.append("Operation with empty file path") + if op.operation == OperationType.UPDATE and not op.hunks: + parse_errors.append(f"UPDATE {op.file_path!r}: no hunks found") + if op.operation == OperationType.MOVE and not op.new_path: + parse_errors.append(f"MOVE {op.file_path!r}: missing destination path (expected 'src -> dst')") + + if parse_errors: + return [], "Parse error: " + "; ".join(parse_errors) + return operations, None -def apply_v4a_operations(operations: List[PatchOperation], - file_ops: Any) -> 'PatchResult': +def _count_occurrences(text: str, pattern: str) -> int: + """Count non-overlapping occurrences of *pattern* in *text*.""" + count = 0 + start = 0 + while True: + pos = text.find(pattern, start) + if pos == -1: + break + count += 1 + start = pos + 1 + return count + + +def _validate_operations( + operations: List[PatchOperation], + file_ops: Any, +) -> List[str]: + """Validate all operations without writing any files. + + Returns a list of error strings; an empty list means all operations + are valid and the apply phase can proceed safely. + + For UPDATE operations, hunks are simulated in order so that later + hunks validate against post-earlier-hunk content (matching apply order). """ - Apply V4A patch operations using a file operations interface. - + # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency + from tools.fuzzy_match import fuzzy_find_and_replace + + errors: List[str] = [] + + for op in operations: + if op.operation == OperationType.UPDATE: + read_result = file_ops.read_file_raw(op.file_path) + if read_result.error: + errors.append(f"{op.file_path}: {read_result.error}") + continue + + simulated = read_result.content + for hunk in op.hunks: + search_lines = [l.content for l in hunk.lines if l.prefix in (' ', '-')] + if not search_lines: + # Addition-only hunk: validate context hint uniqueness + if hunk.context_hint: + occurrences = _count_occurrences(simulated, hunk.context_hint) + if occurrences == 0: + errors.append( + f"{op.file_path}: addition-only hunk context hint " + f"'{hunk.context_hint}' not found" + ) + elif occurrences > 1: + errors.append( + f"{op.file_path}: addition-only hunk context hint " + f"'{hunk.context_hint}' is ambiguous " + f"({occurrences} occurrences)" + ) + continue + + search_pattern = '\n'.join(search_lines) + replace_lines = [l.content for l in hunk.lines if l.prefix in (' ', '+')] + replacement = '\n'.join(replace_lines) + + new_simulated, count, _strategy, match_error = fuzzy_find_and_replace( + simulated, search_pattern, replacement, replace_all=False + ) + if count == 0: + label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)" + errors.append( + f"{op.file_path}: hunk {label} not found" + + (f" — {match_error}" if match_error else "") + ) + else: + # Advance simulation so subsequent hunks validate correctly. + # Reuse the result from the call above — no second fuzzy run. + simulated = new_simulated + + elif op.operation == OperationType.DELETE: + read_result = file_ops.read_file_raw(op.file_path) + if read_result.error: + errors.append(f"{op.file_path}: file not found for deletion") + + elif op.operation == OperationType.MOVE: + if not op.new_path: + errors.append(f"{op.file_path}: MOVE operation missing destination path") + continue + src_result = file_ops.read_file_raw(op.file_path) + if src_result.error: + errors.append(f"{op.file_path}: source file not found for move") + dst_result = file_ops.read_file_raw(op.new_path) + if not dst_result.error: + errors.append( + f"{op.new_path}: destination already exists — move would overwrite" + ) + + # ADD: parent directory creation handled by write_file; no pre-check needed. + + return errors + + +def apply_v4a_operations(operations: List[PatchOperation], + file_ops: Any) -> 'PatchResult': + """Apply V4A patch operations using a file operations interface. + + Uses a two-phase validate-then-apply approach: + - Phase 1: validate all operations against current file contents without + writing anything. If any validation error is found, return immediately + with no filesystem changes. + - Phase 2: apply all operations. A failure here (e.g. a race between + validation and apply) is reported with a note to run ``git diff``. + Args: operations: List of PatchOperation from parse_v4a_patch - file_ops: Object with read_file, write_file methods - + file_ops: Object with read_file_raw, write_file methods + Returns: PatchResult with results of all operations """ # Import here to avoid circular imports from tools.file_operations import PatchResult - + + # ---- Phase 1: validate ---- + validation_errors = _validate_operations(operations, file_ops) + if validation_errors: + return PatchResult( + success=False, + error="Patch validation failed (no files were modified):\n" + + "\n".join(f" • {e}" for e in validation_errors), + ) + + # ---- Phase 2: apply ---- files_modified = [] files_created = [] files_deleted = [] all_diffs = [] errors = [] - + for op in operations: try: if op.operation == OperationType.ADD: @@ -236,7 +368,7 @@ def apply_v4a_operations(operations: List[PatchOperation], all_diffs.append(result[1]) else: errors.append(f"Failed to add {op.file_path}: {result[1]}") - + elif op.operation == OperationType.DELETE: result = _apply_delete(op, file_ops) if result[0]: @@ -244,7 +376,7 @@ def apply_v4a_operations(operations: List[PatchOperation], all_diffs.append(result[1]) else: errors.append(f"Failed to delete {op.file_path}: {result[1]}") - + elif op.operation == OperationType.MOVE: result = _apply_move(op, file_ops) if result[0]: @@ -252,7 +384,7 @@ def apply_v4a_operations(operations: List[PatchOperation], all_diffs.append(result[1]) else: errors.append(f"Failed to move {op.file_path}: {result[1]}") - + elif op.operation == OperationType.UPDATE: result = _apply_update(op, file_ops) if result[0]: @@ -260,19 +392,19 @@ def apply_v4a_operations(operations: List[PatchOperation], all_diffs.append(result[1]) else: errors.append(f"Failed to update {op.file_path}: {result[1]}") - + except Exception as e: errors.append(f"Error processing {op.file_path}: {str(e)}") - + # Run lint on all modified/created files lint_results = {} for f in files_modified + files_created: if hasattr(file_ops, '_check_lint'): lint_result = file_ops._check_lint(f) lint_results[f] = lint_result.to_dict() - + combined_diff = '\n'.join(all_diffs) - + if errors: return PatchResult( success=False, @@ -281,16 +413,17 @@ def apply_v4a_operations(operations: List[PatchOperation], files_created=files_created, files_deleted=files_deleted, lint=lint_results if lint_results else None, - error='; '.join(errors) + error="Apply phase failed (state may be inconsistent — run `git diff` to assess):\n" + + "\n".join(f" • {e}" for e in errors), ) - + return PatchResult( success=True, diff=combined_diff, files_modified=files_modified, files_created=files_created, files_deleted=files_deleted, - lint=lint_results if lint_results else None + lint=lint_results if lint_results else None, ) @@ -317,68 +450,56 @@ def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: """Apply a delete file operation.""" - # Read file first for diff - read_result = file_ops.read_file(op.file_path) - - if read_result.error and "not found" in read_result.error.lower(): - # File doesn't exist, nothing to delete - return True, f"# {op.file_path} already deleted or doesn't exist" - - # Delete directly via shell command using the underlying environment - rm_result = file_ops._exec(f"rm -f {file_ops._escape_shell_arg(op.file_path)}") - - if rm_result.exit_code != 0: - return False, rm_result.stdout - - diff = f"--- a/{op.file_path}\n+++ /dev/null\n# File deleted" - return True, diff + # Read before deleting so we can produce a real unified diff. + # Validation already confirmed existence; this guards against races. + read_result = file_ops.read_file_raw(op.file_path) + if read_result.error: + return False, f"Cannot delete {op.file_path}: file not found" + + result = file_ops.delete_file(op.file_path) + if result.error: + return False, result.error + + removed_lines = read_result.content.splitlines(keepends=True) + diff = ''.join(difflib.unified_diff( + removed_lines, [], + fromfile=f"a/{op.file_path}", + tofile="/dev/null", + )) + return True, diff or f"# Deleted: {op.file_path}" def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: """Apply a move file operation.""" - # Use shell mv command - mv_result = file_ops._exec( - f"mv {file_ops._escape_shell_arg(op.file_path)} {file_ops._escape_shell_arg(op.new_path)}" - ) - - if mv_result.exit_code != 0: - return False, mv_result.stdout - + result = file_ops.move_file(op.file_path, op.new_path) + if result.error: + return False, result.error + diff = f"# Moved: {op.file_path} -> {op.new_path}" return True, diff def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: """Apply an update file operation.""" - # Read current content - read_result = file_ops.read_file(op.file_path, limit=10000) - + # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency + from tools.fuzzy_match import fuzzy_find_and_replace + + # Read current content — raw so no line-number prefixes or per-line truncation + read_result = file_ops.read_file_raw(op.file_path) + if read_result.error: return False, f"Cannot read file: {read_result.error}" - - # Parse content (remove line numbers) - current_lines = [] - for line in read_result.content.split('\n'): - if re.match(r'^\s*\d+\|', line): - # Line format: " 123|content" - parts = line.split('|', 1) - if len(parts) == 2: - current_lines.append(parts[1]) - else: - current_lines.append(line) - else: - current_lines.append(line) - - current_content = '\n'.join(current_lines) - + + current_content = read_result.content + # Apply each hunk new_content = current_content - + for hunk in op.hunks: # Build search pattern from context and removed lines search_lines = [] replace_lines = [] - + for line in hunk.lines: if line.prefix == ' ': search_lines.append(line.content) @@ -387,17 +508,15 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: search_lines.append(line.content) elif line.prefix == '+': replace_lines.append(line.content) - + if search_lines: search_pattern = '\n'.join(search_lines) replacement = '\n'.join(replace_lines) - - # Use fuzzy matching - from tools.fuzzy_match import fuzzy_find_and_replace - new_content, count, error = fuzzy_find_and_replace( + + new_content, count, _strategy, error = fuzzy_find_and_replace( new_content, search_pattern, replacement, replace_all=False ) - + if error and count == 0: # Try with context hint if available if hunk.context_hint: @@ -408,8 +527,8 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: window_start = max(0, hint_pos - 500) window_end = min(len(new_content), hint_pos + 2000) window = new_content[window_start:window_end] - - window_new, count, error = fuzzy_find_and_replace( + + window_new, count, _strategy, error = fuzzy_find_and_replace( window, search_pattern, replacement, replace_all=False ) @@ -424,16 +543,23 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: # Insert at the location indicated by the context hint, or at end of file. insert_text = '\n'.join(replace_lines) if hunk.context_hint: - hint_pos = new_content.find(hunk.context_hint) - if hint_pos != -1: + occurrences = _count_occurrences(new_content, hunk.context_hint) + if occurrences == 0: + # Hint not found — append at end as a safe fallback + new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n' + elif occurrences > 1: + return False, ( + f"Addition-only hunk: context hint '{hunk.context_hint}' is ambiguous " + f"({occurrences} occurrences) — provide a more unique hint" + ) + else: + hint_pos = new_content.find(hunk.context_hint) # Insert after the line containing the context hint eol = new_content.find('\n', hint_pos) if eol != -1: new_content = new_content[:eol + 1] + insert_text + '\n' + new_content[eol + 1:] else: new_content = new_content + '\n' + insert_text - else: - new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n' else: new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n' @@ -443,7 +569,6 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]: return False, write_result.error # Generate diff - import difflib diff_lines = difflib.unified_diff( current_content.splitlines(keepends=True), new_content.splitlines(keepends=True), diff --git a/tools/skill_manager_tool.py b/tools/skill_manager_tool.py index 8a513c69d..2273d75fa 100644 --- a/tools/skill_manager_tool.py +++ b/tools/skill_manager_tool.py @@ -426,7 +426,7 @@ def _patch_skill( # from exact-match failures on minor formatting mismatches. from tools.fuzzy_match import fuzzy_find_and_replace - new_content, match_count, match_error = fuzzy_find_and_replace( + new_content, match_count, _strategy, match_error = fuzzy_find_and_replace( content, old_string, new_string, replace_all ) if match_error: