mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
- Bug 1: replace read_file(limit=10000) with read_file_raw in _apply_update, preventing silent truncation of files >2000 lines and corruption of lines >2000 chars; add read_file_raw to FileOperations abstract interface and ShellFileOperations - Bug 2: split apply_v4a_operations into validate-then-apply phases; if any hunk fails validation, zero writes occur (was: continue after failure, leaving filesystem partially modified) - Bug 3: parse_v4a_patch now returns an error for begin-marker-with-no-ops, empty file paths, and moves missing a destination (was: always returned error=None) - Bug 4: raise strategy 7 (block anchor) single-candidate similarity threshold from 0.10 to 0.50, eliminating false-positive matches in repetitive code - Bug 5: add _strategy_unicode_normalized (new strategy 7) with position mapping via _build_orig_to_norm_map; smart quotes and em-dashes in LLM-generated patches now match via strategies 1-6 before falling through to fuzzy strategies - Bug 6: extend fuzzy_find_and_replace to return 4-tuple (content, count, error, strategy); update all 5 call sites across patch_parser.py, file_operations.py, and skill_manager_tool.py - Bug 7: guard in _apply_update returns error when addition-only context hint is ambiguous (>1 occurrences); validation phase errors on both 0 and >1 - Bug 8: _apply_delete returns error (not silent success) on missing file - Bug 9: _validate_operations checks source existence and destination absence for MOVE operations before any write occurs
580 lines
21 KiB
Python
580 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
V4A Patch Format Parser
|
|
|
|
Parses the V4A patch format used by codex, cline, and other coding agents.
|
|
|
|
V4A Format:
|
|
*** Begin Patch
|
|
*** Update File: path/to/file.py
|
|
@@ optional context hint @@
|
|
context line (space prefix)
|
|
-removed line (minus prefix)
|
|
+added line (plus prefix)
|
|
*** Add File: path/to/new.py
|
|
+new file content
|
|
+line 2
|
|
*** Delete File: path/to/old.py
|
|
*** Move File: old/path.py -> new/path.py
|
|
*** End Patch
|
|
|
|
Usage:
|
|
from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
|
|
|
|
operations, error = parse_v4a_patch(patch_content)
|
|
if error:
|
|
print(f"Parse error: {error}")
|
|
else:
|
|
result = apply_v4a_operations(operations, file_ops)
|
|
"""
|
|
|
|
import difflib
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional, Tuple, Any
|
|
from enum import Enum
|
|
|
|
|
|
class OperationType(Enum):
    """Kinds of file operation that a V4A patch section can request."""

    # Create a new file ("*** Add File:")
    ADD = "add"
    # Edit an existing file through hunks ("*** Update File:")
    UPDATE = "update"
    # Remove a file ("*** Delete File:")
    DELETE = "delete"
    # Rename/relocate a file ("*** Move File: src -> dst")
    MOVE = "move"
|
|
|
|
|
|
@dataclass
class HunkLine:
    """One line of a patch hunk, split into its marker and its text."""

    # Marker character: ' ' (context), '-' (removed) or '+' (added)
    prefix: str
    # Line text with the marker character removed
    content: str
|
|
|
|
|
|
@dataclass
class Hunk:
    """A contiguous set of changed/context lines belonging to one file."""

    # Text taken from an "@@ ... @@" marker line, if the hunk had one
    context_hint: Optional[str] = None
    # Hunk lines in patch order; each instance gets its own fresh list
    lines: List[HunkLine] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class PatchOperation:
    """One file-level operation parsed out of a V4A patch."""

    # Which kind of operation this is (add / update / delete / move)
    operation: OperationType
    # Path of the file the operation targets (the source path for a move)
    file_path: str
    # Destination path; only meaningful for move operations
    new_path: Optional[str] = None
    # Hunks attached to the operation; each instance gets its own fresh list
    hunks: List[Hunk] = field(default_factory=list)
    # Optional literal content for add-file operations
    content: Optional[str] = None
|
|
|
|
|
|
def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[str]]:
    """
    Parse a V4A format patch.

    Only the lines between the ``*** Begin Patch`` and ``*** End Patch``
    markers are examined. When the begin marker is absent parsing starts at
    the first line; when the end marker is absent it runs to the last line.

    Args:
        patch_content: The patch text in V4A format

    Returns:
        Tuple of (operations, error_message)
        - If successful: (list_of_operations, None); the list is empty when
          the patch contains no file operations
        - If failed: ([], error_description)
    """
    lines = patch_content.split('\n')
    operations: List[PatchOperation] = []

    # Find patch boundaries
    start_idx = None
    end_idx = None

    for i, line in enumerate(lines):
        if '*** Begin Patch' in line or '***Begin Patch' in line:
            start_idx = i
        elif '*** End Patch' in line or '***End Patch' in line:
            end_idx = i
            break

    if start_idx is None:
        # Try to parse without explicit begin marker
        start_idx = -1

    if end_idx is None:
        end_idx = len(lines)

    # File operation markers, compiled once instead of once per line.
    update_re = re.compile(r'\*\*\*\s*Update\s+File:\s*(.+)')
    add_re = re.compile(r'\*\*\*\s*Add\s+File:\s*(.+)')
    delete_re = re.compile(r'\*\*\*\s*Delete\s+File:\s*(.+)')
    # The "-> dst" part is optional so that a Move line without a destination
    # is still recognised as a Move and reported by the validation below,
    # rather than being dropped or swallowed as a context line.
    move_re = re.compile(r'\*\*\*\s*Move\s+File:\s*(.+?)(?:\s*->\s*(.*))?$')

    current_op: Optional[PatchOperation] = None
    current_hunk: Optional[Hunk] = None

    # Parse operations between boundaries
    for line in lines[start_idx + 1:end_idx]:
        update_match = update_re.match(line)
        add_match = add_re.match(line)
        delete_match = delete_re.match(line)
        move_match = move_re.match(line)

        new_op: Optional[PatchOperation] = None
        if update_match:
            new_op = PatchOperation(
                operation=OperationType.UPDATE,
                file_path=update_match.group(1).strip(),
            )
        elif add_match:
            new_op = PatchOperation(
                operation=OperationType.ADD,
                file_path=add_match.group(1).strip(),
            )
        elif delete_match:
            new_op = PatchOperation(
                operation=OperationType.DELETE,
                file_path=delete_match.group(1).strip(),
            )
        elif move_match:
            destination = move_match.group(2)
            new_op = PatchOperation(
                operation=OperationType.MOVE,
                file_path=move_match.group(1).strip(),
                new_path=destination.strip() if destination else None,
            )

        if new_op is not None:
            # A new marker closes whatever operation was being collected.
            if current_op is not None:
                if current_hunk and current_hunk.lines:
                    current_op.hunks.append(current_hunk)
                operations.append(current_op)

            if new_op.operation in (OperationType.DELETE, OperationType.MOVE):
                # Delete/Move carry no hunks, so they are complete immediately.
                operations.append(new_op)
                current_op = None
                current_hunk = None
            else:
                current_op = new_op
                # Add File content starts right away; Update waits for lines.
                current_hunk = Hunk() if new_op.operation == OperationType.ADD else None
            continue

        if line.startswith('@@'):
            # Context hint / hunk marker
            if current_op is not None:
                if current_hunk and current_hunk.lines:
                    current_op.hunks.append(current_hunk)

                # Extract context hint
                hint_match = re.match(r'@@\s*(.+?)\s*@@', line)
                hint = hint_match.group(1) if hint_match else None
                current_hunk = Hunk(context_hint=hint)

        elif current_op is not None and line:
            # Parse hunk line
            if current_hunk is None:
                current_hunk = Hunk()

            prefix = line[0]
            if prefix in ('+', '-', ' '):
                current_hunk.lines.append(HunkLine(prefix, line[1:]))
            elif prefix != '\\':
                # Anything else that is not the "\ No newline at end of file"
                # marker is treated as a context line (implicit space prefix).
                current_hunk.lines.append(HunkLine(' ', line))

    # Don't forget the last operation
    if current_op is not None:
        if current_hunk and current_hunk.lines:
            current_op.hunks.append(current_hunk)
        operations.append(current_op)

    # Validate the parsed result
    if not operations:
        # Empty patch is not an error — callers get [] and can decide
        return operations, None

    parse_errors: List[str] = []
    for op in operations:
        if not op.file_path:
            parse_errors.append("Operation with empty file path")
        if op.operation == OperationType.UPDATE and not op.hunks:
            parse_errors.append(f"UPDATE {op.file_path!r}: no hunks found")
        if op.operation == OperationType.MOVE and not op.new_path:
            parse_errors.append(f"MOVE {op.file_path!r}: missing destination path (expected 'src -> dst')")

    if parse_errors:
        return [], "Parse error: " + "; ".join(parse_errors)

    return operations, None
|
|
|
|
|
|
def _count_occurrences(text: str, pattern: str) -> int:
    """Count non-overlapping occurrences of *pattern* in *text*.

    Matches are counted left to right and never share characters, so
    ``_count_occurrences("aaa", "aa")`` is 1. The previous hand-rolled scan
    advanced one character at a time and therefore counted overlapping
    matches, contradicting this documented contract.

    An empty *pattern* follows ``str.count`` and yields ``len(text) + 1``.
    """
    return text.count(pattern)
|
|
|
|
|
|
def _validate_operations(
    operations: List[PatchOperation],
    file_ops: Any,
) -> List[str]:
    """Validate all operations without writing any files.

    Returns a list of error strings; an empty list means all operations
    are valid and the apply phase can proceed safely.

    For UPDATE operations, search/replace hunks are simulated in order so
    that later hunks validate against post-earlier-hunk content (matching
    apply order). When a hunk does not match against the whole file, the
    same context-hint window retry used by ``_apply_update`` is attempted,
    so validation does not reject a hunk that the apply phase would accept.

    Args:
        operations: Parsed patch operations to check.
        file_ops: Object providing ``read_file_raw(path)``; its result is
            read through the ``error`` and ``content`` attributes.
    """
    # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    errors: List[str] = []

    for op in operations:
        if op.operation == OperationType.UPDATE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: {read_result.error}")
                continue

            simulated = read_result.content
            for hunk in op.hunks:
                search_lines = [hl.content for hl in hunk.lines if hl.prefix in (' ', '-')]
                if not search_lines:
                    # Addition-only hunk: validate context hint uniqueness
                    if hunk.context_hint:
                        occurrences = _count_occurrences(simulated, hunk.context_hint)
                        if occurrences == 0:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' not found"
                            )
                        elif occurrences > 1:
                            errors.append(
                                f"{op.file_path}: addition-only hunk context hint "
                                f"'{hunk.context_hint}' is ambiguous "
                                f"({occurrences} occurrences)"
                            )
                    continue

                search_pattern = '\n'.join(search_lines)
                replace_lines = [hl.content for hl in hunk.lines if hl.prefix in (' ', '+')]
                replacement = '\n'.join(replace_lines)

                new_simulated, count, _strategy, match_error = fuzzy_find_and_replace(
                    simulated, search_pattern, replacement, replace_all=False
                )

                if match_error and count == 0 and hunk.context_hint:
                    # Mirror _apply_update: retry inside a window around the
                    # context hint (500 chars before, 2000 after).
                    hint_pos = simulated.find(hunk.context_hint)
                    if hint_pos != -1:
                        window_start = max(0, hint_pos - 500)
                        window_end = min(len(simulated), hint_pos + 2000)
                        window_new, count, _strategy, match_error = fuzzy_find_and_replace(
                            simulated[window_start:window_end],
                            search_pattern,
                            replacement,
                            replace_all=False,
                        )
                        if count > 0:
                            new_simulated = (
                                simulated[:window_start] + window_new + simulated[window_end:]
                            )

                if count == 0:
                    label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
                    errors.append(
                        f"{op.file_path}: hunk {label} not found"
                        + (f" — {match_error}" if match_error else "")
                    )
                else:
                    # Advance simulation so subsequent hunks validate correctly.
                    # Reuse the result computed above — no extra fuzzy run.
                    simulated = new_simulated

        elif op.operation == OperationType.DELETE:
            read_result = file_ops.read_file_raw(op.file_path)
            if read_result.error:
                errors.append(f"{op.file_path}: file not found for deletion")

        elif op.operation == OperationType.MOVE:
            if not op.new_path:
                errors.append(f"{op.file_path}: MOVE operation missing destination path")
                continue
            src_result = file_ops.read_file_raw(op.file_path)
            if src_result.error:
                errors.append(f"{op.file_path}: source file not found for move")
            # A successful read of the destination means it already exists.
            dst_result = file_ops.read_file_raw(op.new_path)
            if not dst_result.error:
                errors.append(
                    f"{op.new_path}: destination already exists — move would overwrite"
                )

        # ADD: parent directory creation handled by write_file; no pre-check needed.

    return errors
|
|
|
|
|
|
def apply_v4a_operations(operations: List[PatchOperation],
                         file_ops: Any) -> 'PatchResult':
    """Apply V4A patch operations using a file operations interface.

    Uses a two-phase validate-then-apply approach:
    - Phase 1: validate all operations against current file contents without
      writing anything. If any validation error is found, return immediately
      with no filesystem changes.
    - Phase 2: apply all operations. A failure here (e.g. a race between
      validation and apply) is reported with a note to run ``git diff``;
      the remaining operations are still attempted.

    After the apply phase, files that were created, updated, or moved are
    linted when ``file_ops`` exposes ``_check_lint``. A moved file is linted
    at its destination path; the ``"src -> dst"`` text stored in
    ``files_modified`` is a display string, not a path.

    Args:
        operations: List of PatchOperation from parse_v4a_patch
        file_ops: Object with read_file_raw, write_file methods

    Returns:
        PatchResult with results of all operations
    """
    # Import here to avoid circular imports
    from tools.file_operations import PatchResult

    # ---- Phase 1: validate ----
    validation_errors = _validate_operations(operations, file_ops)
    if validation_errors:
        return PatchResult(
            success=False,
            error="Patch validation failed (no files were modified):\n"
                  + "\n".join(f" • {e}" for e in validation_errors),
        )

    # ---- Phase 2: apply ----
    files_modified = []
    files_created = []
    files_deleted = []
    all_diffs = []
    errors = []
    # Real paths to lint, kept apart from the display strings above.
    lint_modified = []
    lint_created = []

    for op in operations:
        try:
            if op.operation == OperationType.ADD:
                ok, detail = _apply_add(op, file_ops)
                if ok:
                    files_created.append(op.file_path)
                    lint_created.append(op.file_path)
                    all_diffs.append(detail)
                else:
                    errors.append(f"Failed to add {op.file_path}: {detail}")

            elif op.operation == OperationType.DELETE:
                ok, detail = _apply_delete(op, file_ops)
                if ok:
                    files_deleted.append(op.file_path)
                    all_diffs.append(detail)
                else:
                    errors.append(f"Failed to delete {op.file_path}: {detail}")

            elif op.operation == OperationType.MOVE:
                ok, detail = _apply_move(op, file_ops)
                if ok:
                    files_modified.append(f"{op.file_path} -> {op.new_path}")
                    if op.new_path:
                        # After the move the file lives at the destination.
                        lint_modified.append(op.new_path)
                    all_diffs.append(detail)
                else:
                    errors.append(f"Failed to move {op.file_path}: {detail}")

            elif op.operation == OperationType.UPDATE:
                ok, detail = _apply_update(op, file_ops)
                if ok:
                    files_modified.append(op.file_path)
                    lint_modified.append(op.file_path)
                    all_diffs.append(detail)
                else:
                    errors.append(f"Failed to update {op.file_path}: {detail}")

        except Exception as e:
            # Boundary handler: record the failure and keep processing the
            # remaining operations so the result reports everything.
            errors.append(f"Error processing {op.file_path}: {str(e)}")

    # Run lint on all modified/created files
    lint_results = {}
    if hasattr(file_ops, '_check_lint'):
        for path in lint_modified + lint_created:
            lint_results[path] = file_ops._check_lint(path).to_dict()

    combined_diff = '\n'.join(all_diffs)

    if errors:
        return PatchResult(
            success=False,
            diff=combined_diff,
            files_modified=files_modified,
            files_created=files_created,
            files_deleted=files_deleted,
            lint=lint_results if lint_results else None,
            error="Apply phase failed (state may be inconsistent — run `git diff` to assess):\n"
                  + "\n".join(f" • {e}" for e in errors),
        )

    return PatchResult(
        success=True,
        diff=combined_diff,
        files_modified=files_modified,
        files_created=files_created,
        files_deleted=files_deleted,
        lint=lint_results if lint_results else None,
    )
|
|
|
|
|
|
def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Create the file described by an add operation.

    The file body is every '+' line from the operation's hunks, joined with
    newlines and written through ``file_ops.write_file``.

    Returns:
        ``(True, diff_text)`` on success, or ``(False, error)`` when the
        write reports an error.
    """
    added = [
        entry.content
        for hunk in op.hunks
        for entry in hunk.lines
        if entry.prefix == '+'
    ]

    outcome = file_ops.write_file(op.file_path, '\n'.join(added))
    if outcome.error:
        return False, outcome.error

    # Diff against /dev/null: a header followed by every added line.
    header = f"--- /dev/null\n+++ b/{op.file_path}\n"
    body = '\n'.join(f"+{text}" for text in added)
    return True, header + body
|
|
|
|
|
|
def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Delete the file named by a delete operation.

    The file is read first so its content can be shown as removed lines in
    the returned diff; a failed read is reported as an error rather than
    treated as success.

    Returns:
        ``(True, diff_text)`` on success, or ``(False, error)`` when the
        read or the delete reports an error.
    """
    # Snapshot before deleting; the validation phase already checked the
    # file exists, so a failure here guards against a race.
    snapshot = file_ops.read_file_raw(op.file_path)
    if snapshot.error:
        return False, f"Cannot delete {op.file_path}: file not found"

    outcome = file_ops.delete_file(op.file_path)
    if outcome.error:
        return False, outcome.error

    diff_chunks = difflib.unified_diff(
        snapshot.content.splitlines(keepends=True),
        [],
        fromfile=f"a/{op.file_path}",
        tofile="/dev/null",
    )
    diff_text = ''.join(diff_chunks)
    if not diff_text:
        # Nothing to show (the file had no lines): emit a marker instead.
        return True, f"# Deleted: {op.file_path}"
    return True, diff_text
|
|
|
|
|
|
def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Move a file from ``op.file_path`` to ``op.new_path``.

    Returns:
        ``(True, marker_text)`` on success, or ``(False, error)`` when
        ``file_ops.move_file`` reports an error.
    """
    outcome = file_ops.move_file(op.file_path, op.new_path)
    if not outcome.error:
        # A move has no line diff; return a one-line marker instead.
        return True, f"# Moved: {op.file_path} -> {op.new_path}"
    return False, outcome.error
|
|
|
|
|
|
def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
    """Apply every hunk of an update operation and write the file once.

    Hunks are applied in order to an in-memory copy of the file. A hunk with
    context/removed lines is applied by fuzzy search-and-replace; if that
    fails and the hunk has a context hint, the replace is retried inside a
    window around the hint. A hunk with only added lines is inserted after
    the line containing its hint, or appended at the end of the file when
    there is no hint or the hint is absent.

    Returns:
        ``(True, unified_diff_text)`` on success, or ``(False, error)`` when
        the read, a hunk, or the write fails. Nothing is written if a hunk
        fails.
    """
    # Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
    from tools.fuzzy_match import fuzzy_find_and_replace

    # Raw read so there are no line-number prefixes or per-line truncation
    read_result = file_ops.read_file_raw(op.file_path)
    if read_result.error:
        return False, f"Cannot read file: {read_result.error}"

    original_text = read_result.content
    updated_text = original_text

    for hunk in op.hunks:
        # Context lines belong to both sides; '-' only to the search side,
        # '+' only to the replacement side.
        old_block = [entry.content for entry in hunk.lines if entry.prefix in (' ', '-')]
        new_block = [entry.content for entry in hunk.lines if entry.prefix in (' ', '+')]

        if not old_block:
            # Addition-only hunk (no context or removed lines).
            addition = '\n'.join(new_block)
            hint = hunk.context_hint

            if not hint:
                # No hint at all: append at end of file.
                updated_text = updated_text.rstrip('\n') + '\n' + addition + '\n'
                continue

            hits = _count_occurrences(updated_text, hint)
            if hits > 1:
                return False, (
                    f"Addition-only hunk: context hint '{hunk.context_hint}' is ambiguous "
                    f"({hits} occurrences) — provide a more unique hint"
                )
            if hits == 0:
                # Hint not found — append at end as a safe fallback
                updated_text = updated_text.rstrip('\n') + '\n' + addition + '\n'
                continue

            # Exactly one match: insert after the line containing the hint.
            anchor = updated_text.find(hint)
            line_end = updated_text.find('\n', anchor)
            if line_end == -1:
                # Hint sits on an unterminated last line.
                updated_text = updated_text + '\n' + addition
            else:
                cut = line_end + 1
                updated_text = updated_text[:cut] + addition + '\n' + updated_text[cut:]
            continue

        needle = '\n'.join(old_block)
        substitute = '\n'.join(new_block)

        updated_text, hits, _strategy, failure = fuzzy_find_and_replace(
            updated_text, needle, substitute, replace_all=False
        )

        if failure and hits == 0:
            # Try with context hint if available
            if hunk.context_hint:
                anchor = updated_text.find(hunk.context_hint)
                if anchor != -1:
                    # Search in a window around the hint
                    lo = max(0, anchor - 500)
                    hi = min(len(updated_text), anchor + 2000)

                    patched, hits, _strategy, failure = fuzzy_find_and_replace(
                        updated_text[lo:hi], needle, substitute, replace_all=False
                    )

                    if hits > 0:
                        # Splice the patched window back into the full text.
                        updated_text = updated_text[:lo] + patched + updated_text[hi:]
                        failure = None

            if failure:
                return False, f"Could not apply hunk: {failure}"

    # Write new content
    write_result = file_ops.write_file(op.file_path, updated_text)
    if write_result.error:
        return False, write_result.error

    # Generate diff
    diff = ''.join(
        difflib.unified_diff(
            original_text.splitlines(keepends=True),
            updated_text.splitlines(keepends=True),
            fromfile=f"a/{op.file_path}",
            tofile=f"b/{op.file_path}",
        )
    )

    return True, diff
|