hermes-agent/tools/file_operations.py
jani 0df80f4391 docs: align terminal-backend count and naming across docs and code
README:24 claimed "Six terminal backends" while tools/environments/ exposes
seven top-level backend choices through TERMINAL_ENV: local, docker, ssh,
singularity, modal, daytona, vercel_sandbox. Modal additionally has direct
and Nous-managed modes selected via terminal.modal_mode (the
ManagedModalEnvironment class is a Modal sub-mode, not a separate top-level
backend).

The same drift appeared in five other doc and code-comment sites with
inconsistent counts (six, seven, or implicit) and varying lists. Updated
all sites to a consistent seven-backend list in canonical order. The
configuration guide also clarifies how Modal's two modes are selected, so
operators do not go looking for a non-existent managed_modal backend value.
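For illustration, the two selection knobs described above (a sketch; the
exact modal_mode strings are assumed, not quoted from the guide):

TERMINAL_ENV=modal             # top-level backend, one of the seven
terminal.modal_mode: direct    # Modal sub-mode in config.yaml, not a backend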

CONTRIBUTING.md:160 lists six backend filenames in a code tree but carries
no "six terminal backends" prose; it was left out of scope per the
cohesion-sweep guidance to bundle only identical wording.

Files updated:
- README.md (line 24, marketing copy)
- website/docs/index.md (line 49, landing page)
- website/docs/user-guide/configuration.md (line 86, config guide)
- tools/environments/__init__.py (lines 3-6, package docstring)
- tools/file_operations.py (line 6, module docstring)
- environments/README.md (line 43, RL training docs — TERMINAL_ENV list)
2026-05-05 13:44:09 -07:00

1561 lines
63 KiB
Python

#!/usr/bin/env python3
"""
File Operations Module
Provides file manipulation capabilities (read, write, patch, search) that work
across all terminal backends (local, docker, ssh, singularity, modal, daytona, vercel_sandbox).
The key insight is that all file operations can be expressed as shell commands,
so we wrap the terminal backend's execute() interface to provide a unified file API.
Usage:
from tools.file_operations import ShellFileOperations
from tools.terminal_tool import _active_environments
# Get file operations for a terminal environment (e.g. one taken from _active_environments)
file_ops = ShellFileOperations(terminal_env)
# Read a file
result = file_ops.read_file("/path/to/file.py")
# Write a file
result = file_ops.write_file("/path/to/new.py", "print('hello')")
# Search for content
result = file_ops.search("TODO", path=".", file_glob="*.py")
"""
import os
import re
import difflib
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from pathlib import Path
from tools.binary_extensions import BINARY_EXTENSIONS
from agent.file_safety import (
build_write_denied_paths,
build_write_denied_prefixes,
get_safe_write_root as _shared_get_safe_write_root,
is_write_denied as _shared_is_write_denied,
)
# ---------------------------------------------------------------------------
# Write-path deny list — blocks writes to sensitive system/credential files
# ---------------------------------------------------------------------------
_HOME = str(Path.home())
WRITE_DENIED_PATHS = build_write_denied_paths(_HOME)
WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME)
_OSC_SEQUENCE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")
_FENCE_MARKER_RE = re.compile(r"'?\x07?__HERMES_FENCE_[A-Za-z0-9]+__\x07?'?")
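# Illustrative leak these patterns scrub: a read that comes back as
#   "\x1b]0;title\x07'__HERMES_FENCE_ab12__\x07'payload"
# loses the OSC sequence and the fence marker but keeps "payload";
# lines that were nothing but wrapper are dropped entirely below.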
def _strip_terminal_fence_leaks(text: str) -> str:
"""Strip leaked terminal fence wrappers from file read output."""
if not text:
return text
cleaned_lines: List[str] = []
for line in text.splitlines(keepends=True):
had_terminal_wrapper = "__HERMES_FENCE_" in line or "\x1b]" in line
cleaned = _OSC_SEQUENCE_RE.sub("", line)
cleaned = _FENCE_MARKER_RE.sub("", cleaned)
cleaned = cleaned.replace("\x07", "")
if had_terminal_wrapper and cleaned.strip("'\r\n\t ") == "":
continue
cleaned_lines.append(cleaned)
return "".join(cleaned_lines)
def _get_safe_write_root() -> Optional[str]:
"""Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset.
When set, all write_file/patch operations are constrained to this
directory tree. Writes outside it are denied even if the target is
not on the static deny list. Opt-in hardening for gateway/messaging
deployments that should only touch a workspace checkout.
"""
return _shared_get_safe_write_root()
def _is_write_denied(path: str) -> bool:
"""Return True if path is on the write deny list."""
return _shared_is_write_denied(path)
# =============================================================================
# Result Data Classes
# =============================================================================
@dataclass
class ReadResult:
"""Result from reading a file."""
content: str = ""
total_lines: int = 0
file_size: int = 0
truncated: bool = False
hint: Optional[str] = None
is_binary: bool = False
is_image: bool = False
base64_content: Optional[str] = None
mime_type: Optional[str] = None
dimensions: Optional[str] = None # For images: "WIDTHxHEIGHT"
error: Optional[str] = None
similar_files: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {k: v for k, v in self.__dict__.items() if v is not None and v != []}
@dataclass
class WriteResult:
"""Result from writing a file."""
bytes_written: int = 0
dirs_created: bool = False
lint: Optional[Dict[str, Any]] = None
error: Optional[str] = None
warning: Optional[str] = None
def to_dict(self) -> dict:
return {k: v for k, v in self.__dict__.items() if v is not None}
@dataclass
class PatchResult:
"""Result from patching a file."""
success: bool = False
diff: str = ""
files_modified: List[str] = field(default_factory=list)
files_created: List[str] = field(default_factory=list)
files_deleted: List[str] = field(default_factory=list)
lint: Optional[Dict[str, Any]] = None
error: Optional[str] = None
def to_dict(self) -> dict:
result = {"success": self.success}
if self.diff:
result["diff"] = self.diff
if self.files_modified:
result["files_modified"] = self.files_modified
if self.files_created:
result["files_created"] = self.files_created
if self.files_deleted:
result["files_deleted"] = self.files_deleted
if self.lint:
result["lint"] = self.lint
if self.error:
result["error"] = self.error
return result
@dataclass
class SearchMatch:
"""A single search match."""
path: str
line_number: int
content: str
mtime: float = 0.0 # Modification time for sorting
@dataclass
class SearchResult:
"""Result from searching."""
matches: List[SearchMatch] = field(default_factory=list)
files: List[str] = field(default_factory=list)
counts: Dict[str, int] = field(default_factory=dict)
total_count: int = 0
truncated: bool = False
error: Optional[str] = None
def to_dict(self) -> dict:
result = {"total_count": self.total_count}
if self.matches:
result["matches"] = [
{"path": m.path, "line": m.line_number, "content": m.content}
for m in self.matches
]
if self.files:
result["files"] = self.files
if self.counts:
result["counts"] = self.counts
if self.truncated:
result["truncated"] = True
if self.error:
result["error"] = self.error
return result
@dataclass
class LintResult:
"""Result from linting a file."""
success: bool = True
skipped: bool = False
output: str = ""
message: str = ""
def to_dict(self) -> dict:
if self.skipped:
return {"status": "skipped", "message": self.message}
result = {"status": "ok" if self.success else "error", "output": self.output}
if self.message:
result["message"] = self.message
return result
@dataclass
class ExecuteResult:
"""Result from executing a shell command."""
stdout: str = ""
exit_code: int = 0
def _parse_search_context_line(line: str) -> tuple[str, int, str] | None:
"""Parse grep/rg context output in ``path-line-content`` format.
Context lines are ambiguous because filenames may legitimately contain
``-<digits>-`` segments. Prefer the rightmost numeric separator so a path
like ``dir/file-12-name.py-8-context`` resolves to
``dir/file-12-name.py`` line ``8`` instead of truncating at ``file``.
"""
if not line or line == "--":
return None
match = None
for candidate in re.finditer(r'-(\d+)-', line):
match = candidate
if match is None:
return None
path = line[:match.start()]
if not path:
return None
return path, int(match.group(1)), line[match.end():]
# =============================================================================
# Abstract Interface
# =============================================================================
class FileOperations(ABC):
"""Abstract interface for file operations across terminal backends."""
@abstractmethod
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
"""Read a file with pagination support."""
...
@abstractmethod
def read_file_raw(self, path: str) -> ReadResult:
"""Read the complete file content as a plain string.
No pagination, no line-number prefixes, no per-line truncation.
Returns ReadResult with .content = full file text, .error set on
failure. Always reads to EOF regardless of file size.
"""
...
@abstractmethod
def write_file(self, path: str, content: str) -> WriteResult:
"""Write content to a file, creating directories as needed."""
...
@abstractmethod
def patch_replace(self, path: str, old_string: str, new_string: str,
replace_all: bool = False) -> PatchResult:
"""Replace text in a file using fuzzy matching."""
...
@abstractmethod
def patch_v4a(self, patch_content: str) -> PatchResult:
"""Apply a V4A format patch."""
...
@abstractmethod
def delete_file(self, path: str) -> WriteResult:
"""Delete a file. Returns WriteResult with .error set on failure."""
...
@abstractmethod
def move_file(self, src: str, dst: str) -> WriteResult:
"""Move/rename a file from src to dst. Returns WriteResult with .error set on failure."""
...
@abstractmethod
def search(self, pattern: str, path: str = ".", target: str = "content",
file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
output_mode: str = "content", context: int = 0) -> SearchResult:
"""Search for content or files."""
...
# =============================================================================
# Shell-based Implementation
# =============================================================================
# Image extensions (subset of binary that we can return as base64)
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'}
# Shell-based linters by file extension. Invoked via _exec() with the
# filesystem path. Cover languages where a compile/type check needs an
# external toolchain (py_compile, node, tsc, go vet, rustfmt).
LINTERS = {
'.py': 'python -m py_compile {file} 2>&1',
'.js': 'node --check {file} 2>&1',
'.ts': 'npx tsc --noEmit {file} 2>&1',
'.go': 'go vet {file} 2>&1',
'.rs': 'rustfmt --check {file} 2>&1',
}
def _lint_json_inproc(content: str) -> tuple[bool, str]:
"""In-process JSON syntax check. Returns (ok, error_message)."""
import json as _json
try:
_json.loads(content)
return True, ""
except _json.JSONDecodeError as e:
return False, f"JSONDecodeError: {e.msg} (line {e.lineno}, column {e.colno})"
except Exception as e: # noqa: BLE001 — any parse failure is a lint failure
return False, f"{type(e).__name__}: {e}"
def _lint_yaml_inproc(content: str) -> tuple[bool, str]:
"""In-process YAML syntax check. Returns (ok, error_message).
Skipped gracefully if PyYAML isn't installed — YAML parsing is optional.
"""
try:
import yaml as _yaml
except ImportError:
# PyYAML not available — skip silently, caller treats as no linter.
return True, "__SKIP__"
try:
_yaml.safe_load(content)
return True, ""
except _yaml.YAMLError as e:
return False, f"YAMLError: {e}"
except Exception as e: # noqa: BLE001
return False, f"{type(e).__name__}: {e}"
def _lint_toml_inproc(content: str) -> tuple[bool, str]:
"""In-process TOML syntax check (stdlib tomllib, Python 3.11+)."""
try:
import tomllib as _toml
except ImportError:
# Pre-3.11 fallback via tomli, if installed.
try:
import tomli as _toml # type: ignore[no-redef]
except ImportError:
return True, "__SKIP__"
try:
_toml.loads(content)
return True, ""
except Exception as e: # tomllib raises TOMLDecodeError, a ValueError subclass
return False, f"{type(e).__name__}: {e}"
def _lint_python_inproc(content: str) -> tuple[bool, str]:
"""In-process Python syntax check via ast.parse.
Catches SyntaxError, IndentationError, and everything else the
ast module rejects — matching py_compile's scope but with no
subprocess overhead and no dependency on a ``python`` in PATH.
"""
import ast as _ast
try:
_ast.parse(content)
return True, ""
except SyntaxError as e:
loc = f" (line {e.lineno}, column {e.offset})" if e.lineno else ""
return False, f"{type(e).__name__}: {e.msg}{loc}"
except Exception as e: # noqa: BLE001
return False, f"{type(e).__name__}: {e}"
# In-process linters by file extension. Preferred over shell linters when
# present — no subprocess overhead, microseconds per call. Each callable
# takes file content (str) and returns (ok: bool, error: str). An error
# string of ``"__SKIP__"`` signals the linter isn't available (missing
# dependency) and should be treated as "no linter".
LINTERS_INPROC = {
'.py': _lint_python_inproc,
'.json': _lint_json_inproc,
'.yaml': _lint_yaml_inproc,
'.yml': _lint_yaml_inproc,
'.toml': _lint_toml_inproc,
}
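# Illustration of the callable contract (availability caveats aside):
#   LINTERS_INPROC['.json']('{"a": 1,}')  # -> (False, "JSONDecodeError: ...")
#   LINTERS_INPROC['.py']('x = 1')        # -> (True, "")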
# Max limits for read operations
MAX_LINES = 2000
MAX_LINE_LENGTH = 2000
MAX_FILE_SIZE = 50 * 1024 # 50KB
DEFAULT_READ_OFFSET = 1
DEFAULT_READ_LIMIT = 500
DEFAULT_SEARCH_OFFSET = 0
DEFAULT_SEARCH_LIMIT = 50
def _coerce_int(value: Any, default: int) -> int:
"""Best-effort integer coercion for tool pagination inputs."""
try:
return int(value)
except (TypeError, ValueError):
return default
def normalize_read_pagination(offset: Any = DEFAULT_READ_OFFSET,
limit: Any = DEFAULT_READ_LIMIT) -> tuple[int, int]:
"""Return safe read_file pagination bounds.
Tool schemas declare minimum/maximum values, but not every caller or
provider enforces schemas before dispatch. Clamp here so invalid values
cannot leak into sed ranges like ``0,-1p``.
The upper bound on ``limit`` comes from ``tool_output.max_lines`` in
config.yaml (defaults to the module-level ``MAX_LINES`` constant).
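Examples (values coerced and clamped; assumes the default max_lines):
normalize_read_pagination(0, -5)    -> (1, 1)
normalize_read_pagination("10", 50) -> (10, 50)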
"""
from tools.tool_output_limits import get_max_lines
max_lines = get_max_lines()
normalized_offset = max(1, _coerce_int(offset, DEFAULT_READ_OFFSET))
normalized_limit = _coerce_int(limit, DEFAULT_READ_LIMIT)
normalized_limit = max(1, min(normalized_limit, max_lines))
return normalized_offset, normalized_limit
def normalize_search_pagination(offset: Any = DEFAULT_SEARCH_OFFSET,
limit: Any = DEFAULT_SEARCH_LIMIT) -> tuple[int, int]:
"""Return safe search pagination bounds for shell head/tail pipelines."""
normalized_offset = max(0, _coerce_int(offset, DEFAULT_SEARCH_OFFSET))
normalized_limit = max(1, _coerce_int(limit, DEFAULT_SEARCH_LIMIT))
return normalized_offset, normalized_limit
class ShellFileOperations(FileOperations):
"""
File operations implemented via shell commands.
Works with ANY terminal backend that has execute(command, cwd) method.
This includes local, docker, ssh, singularity, modal, daytona, and vercel_sandbox environments.
"""
def __init__(self, terminal_env, cwd: str = None):
"""
Initialize file operations with a terminal environment.
Args:
terminal_env: Any object with execute(command, cwd) method.
Returns {"output": str, "returncode": int}
cwd: Optional explicit fallback cwd when the terminal env has
no cwd attribute (rare — most backends track cwd live).
Note:
Every _exec() call prefers the LIVE ``terminal_env.cwd`` over
``self.cwd`` so ``cd`` commands run via the terminal tool are
picked up immediately. ``self.cwd`` is only used as a fallback
when the env has no cwd at all — it is NOT the authoritative
cwd, despite being settable at init time.
Historical bug (fixed): prior versions of this class used the
init-time cwd for every _exec() call, which caused relative
paths passed to patch/read/write to target the wrong directory
after the user ran ``cd`` in the terminal. Patches would
claim success and return a plausible diff but land in the
original directory, producing apparent silent failures.
"""
self.env = terminal_env
# Determine cwd from various possible sources.
# IMPORTANT: do NOT fall back to os.getcwd() -- that's the HOST's local
# path which doesn't exist inside container/cloud backends (modal, docker).
# If nothing provides a cwd, use "/" as a safe universal default.
self.cwd = cwd or getattr(terminal_env, 'cwd', None) or \
getattr(getattr(terminal_env, 'config', None), 'cwd', None) or "/"
# Cache for command availability checks
self._command_cache: Dict[str, bool] = {}
def _exec(self, command: str, cwd: str = None, timeout: int = None,
stdin_data: str = None) -> ExecuteResult:
"""Execute command via terminal backend.
Args:
stdin_data: If provided, piped to the process's stdin instead of
embedding in the command string. Bypasses ARG_MAX.
Cwd resolution order (critical — see class docstring):
1. Explicit ``cwd`` arg (if provided)
2. Live ``self.env.cwd`` (tracks ``cd`` commands run via terminal)
3. Init-time ``self.cwd`` (fallback when env has no cwd attribute)
This ordering ensures relative paths in file operations follow the
terminal's current directory — not the directory this file_ops was
originally created in. See test_file_ops_cwd_tracking.py.
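Illustration: after the agent runs ``cd /repo`` through the terminal
tool, a later _exec("cat notes.txt") resolves against /repo even if
this instance was constructed with cwd="/tmp".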
"""
kwargs = {}
if timeout:
kwargs['timeout'] = timeout
if stdin_data is not None:
kwargs['stdin_data'] = stdin_data
# Resolve cwd from the live env so `cd` commands are picked up.
# Fall through to init-time self.cwd only if the env doesn't track cwd.
effective_cwd = cwd or getattr(self.env, 'cwd', None) or self.cwd
result = self.env.execute(command, cwd=effective_cwd, **kwargs)
return ExecuteResult(
stdout=result.get("output", ""),
exit_code=result.get("returncode", 0)
)
def _has_command(self, cmd: str) -> bool:
"""Check if a command exists in the environment (cached)."""
if cmd not in self._command_cache:
result = self._exec(f"command -v {cmd} >/dev/null 2>&1 && echo 'yes'")
self._command_cache[cmd] = result.stdout.strip() == 'yes'
return self._command_cache[cmd]
def _is_likely_binary(self, path: str, content_sample: str = None) -> bool:
"""
Check if a file is likely binary.
Uses extension check (fast) + content analysis (fallback).
"""
ext = os.path.splitext(path)[1].lower()
if ext in BINARY_EXTENSIONS:
return True
# Content analysis: >30% non-printable chars = binary
if content_sample:
non_printable = sum(1 for c in content_sample[:1000]
if ord(c) < 32 and c not in '\n\r\t')
return non_printable / min(len(content_sample), 1000) > 0.30
return False
def _is_image(self, path: str) -> bool:
"""Check if file is an image we can return as base64."""
ext = os.path.splitext(path)[1].lower()
return ext in IMAGE_EXTENSIONS
def _add_line_numbers(self, content: str, start_line: int = 1) -> str:
"""Add line numbers to content in LINE_NUM|CONTENT format."""
from tools.tool_output_limits import get_max_line_length
max_line_length = get_max_line_length()
lines = content.split('\n')
numbered = []
for i, line in enumerate(lines, start=start_line):
# Truncate long lines
if len(line) > max_line_length:
line = line[:max_line_length] + "... [truncated]"
numbered.append(f"{i:6d}|{line}")
return '\n'.join(numbered)
def _expand_path(self, path: str) -> str:
"""
Expand shell-style paths like ~ and ~user to absolute paths.
This must be done BEFORE shell escaping, since ~ doesn't expand
inside single quotes.
"""
if not path:
return path
# Handle ~ and ~user
if path.startswith('~'):
# Get home directory via the terminal environment
result = self._exec("echo $HOME")
if result.exit_code == 0 and result.stdout.strip():
home = result.stdout.strip()
if path == '~':
return home
elif path.startswith('~/'):
return home + path[1:] # Replace ~ with home
# ~username format - extract and validate username before
# letting shell expand it (prevent shell injection via
# paths like "~; rm -rf /").
rest = path[1:] # strip leading ~
slash_idx = rest.find('/')
username = rest[:slash_idx] if slash_idx >= 0 else rest
if username and re.fullmatch(r'[a-zA-Z0-9._-]+', username):
# Only expand ~username (not the full path) to avoid shell
# injection via path suffixes like "~user/$(malicious)".
expand_result = self._exec(f"echo ~{username}")
if expand_result.exit_code == 0 and expand_result.stdout.strip():
user_home = expand_result.stdout.strip()
suffix = path[1 + len(username):] # e.g. "/rest/of/path"
return user_home + suffix
return path
def _escape_shell_arg(self, arg: str) -> str:
"""Escape a string for safe use in shell commands."""
# Use single quotes and escape any single quotes in the string
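# e.g. "it's" -> 'it'"'"'s' (close quote, escaped literal quote, reopen)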
return "'" + arg.replace("'", "'\"'\"'") + "'"
def _unified_diff(self, old_content: str, new_content: str, filename: str) -> str:
"""Generate unified diff between old and new content."""
old_lines = old_content.splitlines(keepends=True)
new_lines = new_content.splitlines(keepends=True)
diff = difflib.unified_diff(
old_lines, new_lines,
fromfile=f"a/{filename}",
tofile=f"b/{filename}"
)
return ''.join(diff)
# =========================================================================
# READ Implementation
# =========================================================================
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
"""
Read a file with pagination, binary detection, and line numbers.
Args:
path: File path (absolute or relative to cwd)
offset: Line number to start from (1-indexed, default 1)
limit: Maximum lines to return (default 500, max 2000)
Returns:
ReadResult with content, metadata, or error info
"""
# Expand ~ and other shell paths
path = self._expand_path(path)
offset, limit = normalize_read_pagination(offset, limit)
# Check if file exists and get size (wc -c is POSIX, works on Linux + macOS)
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
stat_result = self._exec(stat_cmd)
if stat_result.exit_code != 0:
# File not found - try to suggest similar files
return self._suggest_similar_files(path)
stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
try:
file_size = int(stat_output.strip())
except ValueError:
file_size = 0
# Oversized files (> MAX_FILE_SIZE) are not rejected: the sed pagination
# below bounds what we return, so large files are read page by page.
# Images are never inlined — redirect to the vision tool
if self._is_image(path):
return ReadResult(
is_image=True,
is_binary=True,
file_size=file_size,
hint=(
"Image file detected. Automatically redirected to vision_analyze tool. "
"Use vision_analyze with this file path to inspect the image contents."
),
)
# Read a sample to check for binary content
sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null"
sample_result = self._exec(sample_cmd)
sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
if self._is_likely_binary(path, sample_output):
return ReadResult(
is_binary=True,
file_size=file_size,
error="Binary file - cannot display as text. Use appropriate tools to handle this file type."
)
# Read with pagination using sed
end_line = offset + limit - 1
read_cmd = f"sed -n '{offset},{end_line}p' {self._escape_shell_arg(path)}"
read_result = self._exec(read_cmd)
if read_result.exit_code != 0:
return ReadResult(error=f"Failed to read file: {read_result.stdout}")
read_output = _strip_terminal_fence_leaks(read_result.stdout)
# Get total line count
wc_cmd = f"wc -l < {self._escape_shell_arg(path)}"
wc_result = self._exec(wc_cmd)
wc_output = _strip_terminal_fence_leaks(wc_result.stdout)
try:
total_lines = int(wc_output.strip())
except ValueError:
total_lines = 0
# Check if truncated
truncated = total_lines > end_line
hint = None
if truncated:
hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)"
return ReadResult(
content=self._add_line_numbers(read_output, offset),
total_lines=total_lines,
file_size=file_size,
truncated=truncated,
hint=hint
)
def _suggest_similar_files(self, path: str) -> ReadResult:
"""Suggest similar files when the requested file is not found."""
dir_path = os.path.dirname(path) or "."
filename = os.path.basename(path)
basename_no_ext = os.path.splitext(filename)[0]
ext = os.path.splitext(filename)[1].lower()
lower_name = filename.lower()
# List files in the target directory
ls_cmd = f"ls -1 {self._escape_shell_arg(dir_path)} 2>/dev/null | head -50"
ls_result = self._exec(ls_cmd)
scored: list = [] # (score, filepath) — higher is better
if ls_result.exit_code == 0 and ls_result.stdout.strip():
for f in ls_result.stdout.strip().split('\n'):
if not f:
continue
lf = f.lower()
score = 0
# Exact match (shouldn't happen, but guard)
if lf == lower_name:
score = 100
# Same base name, different extension (e.g. config.yml vs config.yaml)
elif os.path.splitext(f)[0].lower() == basename_no_ext.lower():
score = 90
# Target is prefix of candidate or vice-versa
elif lf.startswith(lower_name) or lower_name.startswith(lf):
score = 70
# Substring match (candidate contains query)
elif lower_name in lf:
score = 60
# Reverse substring (query contains candidate name)
elif lf in lower_name and len(lf) > 2:
score = 40
# Same extension with some overlap
elif ext and os.path.splitext(f)[1].lower() == ext:
common = set(lower_name) & set(lf)
if len(common) >= max(len(lower_name), len(lf)) * 0.4:
score = 30
if score > 0:
scored.append((score, os.path.join(dir_path, f)))
scored.sort(key=lambda x: -x[0])
similar = [fp for _, fp in scored[:5]]
return ReadResult(
error=f"File not found: {path}",
similar_files=similar
)
def read_file_raw(self, path: str) -> ReadResult:
"""Read the complete file content as a plain string.
No pagination, no line-number prefixes, no per-line truncation.
Uses cat so the full file is returned regardless of size.
"""
path = self._expand_path(path)
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
stat_result = self._exec(stat_cmd)
if stat_result.exit_code != 0:
return self._suggest_similar_files(path)
stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
try:
file_size = int(stat_output.strip())
except ValueError:
file_size = 0
if self._is_image(path):
return ReadResult(is_image=True, is_binary=True, file_size=file_size)
sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null")
sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
if self._is_likely_binary(path, sample_output):
return ReadResult(
is_binary=True, file_size=file_size,
error="Binary file — cannot display as text."
)
cat_result = self._exec(f"cat {self._escape_shell_arg(path)}")
if cat_result.exit_code != 0:
return ReadResult(error=f"Failed to read file: {cat_result.stdout}")
return ReadResult(
content=_strip_terminal_fence_leaks(cat_result.stdout),
file_size=file_size,
)
def delete_file(self, path: str) -> WriteResult:
"""Delete a file via rm."""
path = self._expand_path(path)
if _is_write_denied(path):
return WriteResult(error=f"Delete denied: {path} is a protected path")
result = self._exec(f"rm -f {self._escape_shell_arg(path)}")
if result.exit_code != 0:
return WriteResult(error=f"Failed to delete {path}: {result.stdout}")
return WriteResult()
def move_file(self, src: str, dst: str) -> WriteResult:
"""Move a file via mv."""
src = self._expand_path(src)
dst = self._expand_path(dst)
for p in (src, dst):
if _is_write_denied(p):
return WriteResult(error=f"Move denied: {p} is a protected path")
result = self._exec(
f"mv {self._escape_shell_arg(src)} {self._escape_shell_arg(dst)}"
)
if result.exit_code != 0:
return WriteResult(error=f"Failed to move {src} -> {dst}: {result.stdout}")
return WriteResult()
# =========================================================================
# WRITE Implementation
# =========================================================================
def write_file(self, path: str, content: str) -> WriteResult:
"""
Write content to a file, creating parent directories as needed.
Pipes content through stdin to avoid OS ARG_MAX limits on large
files. The content never appears in the shell command string —
only the file path does.
After the write, runs a post-first / pre-lazy lint check via
``_check_lint_delta()``. If the new content is clean, the lint
call is O(one parse). If the new content has errors, the pre-write
content is linted too and only errors newly introduced by this
write are surfaced — pre-existing problems are filtered out so
the agent isn't distracted chasing them.
Args:
path: File path to write
content: Content to write
Returns:
WriteResult with bytes written, lint summary, or error.
"""
# Expand ~ and other shell paths
path = self._expand_path(path)
# Block writes to sensitive paths
if _is_write_denied(path):
return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")
# Capture pre-write content for lint-delta computation. Only do this
# when an in-process OR shell linter exists for this extension — no
# point paying for the read otherwise. For in-process linters we
# pass the content directly; for shell linters the pre-state isn't
# useful (we'd have to re-write-read to lint the old version, which
# defeats the purpose), so we skip the capture and accept the naive
# "all errors" report.
ext = os.path.splitext(path)[1].lower()
pre_content: Optional[str] = None
if ext in LINTERS_INPROC:
# Best-effort read; failure (file missing, permission) leaves
# pre_content as None which makes the delta step degrade
# gracefully to "report all errors".
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
read_result = self._exec(read_cmd)
if read_result.exit_code == 0 and read_result.stdout:
pre_content = read_result.stdout
# Create parent directories
parent = os.path.dirname(path)
dirs_created = False
if parent:
mkdir_cmd = f"mkdir -p {self._escape_shell_arg(parent)}"
mkdir_result = self._exec(mkdir_cmd)
if mkdir_result.exit_code == 0:
dirs_created = True
# Write via stdin pipe — content bypasses shell arg parsing entirely,
# so there's no ARG_MAX limit regardless of file size.
write_cmd = f"cat > {self._escape_shell_arg(path)}"
write_result = self._exec(write_cmd, stdin_data=content)
if write_result.exit_code != 0:
return WriteResult(error=f"Failed to write file: {write_result.stdout}")
# Get bytes written (wc -c is POSIX, works on Linux + macOS)
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
stat_result = self._exec(stat_cmd)
try:
bytes_written = int(stat_result.stdout.strip())
except ValueError:
bytes_written = len(content.encode('utf-8'))
# Post-write lint with delta refinement.
lint_result = self._check_lint_delta(path, pre_content=pre_content, post_content=content)
return WriteResult(
bytes_written=bytes_written,
dirs_created=dirs_created,
lint=lint_result.to_dict() if lint_result else None,
)
# =========================================================================
# PATCH Implementation (Replace Mode)
# =========================================================================
def patch_replace(self, path: str, old_string: str, new_string: str,
replace_all: bool = False) -> PatchResult:
"""
Replace text in a file using fuzzy matching.
Args:
path: File path to modify
old_string: Text to find (must be unique unless replace_all=True)
new_string: Replacement text
replace_all: If True, replace all occurrences
Returns:
PatchResult with diff and lint results
"""
# Expand ~ and other shell paths
path = self._expand_path(path)
# Block writes to sensitive paths
if _is_write_denied(path):
return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.")
# Read current content
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
read_result = self._exec(read_cmd)
if read_result.exit_code != 0:
return PatchResult(error=f"Failed to read file: {path}")
content = read_result.stdout
# Import and use fuzzy matching
from tools.fuzzy_match import fuzzy_find_and_replace
new_content, match_count, _strategy, error = fuzzy_find_and_replace(
content, old_string, new_string, replace_all
)
if error or match_count == 0:
err_msg = error or f"Could not find match for old_string in {path}"
try:
from tools.fuzzy_match import format_no_match_hint
err_msg += format_no_match_hint(err_msg, match_count, old_string, content)
except Exception:
pass
return PatchResult(error=err_msg)
# Write back
write_result = self.write_file(path, new_content)
if write_result.error:
return PatchResult(error=f"Failed to write changes: {write_result.error}")
# Post-write verification — re-read the file and confirm the bytes we
# intended to write actually landed. Catches silent persistence
# failures (backend FS oddities, race with another task, truncated
# pipe, etc.) that would otherwise return success-with-diff while the
# file is unchanged on disk.
verify_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
verify_result = self._exec(verify_cmd)
if verify_result.exit_code != 0:
return PatchResult(error=f"Post-write verification failed: could not re-read {path}")
if verify_result.stdout != new_content:
return PatchResult(error=(
f"Post-write verification failed for {path}: on-disk content "
f"differs from intended write "
f"(wrote {len(new_content)} chars, read back {len(verify_result.stdout)}). "
"The patch did not persist. Re-read the file and try again."
))
# Generate diff
diff = self._unified_diff(content, new_content, path)
# Auto-lint with delta refinement: only surface errors introduced
# by this patch, filtering out pre-existing lint failures so the
# agent isn't distracted by problems that were already there.
lint_result = self._check_lint_delta(path, pre_content=content, post_content=new_content)
return PatchResult(
success=True,
diff=diff,
files_modified=[path],
lint=lint_result.to_dict() if lint_result else None
)
def patch_v4a(self, patch_content: str) -> PatchResult:
"""
Apply a V4A format patch.
V4A format:
*** Begin Patch
*** Update File: path/to/file.py
@@ context hint @@
context line
-removed line
+added line
*** End Patch
Args:
patch_content: V4A format patch string
Returns:
PatchResult with changes made
"""
# Import patch parser
from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
operations, parse_error = parse_v4a_patch(patch_content)
if parse_error:
return PatchResult(error=f"Failed to parse patch: {parse_error}")
# Apply operations
result = apply_v4a_operations(operations, self)
return result
def _check_lint(self, path: str, content: Optional[str] = None) -> LintResult:
"""
Run syntax check on a file after editing.
Prefers the in-process linter for structured formats (JSON, YAML,
TOML) when possible — those parse via the Python stdlib in
microseconds and don't require a subprocess. Falls back to the
shell linter table for compiled/type-checked languages
(py_compile, node --check, tsc, go vet, rustfmt).
Args:
path: File path (used to select the linter + for shell invocation).
content: Optional file content. If provided AND an in-process
linter matches the extension, we lint the content
directly without re-reading the file from disk. Ignored
for shell linters.
Returns:
LintResult with status and any errors.
"""
ext = os.path.splitext(path)[1].lower()
# Prefer in-process linter when available.
inproc = LINTERS_INPROC.get(ext)
if inproc is not None:
# Need content — either passed in or read from disk.
if content is None:
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
read_result = self._exec(read_cmd)
if read_result.exit_code != 0:
return LintResult(skipped=True, message=f"Failed to read {path} for lint")
content = read_result.stdout
ok, err = inproc(content)
if err == "__SKIP__":
return LintResult(skipped=True, message=f"No linter available for {ext} (missing dependency)")
return LintResult(success=ok, output="" if ok else err)
# Fall back to shell linter.
if ext not in LINTERS:
return LintResult(skipped=True, message=f"No linter for {ext} files")
linter_cmd = LINTERS[ext]
# Extract the base command (first word)
base_cmd = linter_cmd.split()[0]
if not self._has_command(base_cmd):
return LintResult(skipped=True, message=f"{base_cmd} not available")
# Run linter
cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path))
result = self._exec(cmd, timeout=30)
return LintResult(
success=result.exit_code == 0,
output=result.stdout.strip() if result.stdout.strip() else ""
)
def _check_lint_delta(self, path: str, pre_content: Optional[str],
post_content: Optional[str] = None) -> LintResult:
"""
Run post-write lint with pre-write baseline comparison.
Strategy (post-first, pre-lazy):
1. Lint the post-write state. If clean → return clean immediately.
This is the hot path and matches _check_lint() in cost.
2. If post-lint found errors AND we have pre-write content, lint
that too. If the pre-write file was already broken, return only
the *new* errors introduced by this edit — errors that existed
before aren't the agent's problem to chase right now.
3. If pre_content is None (new file or unavailable), skip the delta
step and return all post-write errors.
This mirrors Cline's and OpenCode's post-edit LSP pattern: surface
only the errors this specific edit introduced, so the agent doesn't
get distracted by pre-existing problems.
Args:
path: File path (for linter selection).
pre_content: File content BEFORE the write. Pass None for new
files or when the pre-state isn't available — the
delta refinement is skipped and all post errors
are returned.
post_content: File content AFTER the write. Optional; if None,
the shell linter reads from disk (same as
_check_lint).
Returns:
LintResult. ``output`` contains either the full post-lint
errors (no pre-state) or just the new-error lines (delta
refinement applied).
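Illustration: if the pre-write lint already reported
"SyntaxError: invalid syntax (line 3, column 1)" and the post-write
output contains that identical line, it is treated as inherited;
only error lines absent from the pre-write output are surfaced as new.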
"""
post = self._check_lint(path, content=post_content)
# Hot path: clean post-write, no pre-lint needed.
if post.success or post.skipped:
return post
# Post-write has errors. If we have pre-content, run the delta
# refinement to filter out pre-existing errors.
if pre_content is None:
return post
pre = self._check_lint(path, content=pre_content)
if pre.success or pre.skipped or not pre.output:
# Pre-write was clean (or we couldn't lint it) — post errors
# are all new. Return the full post output.
return post
# Both pre- and post-write had errors. Compute the set-difference
# on non-empty stripped lines. Caveat: single-error parsers
# (ast.parse, json.loads) stop at the first error and don't report
# later ones — if the pre-existing error blocks parsing before
# reaching the edit region, we can't prove the edit is clean. So
# if every post error also appeared pre-edit, we report the file
# as still broken but annotate that this edit introduced nothing
# new on top — the agent knows it's inherited state, not fresh
# damage, without silently dropping the error.
pre_lines = {ln.strip() for ln in pre.output.splitlines() if ln.strip()}
post_lines = [ln for ln in post.output.splitlines() if ln.strip() and ln.strip() not in pre_lines]
if not post_lines:
# Every error in post was also in pre — this edit didn't make
# anything obviously worse, but the file remains broken and
# the agent should know.
return LintResult(
success=False,
output=post.output,
message="Pre-existing lint errors — this edit didn't introduce new ones but the file is still broken.",
)
return LintResult(
success=False,
output=(
"New lint errors introduced by this edit "
"(pre-existing errors filtered out):\n" + "\n".join(post_lines)
)
)
# =========================================================================
# SEARCH Implementation
# =========================================================================
def search(self, pattern: str, path: str = ".", target: str = "content",
file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
output_mode: str = "content", context: int = 0) -> SearchResult:
"""
Search for content or files.
Args:
pattern: Regex (for content) or glob pattern (for files)
path: Directory/file to search (default: cwd)
target: "content" (grep) or "files" (glob)
file_glob: File pattern filter for content search (e.g., "*.py")
limit: Max results (default 50)
offset: Skip first N results
output_mode: "content", "files_only", or "count"
context: Lines of context around matches
Returns:
SearchResult with matches or file list
"""
offset, limit = normalize_search_pagination(offset, limit)
# Expand ~ and other shell paths
path = self._expand_path(path)
# Validate that the path exists before searching
check = self._exec(f"test -e {self._escape_shell_arg(path)} && echo exists || echo not_found")
if "not_found" in check.stdout:
# Try to suggest nearby paths
parent = os.path.dirname(path) or "."
basename_query = os.path.basename(path)
hint_parts = [f"Path not found: {path}"]
# Check if parent directory exists and list similar entries
parent_check = self._exec(
f"test -d {self._escape_shell_arg(parent)} && echo yes || echo no"
)
if "yes" in parent_check.stdout and basename_query:
ls_result = self._exec(
f"ls -1 {self._escape_shell_arg(parent)} 2>/dev/null | head -20"
)
if ls_result.exit_code == 0 and ls_result.stdout.strip():
lower_q = basename_query.lower()
candidates = []
for entry in ls_result.stdout.strip().split('\n'):
if not entry:
continue
le = entry.lower()
if lower_q in le or le in lower_q or le.startswith(lower_q[:3]):
candidates.append(os.path.join(parent, entry))
if candidates:
hint_parts.append(
"Similar paths: " + ", ".join(candidates[:5])
)
return SearchResult(
error=". ".join(hint_parts),
total_count=0
)
if target == "files":
return self._search_files(pattern, path, limit, offset)
else:
return self._search_content(pattern, path, file_glob, limit, offset,
output_mode, context)
def _search_files(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
"""Search for files by name pattern (glob-like)."""
# find -name matches basenames only, so reduce any pattern containing a
# directory component (including a leading **/) to its final segment;
# bare names pass through unchanged.
if not pattern.startswith('**/') and '/' not in pattern:
search_pattern = pattern
else:
search_pattern = pattern.split('/')[-1]
search_root = Path(path)
has_hidden_path_ancestor = any(
part not in (".", "..") and part.startswith(".")
for part in search_root.parts
)
# Prefer ripgrep: respects .gitignore, excludes hidden dirs by
# default, and has parallel directory traversal (~200x faster than
# find on wide trees). Mirrors _search_content which already uses rg.
if self._has_command('rg'):
return self._search_files_rg(search_pattern, path, limit, offset)
# Fallback: find (slower, no .gitignore awareness)
if not self._has_command('find'):
return SearchResult(
error="File search requires 'rg' (ripgrep) or 'find'. "
"Install ripgrep for best results: "
"https://github.com/BurntSushi/ripgrep#installation"
)
# Exclude hidden directories (matching ripgrep's default behavior).
hidden_exclude = "-not -path '*/.*'" if not has_hidden_path_ancestor else ""
hidden_filter_expr = f" {hidden_exclude}" if hidden_exclude else ""
# Use shell pagination for standard roots. For hidden roots, gather full
# output so we can re-apply hidden-descendant filtering while allowing
# explicit hidden-root searches.
pagination_expr = ""
if not has_hidden_path_ancestor:
pagination_expr = f" | tail -n +{offset + 1} | head -n {limit}"
cmd = f"find {self._escape_shell_arg(path)}{hidden_filter_expr} -type f -name {self._escape_shell_arg(search_pattern)} " \
f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn{pagination_expr}"
result = self._exec(cmd, timeout=60)
if not result.stdout.strip():
# Try without -printf (BSD find compatibility -- macOS)
cmd_simple = f"find {self._escape_shell_arg(path)}{hidden_filter_expr} -type f -name {self._escape_shell_arg(search_pattern)} " \
f"2>/dev/null | sort -rn{pagination_expr}"
result = self._exec(cmd_simple, timeout=60)
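# GNU find emitted "EPOCH.FRAC path" lines via -printf; the BSD fallback
# emits bare paths. Strip the leading timestamp column when present.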
files = []
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split(' ', 1)
if len(parts) == 2 and parts[0].replace('.', '').isdigit():
files.append(parts[1])
else:
files.append(line)
# For explicit hidden roots, find's path-based filtering excludes every
# file under the hidden path. Apply descendant filtering after command
# execution so only the explicit root ancestry is bypassed.
if has_hidden_path_ancestor:
normalized_root = search_root.resolve()
filtered_files = []
for file_path in files:
try:
rel_parts = Path(file_path).resolve().relative_to(normalized_root).parts
except ValueError:
rel_parts = Path(file_path).parts
if any(part not in (".", "..") and part.startswith(".") for part in rel_parts):
continue
filtered_files.append(file_path)
files = filtered_files[offset:offset + limit]
# pagination for standard roots is already applied in shell
return SearchResult(
files=files,
total_count=len(files)
)
def _search_files_rg(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
"""Search for files by name using ripgrep's --files mode.
rg --files respects .gitignore and excludes hidden directories by
default, and uses parallel directory traversal for ~200x speedup
over find on wide trees. Results are sorted by modification time
(most recently edited first) when the installed rg supports --sortr
(13.0 or newer); older versions fall back to unsorted output.
"""
# rg --files -g takes glob patterns; prefix bare names with '*' so a
# plain name still matches at any depth (roughly find -name, though the
# '*' prefix also admits suffix matches).
if '/' not in pattern and not pattern.startswith('*'):
glob_pattern = f"*{pattern}"
else:
glob_pattern = pattern
fetch_limit = limit + offset
# Try mtime-sorted first (rg 13+); fall back to unsorted if not supported.
cmd_sorted = (
f"rg --files --sortr=modified -g {self._escape_shell_arg(glob_pattern)} "
f"{self._escape_shell_arg(path)} 2>/dev/null "
f"| head -n {fetch_limit}"
)
result = self._exec(cmd_sorted, timeout=60)
all_files = [f for f in result.stdout.strip().split('\n') if f]
if not all_files:
# --sortr may have failed on older rg; retry without it.
cmd_plain = (
f"rg --files -g {self._escape_shell_arg(glob_pattern)} "
f"{self._escape_shell_arg(path)} 2>/dev/null "
f"| head -n {fetch_limit}"
)
result = self._exec(cmd_plain, timeout=60)
all_files = [f for f in result.stdout.strip().split('\n') if f]
page = all_files[offset:offset + limit]
return SearchResult(
files=page,
total_count=len(all_files),
truncated=len(all_files) >= fetch_limit,
)
def _search_content(self, pattern: str, path: str, file_glob: Optional[str],
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
"""Search for content inside files (grep-like)."""
# Try ripgrep first (fast), fallback to grep (slower but works)
if self._has_command('rg'):
return self._search_with_rg(pattern, path, file_glob, limit, offset,
output_mode, context)
elif self._has_command('grep'):
return self._search_with_grep(pattern, path, file_glob, limit, offset,
output_mode, context)
else:
# Neither rg nor grep available (Windows without Git Bash, etc.)
return SearchResult(
error="Content search requires ripgrep (rg) or grep. "
"Install ripgrep: https://github.com/BurntSushi/ripgrep#installation"
)
def _search_with_rg(self, pattern: str, path: str, file_glob: Optional[str],
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
"""Search using ripgrep."""
cmd_parts = ["rg", "--line-number", "--no-heading", "--with-filename"]
# Add context if requested
if context > 0:
cmd_parts.extend(["-C", str(context)])
# Add file glob filter (must be quoted to prevent shell expansion)
if file_glob:
cmd_parts.extend(["--glob", self._escape_shell_arg(file_glob)])
# Output mode handling
if output_mode == "files_only":
cmd_parts.append("-l") # Files only
elif output_mode == "count":
cmd_parts.append("-c") # Count per file
# Add pattern and path
cmd_parts.append(self._escape_shell_arg(pattern))
cmd_parts.append(self._escape_shell_arg(path))
# Fetch extra rows so we can report the true total before slicing.
# For context mode, rg emits separator lines ("--") between groups,
# so we grab generously and filter in Python.
fetch_limit = limit + offset + (200 if context > 0 else 0)
cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])
cmd = " ".join(cmd_parts)
result = self._exec(cmd, timeout=60)
# rg exit codes: 0=matches found, 1=no matches, 2=error
if result.exit_code == 2 and not result.stdout.strip():
error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
return SearchResult(error=f"Search failed: {error_msg}", total_count=0)
# Parse results based on output mode
if output_mode == "files_only":
all_files = [f for f in result.stdout.strip().split('\n') if f]
total = len(all_files)
page = all_files[offset:offset + limit]
return SearchResult(files=page, total_count=total)
elif output_mode == "count":
counts = {}
for line in result.stdout.strip().split('\n'):
if ':' in line:
parts = line.rsplit(':', 1)
if len(parts) == 2:
try:
counts[parts[0]] = int(parts[1])
except ValueError:
pass
return SearchResult(counts=counts, total_count=sum(counts.values()))
else:
# Parse content matches and context lines.
# rg match lines: "file:lineno:content" (colon separator)
# rg context lines: "file-lineno-content" (dash separator)
# rg group seps: "--"
# Note: on Windows, paths contain drive letters (e.g. C:\path),
# so naive split(":") breaks. Use regex to handle both platforms.
_match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
matches = []
for line in result.stdout.strip().split('\n'):
if not line or line == "--":
continue
# Try match line first (colon-separated: file:line:content)
m = _match_re.match(line)
if m:
matches.append(SearchMatch(
path=(m.group(1) or '') + m.group(2),
line_number=int(m.group(3)),
content=m.group(4)[:500]
))
continue
# Try context line (dash-separated: file-line-content)
# Only attempt if context was requested to avoid false positives
if context > 0:
parsed = _parse_search_context_line(line)
if parsed:
matches.append(SearchMatch(
path=parsed[0],
line_number=parsed[1],
content=parsed[2][:500]
))
total = len(matches)
page = matches[offset:offset + limit]
return SearchResult(
matches=page,
total_count=total,
truncated=total > offset + limit
)
def _search_with_grep(self, pattern: str, path: str, file_glob: Optional[str],
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
"""Fallback search using grep."""
cmd_parts = ["grep", "-rnH"] # -H forces filename even for single-file searches
# Exclude hidden directories (matching ripgrep's default behavior).
# This prevents searching inside .hub/index-cache/, .git/, etc.
cmd_parts.append("--exclude-dir='.*'")
# Add context if requested
if context > 0:
cmd_parts.extend(["-C", str(context)])
# Add file pattern filter (must be quoted to prevent shell expansion)
if file_glob:
cmd_parts.extend(["--include", self._escape_shell_arg(file_glob)])
# Output mode handling
if output_mode == "files_only":
cmd_parts.append("-l")
elif output_mode == "count":
cmd_parts.append("-c")
# Add pattern and path
cmd_parts.append(self._escape_shell_arg(pattern))
cmd_parts.append(self._escape_shell_arg(path))
# Fetch generously so we can compute total before slicing
fetch_limit = limit + offset + (200 if context > 0 else 0)
cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])
cmd = " ".join(cmd_parts)
result = self._exec(cmd, timeout=60)
# grep exit codes: 0=matches found, 1=no matches, 2=error
if result.exit_code == 2 and not result.stdout.strip():
error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
return SearchResult(error=f"Search failed: {error_msg}", total_count=0)
if output_mode == "files_only":
all_files = [f for f in result.stdout.strip().split('\n') if f]
total = len(all_files)
page = all_files[offset:offset + limit]
return SearchResult(files=page, total_count=total)
elif output_mode == "count":
counts = {}
for line in result.stdout.strip().split('\n'):
if ':' in line:
parts = line.rsplit(':', 1)
if len(parts) == 2:
try:
counts[parts[0]] = int(parts[1])
except ValueError:
pass
return SearchResult(counts=counts, total_count=sum(counts.values()))
else:
# grep match lines: "file:lineno:content" (colon)
# grep context lines: "file-lineno-content" (dash)
# grep group seps: "--"
# Note: on Windows, paths contain drive letters (e.g. C:\path),
# so naive split(":") breaks. Use regex to handle both platforms.
_match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
matches = []
for line in result.stdout.strip().split('\n'):
if not line or line == "--":
continue
m = _match_re.match(line)
if m:
matches.append(SearchMatch(
path=(m.group(1) or '') + m.group(2),
line_number=int(m.group(3)),
content=m.group(4)[:500]
))
continue
if context > 0:
parsed = _parse_search_context_line(line)
if parsed:
matches.append(SearchMatch(
path=parsed[0],
line_number=parsed[1],
content=parsed[2][:500]
))
total = len(matches)
page = matches[offset:offset + limit]
return SearchResult(
matches=page,
total_count=total,
truncated=total > offset + limit
)