mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-30 01:41:43 +00:00
Mechanical cleanup across 43 files — removes 46 unused imports (F401) and 14 unused local variables (F841) detected by `ruff check --select F401,F841`. Net: -49 lines.

Also fixes a latent NameError in rl_cli.py where `get_hermes_home()` was called at module line 32 before its import at line 65 — the module never imported successfully on main. The ruff audit surfaced this because it correctly saw the symbol as imported-but-unused (the call happened before the import ran); the fix moves the import to the top of the file alongside other stdlib imports.

One `# noqa: F401` kept in hermes_cli/status.py for `subprocess`: tests monkeypatch `hermes_cli.status.subprocess` as a regression guard that systemctl isn't called on Termux, so the name must exist at module scope even though the module body doesn't reference it. Docstring explains the reason.

Also fixes an invalid `# noqa:` directive in gateway/platforms/discord.py:308 that lacked a rule code.

Co-authored-by: teknium1 <teknium@users.noreply.github.com>
1257 lines
50 KiB
Python
#!/usr/bin/env python3
"""
File Operations Module

Provides file manipulation capabilities (read, write, patch, search) that work
across all terminal backends (local, docker, singularity, ssh, modal, daytona).

The key insight is that all file operations can be expressed as shell commands,
so we wrap the terminal backend's execute() interface to provide a unified file API.

Usage:
    from tools.file_operations import ShellFileOperations
    from tools.terminal_tool import _active_environments

    # Get file operations for a terminal environment
    file_ops = ShellFileOperations(terminal_env)

    # Read a file
    result = file_ops.read_file("/path/to/file.py")

    # Write a file
    result = file_ops.write_file("/path/to/new.py", "print('hello')")

    # Search for content
    result = file_ops.search("TODO", path=".", file_glob="*.py")
"""

import os
import re
import difflib
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from pathlib import Path
from tools.binary_extensions import BINARY_EXTENSIONS

from agent.file_safety import (
    build_write_denied_paths,
    build_write_denied_prefixes,
    get_safe_write_root as _shared_get_safe_write_root,
    is_write_denied as _shared_is_write_denied,
)


# ---------------------------------------------------------------------------
# Write-path deny list — blocks writes to sensitive system/credential files
# ---------------------------------------------------------------------------

_HOME = str(Path.home())

WRITE_DENIED_PATHS = build_write_denied_paths(_HOME)

WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME)


def _get_safe_write_root() -> Optional[str]:
    """Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset.

    When set, all write_file/patch operations are constrained to this
    directory tree. Writes outside it are denied even if the target is
    not on the static deny list. Opt-in hardening for gateway/messaging
    deployments that should only touch a workspace checkout.
    """
    return _shared_get_safe_write_root()


def _is_write_denied(path: str) -> bool:
    """Return True if path is on the write deny list."""
    return _shared_is_write_denied(path)
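
# Illustrative sketch of the deny check (comment-only, so module behavior is
# unchanged). The concrete deny list is built by agent.file_safety from the
# current $HOME, so the exact entries vary by deployment; the paths below are
# typical examples, not guarantees:
#
#     >>> _is_write_denied("/etc/shadow")        # typically denied
#     True
#     >>> _is_write_denied("/tmp/scratch.txt")   # typically allowed
#     False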


# =============================================================================
# Result Data Classes
# =============================================================================

@dataclass
class ReadResult:
    """Result from reading a file."""
    content: str = ""
    total_lines: int = 0
    file_size: int = 0
    truncated: bool = False
    hint: Optional[str] = None
    is_binary: bool = False
    is_image: bool = False
    base64_content: Optional[str] = None
    mime_type: Optional[str] = None
    dimensions: Optional[str] = None  # For images: "WIDTHxHEIGHT"
    error: Optional[str] = None
    similar_files: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {k: v for k, v in self.__dict__.items() if v is not None and v != []}
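
    # Illustrative sketch: to_dict() drops None fields and empty lists, so a
    # minimal successful read serializes compactly (assumes the default field
    # values declared above):
    #
    #     >>> ReadResult(content="hi", total_lines=1).to_dict()
    #     {'content': 'hi', 'total_lines': 1, 'file_size': 0,
    #      'truncated': False, 'is_binary': False, 'is_image': False}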


@dataclass
class WriteResult:
    """Result from writing a file."""
    bytes_written: int = 0
    dirs_created: bool = False
    error: Optional[str] = None
    warning: Optional[str] = None

    def to_dict(self) -> dict:
        return {k: v for k, v in self.__dict__.items() if v is not None}


@dataclass
class PatchResult:
    """Result from patching a file."""
    success: bool = False
    diff: str = ""
    files_modified: List[str] = field(default_factory=list)
    files_created: List[str] = field(default_factory=list)
    files_deleted: List[str] = field(default_factory=list)
    lint: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def to_dict(self) -> dict:
        result = {"success": self.success}
        if self.diff:
            result["diff"] = self.diff
        if self.files_modified:
            result["files_modified"] = self.files_modified
        if self.files_created:
            result["files_created"] = self.files_created
        if self.files_deleted:
            result["files_deleted"] = self.files_deleted
        if self.lint:
            result["lint"] = self.lint
        if self.error:
            result["error"] = self.error
        return result


@dataclass
class SearchMatch:
    """A single search match."""
    path: str
    line_number: int
    content: str
    mtime: float = 0.0  # Modification time for sorting


@dataclass
class SearchResult:
    """Result from searching."""
    matches: List[SearchMatch] = field(default_factory=list)
    files: List[str] = field(default_factory=list)
    counts: Dict[str, int] = field(default_factory=dict)
    total_count: int = 0
    truncated: bool = False
    error: Optional[str] = None

    def to_dict(self) -> dict:
        result = {"total_count": self.total_count}
        if self.matches:
            result["matches"] = [
                {"path": m.path, "line": m.line_number, "content": m.content}
                for m in self.matches
            ]
        if self.files:
            result["files"] = self.files
        if self.counts:
            result["counts"] = self.counts
        if self.truncated:
            result["truncated"] = True
        if self.error:
            result["error"] = self.error
        return result


@dataclass
class LintResult:
    """Result from linting a file."""
    success: bool = True
    skipped: bool = False
    output: str = ""
    message: str = ""

    def to_dict(self) -> dict:
        if self.skipped:
            return {"status": "skipped", "message": self.message}
        return {
            "status": "ok" if self.success else "error",
            "output": self.output
        }


@dataclass
class ExecuteResult:
    """Result from executing a shell command."""
    stdout: str = ""
    exit_code: int = 0


# =============================================================================
# Abstract Interface
# =============================================================================

class FileOperations(ABC):
    """Abstract interface for file operations across terminal backends."""

    @abstractmethod
    def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
        """Read a file with pagination support."""
        ...

    @abstractmethod
    def read_file_raw(self, path: str) -> ReadResult:
        """Read the complete file content as a plain string.

        No pagination, no line-number prefixes, no per-line truncation.
        Returns ReadResult with .content = full file text, .error set on
        failure. Always reads to EOF regardless of file size.
        """
        ...

    @abstractmethod
    def write_file(self, path: str, content: str) -> WriteResult:
        """Write content to a file, creating directories as needed."""
        ...

    @abstractmethod
    def patch_replace(self, path: str, old_string: str, new_string: str,
                      replace_all: bool = False) -> PatchResult:
        """Replace text in a file using fuzzy matching."""
        ...

    @abstractmethod
    def patch_v4a(self, patch_content: str) -> PatchResult:
        """Apply a V4A format patch."""
        ...

    @abstractmethod
    def delete_file(self, path: str) -> WriteResult:
        """Delete a file. Returns WriteResult with .error set on failure."""
        ...

    @abstractmethod
    def move_file(self, src: str, dst: str) -> WriteResult:
        """Move/rename a file from src to dst. Returns WriteResult with .error set on failure."""
        ...

    @abstractmethod
    def search(self, pattern: str, path: str = ".", target: str = "content",
               file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
               output_mode: str = "content", context: int = 0) -> SearchResult:
        """Search for content or files."""
        ...


# =============================================================================
# Shell-based Implementation
# =============================================================================

# Image extensions (subset of binary that we can return as base64)
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'}

# Linters by file extension
LINTERS = {
    '.py': 'python -m py_compile {file} 2>&1',
    '.js': 'node --check {file} 2>&1',
    '.ts': 'npx tsc --noEmit {file} 2>&1',
    '.go': 'go vet {file} 2>&1',
    '.rs': 'rustfmt --check {file} 2>&1',
}

# Max limits for read operations
MAX_LINES = 2000
MAX_LINE_LENGTH = 2000
MAX_FILE_SIZE = 50 * 1024  # 50KB
DEFAULT_READ_OFFSET = 1
DEFAULT_READ_LIMIT = 500
DEFAULT_SEARCH_OFFSET = 0
DEFAULT_SEARCH_LIMIT = 50


def _coerce_int(value: Any, default: int) -> int:
    """Best-effort integer coercion for tool pagination inputs."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def normalize_read_pagination(offset: Any = DEFAULT_READ_OFFSET,
                              limit: Any = DEFAULT_READ_LIMIT) -> tuple[int, int]:
    """Return safe read_file pagination bounds.

    Tool schemas declare minimum/maximum values, but not every caller or
    provider enforces schemas before dispatch. Clamp here so invalid values
    cannot leak into sed ranges like ``0,-1p``.

    The upper bound on ``limit`` comes from ``tool_output.max_lines`` in
    config.yaml (defaults to the module-level ``MAX_LINES`` constant).
    """
    from tools.tool_output_limits import get_max_lines
    max_lines = get_max_lines()
    normalized_offset = max(1, _coerce_int(offset, DEFAULT_READ_OFFSET))
    normalized_limit = _coerce_int(limit, DEFAULT_READ_LIMIT)
    normalized_limit = max(1, min(normalized_limit, max_lines))
    return normalized_offset, normalized_limit
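
# Illustrative sketch of the clamping (assumes ``tool_output.max_lines`` is
# left unset so get_max_lines() returns the MAX_LINES default of 2000):
#
#     >>> normalize_read_pagination(offset="0", limit=999999)
#     (1, 2000)
#     >>> normalize_read_pagination(offset=None, limit=None)
#     (1, 500)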


def normalize_search_pagination(offset: Any = DEFAULT_SEARCH_OFFSET,
                                limit: Any = DEFAULT_SEARCH_LIMIT) -> tuple[int, int]:
    """Return safe search pagination bounds for shell head/tail pipelines."""
    normalized_offset = max(0, _coerce_int(offset, DEFAULT_SEARCH_OFFSET))
    normalized_limit = max(1, _coerce_int(limit, DEFAULT_SEARCH_LIMIT))
    return normalized_offset, normalized_limit


class ShellFileOperations(FileOperations):
    """
    File operations implemented via shell commands.

    Works with ANY terminal backend that has an execute(command, cwd) method.
    This includes local, docker, singularity, ssh, modal, and daytona environments.
    """

    def __init__(self, terminal_env, cwd: str = None):
        """
        Initialize file operations with a terminal environment.

        Args:
            terminal_env: Any object with an execute(command, cwd) method
                that returns {"output": str, "returncode": int}.
            cwd: Optional explicit fallback cwd when the terminal env has
                no cwd attribute (rare — most backends track cwd live).

        Note:
            Every _exec() call prefers the LIVE ``terminal_env.cwd`` over
            ``self.cwd`` so ``cd`` commands run via the terminal tool are
            picked up immediately. ``self.cwd`` is only used as a fallback
            when the env has no cwd at all — it is NOT the authoritative
            cwd, despite being settable at init time.

            Historical bug (fixed): prior versions of this class used the
            init-time cwd for every _exec() call, which caused relative
            paths passed to patch/read/write to target the wrong directory
            after the user ran ``cd`` in the terminal. Patches would
            claim success and return a plausible diff but land in the
            original directory, producing apparent silent failures.
        """
        self.env = terminal_env
        # Determine cwd from various possible sources.
        # IMPORTANT: do NOT fall back to os.getcwd() -- that's the HOST's local
        # path which doesn't exist inside container/cloud backends (modal, docker).
        # If nothing provides a cwd, use "/" as a safe universal default.
        self.cwd = cwd or getattr(terminal_env, 'cwd', None) or \
            getattr(getattr(terminal_env, 'config', None), 'cwd', None) or "/"

        # Cache for command availability checks
        self._command_cache: Dict[str, bool] = {}
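
    # Minimal usage sketch. Any object with an execute(command, cwd=...)
    # method returning {"output": str, "returncode": int} qualifies;
    # ``LocalTerminalEnv`` below is a hypothetical stand-in, not a real
    # import from this module:
    #
    #     env = LocalTerminalEnv(cwd="/workspace")
    #     ops = ShellFileOperations(env)
    #     print(ops.read_file("README.md", limit=40).to_dict())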

    def _exec(self, command: str, cwd: str = None, timeout: int = None,
              stdin_data: str = None) -> ExecuteResult:
        """Execute command via terminal backend.

        Args:
            stdin_data: If provided, piped to the process's stdin instead of
                embedding in the command string. Bypasses ARG_MAX.

        Cwd resolution order (critical — see class docstring):
            1. Explicit ``cwd`` arg (if provided)
            2. Live ``self.env.cwd`` (tracks ``cd`` commands run via terminal)
            3. Init-time ``self.cwd`` (fallback when env has no cwd attribute)

        This ordering ensures relative paths in file operations follow the
        terminal's current directory — not the directory this file_ops was
        originally created in. See test_file_ops_cwd_tracking.py.
        """
        kwargs = {}
        if timeout:
            kwargs['timeout'] = timeout
        if stdin_data is not None:
            kwargs['stdin_data'] = stdin_data

        # Resolve cwd from the live env so `cd` commands are picked up.
        # Fall through to init-time self.cwd only if the env doesn't track cwd.
        effective_cwd = cwd or getattr(self.env, 'cwd', None) or self.cwd
        result = self.env.execute(command, cwd=effective_cwd, **kwargs)
        return ExecuteResult(
            stdout=result.get("output", ""),
            exit_code=result.get("returncode", 0)
        )

    def _has_command(self, cmd: str) -> bool:
        """Check if a command exists in the environment (cached)."""
        if cmd not in self._command_cache:
            result = self._exec(f"command -v {cmd} >/dev/null 2>&1 && echo 'yes'")
            self._command_cache[cmd] = result.stdout.strip() == 'yes'
        return self._command_cache[cmd]

    def _is_likely_binary(self, path: str, content_sample: str = None) -> bool:
        """
        Check if a file is likely binary.

        Uses extension check (fast) + content analysis (fallback).
        """
        ext = os.path.splitext(path)[1].lower()
        if ext in BINARY_EXTENSIONS:
            return True

        # Content analysis: >30% non-printable chars = binary
        if content_sample:
            non_printable = sum(1 for c in content_sample[:1000]
                                if ord(c) < 32 and c not in '\n\r\t')
            return non_printable / min(len(content_sample), 1000) > 0.30

        return False

    def _is_image(self, path: str) -> bool:
        """Check if file is an image we can return as base64."""
        ext = os.path.splitext(path)[1].lower()
        return ext in IMAGE_EXTENSIONS

    def _add_line_numbers(self, content: str, start_line: int = 1) -> str:
        """Add line numbers to content in LINE_NUM|CONTENT format."""
        from tools.tool_output_limits import get_max_line_length
        max_line_length = get_max_line_length()
        lines = content.split('\n')
        numbered = []
        for i, line in enumerate(lines, start=start_line):
            # Truncate long lines
            if len(line) > max_line_length:
                line = line[:max_line_length] + "... [truncated]"
            numbered.append(f"{i:6d}|{line}")
        return '\n'.join(numbered)

    def _expand_path(self, path: str) -> str:
        """
        Expand shell-style paths like ~ and ~user to absolute paths.

        This must be done BEFORE shell escaping, since ~ doesn't expand
        inside single quotes.
        """
        if not path:
            return path

        # Handle ~ and ~user
        if path.startswith('~'):
            # Get home directory via the terminal environment
            result = self._exec("echo $HOME")
            if result.exit_code == 0 and result.stdout.strip():
                home = result.stdout.strip()
                if path == '~':
                    return home
                elif path.startswith('~/'):
                    return home + path[1:]  # Replace ~ with home
            # ~username format - extract and validate username before
            # letting shell expand it (prevent shell injection via
            # paths like "~; rm -rf /").
            rest = path[1:]  # strip leading ~
            slash_idx = rest.find('/')
            username = rest[:slash_idx] if slash_idx >= 0 else rest
            if username and re.fullmatch(r'[a-zA-Z0-9._-]+', username):
                # Only expand ~username (not the full path) to avoid shell
                # injection via path suffixes like "~user/$(malicious)".
                expand_result = self._exec(f"echo ~{username}")
                if expand_result.exit_code == 0 and expand_result.stdout.strip():
                    user_home = expand_result.stdout.strip()
                    suffix = path[1 + len(username):]  # e.g. "/rest/of/path"
                    return user_home + suffix

        return path
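
    # Illustrative sketch (results depend on the backend's $HOME; the values
    # shown are hypothetical):
    #
    #     ops._expand_path("~/notes.txt")   # -> "/home/user/notes.txt"
    #     ops._expand_path("~; rm -rf /")   # -> unchanged: "; rm -rf " fails
    #                                       #    the username validation regex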

    def _escape_shell_arg(self, arg: str) -> str:
        """Escape a string for safe use in shell commands."""
        # Use single quotes and escape any single quotes in the string
        return "'" + arg.replace("'", "'\"'\"'") + "'"

    def _unified_diff(self, old_content: str, new_content: str, filename: str) -> str:
        """Generate unified diff between old and new content."""
        old_lines = old_content.splitlines(keepends=True)
        new_lines = new_content.splitlines(keepends=True)
        diff = difflib.unified_diff(
            old_lines, new_lines,
            fromfile=f"a/{filename}",
            tofile=f"b/{filename}"
        )
        return ''.join(diff)

    # =========================================================================
    # READ Implementation
    # =========================================================================

    def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
        """
        Read a file with pagination, binary detection, and line numbers.

        Args:
            path: File path (absolute or relative to cwd)
            offset: Line number to start from (1-indexed, default 1)
            limit: Maximum lines to return (default 500, max 2000)

        Returns:
            ReadResult with content, metadata, or error info
        """
        # Expand ~ and other shell paths
        path = self._expand_path(path)

        offset, limit = normalize_read_pagination(offset, limit)

        # Check if file exists and get size (wc -c is POSIX, works on Linux + macOS)
        stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
        stat_result = self._exec(stat_cmd)

        if stat_result.exit_code != 0:
            # File not found - try to suggest similar files
            return self._suggest_similar_files(path)

        try:
            file_size = int(stat_result.stdout.strip())
        except ValueError:
            file_size = 0

        # Files over MAX_FILE_SIZE are not rejected: pagination below bounds
        # the returned output, so oversized files are still readable in pages.

        # Images are never inlined — redirect to the vision tool
        if self._is_image(path):
            return ReadResult(
                is_image=True,
                is_binary=True,
                file_size=file_size,
                hint=(
                    "Image file detected. Automatically redirected to vision_analyze tool. "
                    "Use vision_analyze with this file path to inspect the image contents."
                ),
            )

        # Read a sample to check for binary content
        sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null"
        sample_result = self._exec(sample_cmd)

        if self._is_likely_binary(path, sample_result.stdout):
            return ReadResult(
                is_binary=True,
                file_size=file_size,
                error="Binary file - cannot display as text. Use appropriate tools to handle this file type."
            )

        # Read with pagination using sed
        end_line = offset + limit - 1
        read_cmd = f"sed -n '{offset},{end_line}p' {self._escape_shell_arg(path)}"
        read_result = self._exec(read_cmd)

        if read_result.exit_code != 0:
            return ReadResult(error=f"Failed to read file: {read_result.stdout}")

        # Get total line count
        wc_cmd = f"wc -l < {self._escape_shell_arg(path)}"
        wc_result = self._exec(wc_cmd)
        try:
            total_lines = int(wc_result.stdout.strip())
        except ValueError:
            total_lines = 0

        # Check if truncated
        truncated = total_lines > end_line
        hint = None
        if truncated:
            hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)"

        return ReadResult(
            content=self._add_line_numbers(read_result.stdout, offset),
            total_lines=total_lines,
            file_size=file_size,
            truncated=truncated,
            hint=hint
        )

    def _suggest_similar_files(self, path: str) -> ReadResult:
        """Suggest similar files when the requested file is not found."""
        dir_path = os.path.dirname(path) or "."
        filename = os.path.basename(path)
        basename_no_ext = os.path.splitext(filename)[0]
        ext = os.path.splitext(filename)[1].lower()
        lower_name = filename.lower()

        # List files in the target directory
        ls_cmd = f"ls -1 {self._escape_shell_arg(dir_path)} 2>/dev/null | head -50"
        ls_result = self._exec(ls_cmd)

        scored: list = []  # (score, filepath) — higher is better
        if ls_result.exit_code == 0 and ls_result.stdout.strip():
            for f in ls_result.stdout.strip().split('\n'):
                if not f:
                    continue
                lf = f.lower()
                score = 0

                # Exact match (shouldn't happen, but guard)
                if lf == lower_name:
                    score = 100
                # Same base name, different extension (e.g. config.yml vs config.yaml)
                elif os.path.splitext(f)[0].lower() == basename_no_ext.lower():
                    score = 90
                # Target is prefix of candidate or vice-versa
                elif lf.startswith(lower_name) or lower_name.startswith(lf):
                    score = 70
                # Substring match (candidate contains query)
                elif lower_name in lf:
                    score = 60
                # Reverse substring (query contains candidate name)
                elif lf in lower_name and len(lf) > 2:
                    score = 40
                # Same extension with some overlap
                elif ext and os.path.splitext(f)[1].lower() == ext:
                    common = set(lower_name) & set(lf)
                    if len(common) >= max(len(lower_name), len(lf)) * 0.4:
                        score = 30

                if score > 0:
                    scored.append((score, os.path.join(dir_path, f)))

        scored.sort(key=lambda x: -x[0])
        similar = [fp for _, fp in scored[:5]]

        return ReadResult(
            error=f"File not found: {path}",
            similar_files=similar
        )

    def read_file_raw(self, path: str) -> ReadResult:
        """Read the complete file content as a plain string.

        No pagination, no line-number prefixes, no per-line truncation.
        Uses cat so the full file is returned regardless of size.
        """
        path = self._expand_path(path)
        stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
        stat_result = self._exec(stat_cmd)
        if stat_result.exit_code != 0:
            return self._suggest_similar_files(path)
        try:
            file_size = int(stat_result.stdout.strip())
        except ValueError:
            file_size = 0
        if self._is_image(path):
            return ReadResult(is_image=True, is_binary=True, file_size=file_size)
        sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null")
        if self._is_likely_binary(path, sample_result.stdout):
            return ReadResult(
                is_binary=True, file_size=file_size,
                error="Binary file — cannot display as text."
            )
        cat_result = self._exec(f"cat {self._escape_shell_arg(path)}")
        if cat_result.exit_code != 0:
            return ReadResult(error=f"Failed to read file: {cat_result.stdout}")
        return ReadResult(content=cat_result.stdout, file_size=file_size)

    def delete_file(self, path: str) -> WriteResult:
        """Delete a file via rm."""
        path = self._expand_path(path)
        if _is_write_denied(path):
            return WriteResult(error=f"Delete denied: {path} is a protected path")
        result = self._exec(f"rm -f {self._escape_shell_arg(path)}")
        if result.exit_code != 0:
            return WriteResult(error=f"Failed to delete {path}: {result.stdout}")
        return WriteResult()

    def move_file(self, src: str, dst: str) -> WriteResult:
        """Move a file via mv."""
        src = self._expand_path(src)
        dst = self._expand_path(dst)
        for p in (src, dst):
            if _is_write_denied(p):
                return WriteResult(error=f"Move denied: {p} is a protected path")
        result = self._exec(
            f"mv {self._escape_shell_arg(src)} {self._escape_shell_arg(dst)}"
        )
        if result.exit_code != 0:
            return WriteResult(error=f"Failed to move {src} -> {dst}: {result.stdout}")
        return WriteResult()

    # =========================================================================
    # WRITE Implementation
    # =========================================================================

    def write_file(self, path: str, content: str) -> WriteResult:
        """
        Write content to a file, creating parent directories as needed.

        Pipes content through stdin to avoid OS ARG_MAX limits on large
        files. The content never appears in the shell command string —
        only the file path does.

        Args:
            path: File path to write
            content: Content to write

        Returns:
            WriteResult with bytes written or error
        """
        # Expand ~ and other shell paths
        path = self._expand_path(path)

        # Block writes to sensitive paths
        if _is_write_denied(path):
            return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")

        # Create parent directories
        parent = os.path.dirname(path)
        dirs_created = False

        if parent:
            mkdir_cmd = f"mkdir -p {self._escape_shell_arg(parent)}"
            mkdir_result = self._exec(mkdir_cmd)
            if mkdir_result.exit_code == 0:
                dirs_created = True

        # Write via stdin pipe — content bypasses shell arg parsing entirely,
        # so there's no ARG_MAX limit regardless of file size.
        write_cmd = f"cat > {self._escape_shell_arg(path)}"
        write_result = self._exec(write_cmd, stdin_data=content)

        if write_result.exit_code != 0:
            return WriteResult(error=f"Failed to write file: {write_result.stdout}")

        # Get bytes written (wc -c is POSIX, works on Linux + macOS)
        stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
        stat_result = self._exec(stat_cmd)

        try:
            bytes_written = int(stat_result.stdout.strip())
        except ValueError:
            bytes_written = len(content.encode('utf-8'))

        return WriteResult(
            bytes_written=bytes_written,
            dirs_created=dirs_created
        )
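
    # Usage sketch (assumes ``ops`` is a ShellFileOperations instance bound
    # to a live backend; the path is hypothetical):
    #
    #     result = ops.write_file("/workspace/hello.py", "print('hello')\n")
    #     if result.error:
    #         print(result.error)
    #     else:
    #         print(result.bytes_written, "bytes written")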

    # =========================================================================
    # PATCH Implementation (Replace Mode)
    # =========================================================================

    def patch_replace(self, path: str, old_string: str, new_string: str,
                      replace_all: bool = False) -> PatchResult:
        """
        Replace text in a file using fuzzy matching.

        Args:
            path: File path to modify
            old_string: Text to find (must be unique unless replace_all=True)
            new_string: Replacement text
            replace_all: If True, replace all occurrences

        Returns:
            PatchResult with diff and lint results
        """
        # Expand ~ and other shell paths
        path = self._expand_path(path)

        # Block writes to sensitive paths
        if _is_write_denied(path):
            return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.")

        # Read current content
        read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
        read_result = self._exec(read_cmd)

        if read_result.exit_code != 0:
            return PatchResult(error=f"Failed to read file: {path}")

        content = read_result.stdout

        # Import and use fuzzy matching
        from tools.fuzzy_match import fuzzy_find_and_replace

        new_content, match_count, _strategy, error = fuzzy_find_and_replace(
            content, old_string, new_string, replace_all
        )

        if error or match_count == 0:
            err_msg = error or f"Could not find match for old_string in {path}"
            try:
                from tools.fuzzy_match import format_no_match_hint
                err_msg += format_no_match_hint(err_msg, match_count, old_string, content)
            except Exception:
                pass
            return PatchResult(error=err_msg)

        # Write back
        write_result = self.write_file(path, new_content)
        if write_result.error:
            return PatchResult(error=f"Failed to write changes: {write_result.error}")

        # Post-write verification — re-read the file and confirm the bytes we
        # intended to write actually landed. Catches silent persistence
        # failures (backend FS oddities, race with another task, truncated
        # pipe, etc.) that would otherwise return success-with-diff while the
        # file is unchanged on disk.
        verify_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
        verify_result = self._exec(verify_cmd)
        if verify_result.exit_code != 0:
            return PatchResult(error=f"Post-write verification failed: could not re-read {path}")
        if verify_result.stdout != new_content:
            return PatchResult(error=(
                f"Post-write verification failed for {path}: on-disk content "
                f"differs from intended write "
                f"(wrote {len(new_content)} chars, read back {len(verify_result.stdout)}). "
                "The patch did not persist. Re-read the file and try again."
            ))

        # Generate diff
        diff = self._unified_diff(content, new_content, path)

        # Auto-lint
        lint_result = self._check_lint(path)

        return PatchResult(
            success=True,
            diff=diff,
            files_modified=[path],
            lint=lint_result.to_dict() if lint_result else None
        )
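
    # Usage sketch (``ops`` as above; the path and strings are hypothetical):
    #
    #     result = ops.patch_replace("app/config.py",
    #                                "DEBUG = True", "DEBUG = False")
    #     if result.success:
    #         print(result.diff)
    #     else:
    #         print(result.error)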

    def patch_v4a(self, patch_content: str) -> PatchResult:
        """
        Apply a V4A format patch.

        V4A format:
            *** Begin Patch
            *** Update File: path/to/file.py
            @@ context hint @@
            context line
            -removed line
            +added line
            *** End Patch

        Args:
            patch_content: V4A format patch string

        Returns:
            PatchResult with changes made
        """
        # Import patch parser
        from tools.patch_parser import parse_v4a_patch, apply_v4a_operations

        operations, parse_error = parse_v4a_patch(patch_content)
        if parse_error:
            return PatchResult(error=f"Failed to parse patch: {parse_error}")

        # Apply operations
        result = apply_v4a_operations(operations, self)
        return result
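
    # Illustrative V4A patch string matching the format documented above
    # (the file path and content are hypothetical):
    #
    #     patch = '''*** Begin Patch
    #     *** Update File: tools/example.py
    #     @@ def greet @@
    #     -    print("hi")
    #     +    print("hello")
    #     *** End Patch'''
    #     result = ops.patch_v4a(patch)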

    def _check_lint(self, path: str) -> LintResult:
        """
        Run syntax check on a file after editing.

        Args:
            path: File path to lint

        Returns:
            LintResult with status and any errors
        """
        ext = os.path.splitext(path)[1].lower()

        if ext not in LINTERS:
            return LintResult(skipped=True, message=f"No linter for {ext} files")

        # Check if linter command is available
        linter_cmd = LINTERS[ext]
        # Extract the base command (first word)
        base_cmd = linter_cmd.split()[0]

        if not self._has_command(base_cmd):
            return LintResult(skipped=True, message=f"{base_cmd} not available")

        # Run linter
        cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path))
        result = self._exec(cmd, timeout=30)

        return LintResult(
            success=result.exit_code == 0,
            output=result.stdout.strip()
        )

    # =========================================================================
    # SEARCH Implementation
    # =========================================================================

    def search(self, pattern: str, path: str = ".", target: str = "content",
               file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
               output_mode: str = "content", context: int = 0) -> SearchResult:
        """
        Search for content or files.

        Args:
            pattern: Regex (for content) or glob pattern (for files)
            path: Directory/file to search (default: cwd)
            target: "content" (grep) or "files" (glob)
            file_glob: File pattern filter for content search (e.g., "*.py")
            limit: Max results (default 50)
            offset: Skip first N results
            output_mode: "content", "files_only", or "count"
            context: Lines of context around matches

        Returns:
            SearchResult with matches or file list
        """
        offset, limit = normalize_search_pagination(offset, limit)

        # Expand ~ and other shell paths
        path = self._expand_path(path)

        # Validate that the path exists before searching
        check = self._exec(f"test -e {self._escape_shell_arg(path)} && echo exists || echo not_found")
        if "not_found" in check.stdout:
            # Try to suggest nearby paths
            parent = os.path.dirname(path) or "."
            basename_query = os.path.basename(path)
            hint_parts = [f"Path not found: {path}"]
            # Check if parent directory exists and list similar entries
            parent_check = self._exec(
                f"test -d {self._escape_shell_arg(parent)} && echo yes || echo no"
            )
            if "yes" in parent_check.stdout and basename_query:
                ls_result = self._exec(
                    f"ls -1 {self._escape_shell_arg(parent)} 2>/dev/null | head -20"
                )
                if ls_result.exit_code == 0 and ls_result.stdout.strip():
                    lower_q = basename_query.lower()
                    candidates = []
                    for entry in ls_result.stdout.strip().split('\n'):
                        if not entry:
                            continue
                        le = entry.lower()
                        if lower_q in le or le in lower_q or le.startswith(lower_q[:3]):
                            candidates.append(os.path.join(parent, entry))
                    if candidates:
                        hint_parts.append(
                            "Similar paths: " + ", ".join(candidates[:5])
                        )
            return SearchResult(
                error=". ".join(hint_parts),
                total_count=0
            )

        if target == "files":
            return self._search_files(pattern, path, limit, offset)
        else:
            return self._search_content(pattern, path, file_glob, limit, offset,
                                        output_mode, context)
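
    # Usage sketch (``ops`` as above): a grep-style content search with a
    # file filter and pagination:
    #
    #     result = ops.search(r"def \w+_file", path="tools", file_glob="*.py",
    #                         limit=20, offset=0)
    #     for m in result.matches:
    #         print(f"{m.path}:{m.line_number}: {m.content}")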

    def _search_files(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
        """Search for files by name pattern (glob-like)."""
        # find -name (and rg -g below) match against basenames, so reduce
        # path-qualified patterns like "src/**/foo.py" to their final
        # component; bare names pass through unchanged.
        if not pattern.startswith('**/') and '/' not in pattern:
            search_pattern = pattern
        else:
            search_pattern = pattern.split('/')[-1]

        # Prefer ripgrep: respects .gitignore, excludes hidden dirs by
        # default, and has parallel directory traversal (~200x faster than
        # find on wide trees). Mirrors _search_content which already uses rg.
        if self._has_command('rg'):
            return self._search_files_rg(search_pattern, path, limit, offset)

        # Fallback: find (slower, no .gitignore awareness)
        if not self._has_command('find'):
            return SearchResult(
                error="File search requires 'rg' (ripgrep) or 'find'. "
                      "Install ripgrep for best results: "
                      "https://github.com/BurntSushi/ripgrep#installation"
            )

        # Exclude hidden directories (matching ripgrep's default behavior).
        hidden_exclude = "-not -path '*/.*'"

        cmd = f"find {self._escape_shell_arg(path)} {hidden_exclude} -type f -name {self._escape_shell_arg(search_pattern)} " \
              f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn | tail -n +{offset + 1} | head -n {limit}"

        result = self._exec(cmd, timeout=60)

        if not result.stdout.strip():
            # Try without -printf (BSD find compatibility -- macOS)
            cmd_simple = f"find {self._escape_shell_arg(path)} {hidden_exclude} -type f -name {self._escape_shell_arg(search_pattern)} " \
                         f"2>/dev/null | head -n {limit + offset} | tail -n +{offset + 1}"
            result = self._exec(cmd_simple, timeout=60)

        files = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue
            parts = line.split(' ', 1)
            if len(parts) == 2 and parts[0].replace('.', '').isdigit():
                files.append(parts[1])
            else:
                files.append(line)

        return SearchResult(
            files=files,
            total_count=len(files)
        )

    def _search_files_rg(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
        """Search for files by name using ripgrep's --files mode.

        rg --files respects .gitignore and excludes hidden directories by
        default, and uses parallel directory traversal for ~200x speedup
        over find on wide trees. Results are sorted by modification time
        (most recently edited first) when rg >= 13.0 supports --sortr.
        """
        # rg --files -g uses glob patterns; wrap bare names so they match
        # at any depth (equivalent to find -name).
        if '/' not in pattern and not pattern.startswith('*'):
            glob_pattern = f"*{pattern}"
        else:
            glob_pattern = pattern

        fetch_limit = limit + offset
        # Try mtime-sorted first (rg 13+); fall back to unsorted if not supported.
        cmd_sorted = (
            f"rg --files --sortr=modified -g {self._escape_shell_arg(glob_pattern)} "
            f"{self._escape_shell_arg(path)} 2>/dev/null "
            f"| head -n {fetch_limit}"
        )
        result = self._exec(cmd_sorted, timeout=60)
        all_files = [f for f in result.stdout.strip().split('\n') if f]

        if not all_files:
            # --sortr may have failed on older rg; retry without it.
            cmd_plain = (
                f"rg --files -g {self._escape_shell_arg(glob_pattern)} "
                f"{self._escape_shell_arg(path)} 2>/dev/null "
                f"| head -n {fetch_limit}"
            )
            result = self._exec(cmd_plain, timeout=60)
            all_files = [f for f in result.stdout.strip().split('\n') if f]

        page = all_files[offset:offset + limit]

        return SearchResult(
            files=page,
            total_count=len(all_files),
            truncated=len(all_files) >= fetch_limit,
        )

    def _search_content(self, pattern: str, path: str, file_glob: Optional[str],
                        limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
        """Search for content inside files (grep-like)."""
        # Try ripgrep first (fast), fallback to grep (slower but works)
        if self._has_command('rg'):
            return self._search_with_rg(pattern, path, file_glob, limit, offset,
                                        output_mode, context)
        elif self._has_command('grep'):
            return self._search_with_grep(pattern, path, file_glob, limit, offset,
                                          output_mode, context)
        else:
            # Neither rg nor grep available (Windows without Git Bash, etc.)
            return SearchResult(
                error="Content search requires ripgrep (rg) or grep. "
                      "Install ripgrep: https://github.com/BurntSushi/ripgrep#installation"
            )

    def _search_with_rg(self, pattern: str, path: str, file_glob: Optional[str],
                        limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
        """Search using ripgrep."""
        cmd_parts = ["rg", "--line-number", "--no-heading", "--with-filename"]

        # Add context if requested
        if context > 0:
            cmd_parts.extend(["-C", str(context)])

        # Add file glob filter (must be quoted to prevent shell expansion)
        if file_glob:
            cmd_parts.extend(["--glob", self._escape_shell_arg(file_glob)])

        # Output mode handling
        if output_mode == "files_only":
            cmd_parts.append("-l")  # Files only
        elif output_mode == "count":
            cmd_parts.append("-c")  # Count per file

        # Add pattern and path
        cmd_parts.append(self._escape_shell_arg(pattern))
        cmd_parts.append(self._escape_shell_arg(path))

        # Fetch extra rows so we can report the true total before slicing.
        # For context mode, rg emits separator lines ("--") between groups,
        # so we grab generously and filter in Python.
        fetch_limit = limit + offset + 200 if context > 0 else limit + offset
        cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])

        cmd = " ".join(cmd_parts)
        result = self._exec(cmd, timeout=60)

        # rg exit codes: 0=matches found, 1=no matches, 2=error
        if result.exit_code == 2 and not result.stdout.strip():
            error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
            return SearchResult(error=f"Search failed: {error_msg}", total_count=0)

        # Parse results based on output mode
        if output_mode == "files_only":
            all_files = [f for f in result.stdout.strip().split('\n') if f]
            total = len(all_files)
            page = all_files[offset:offset + limit]
            return SearchResult(files=page, total_count=total)

        elif output_mode == "count":
            counts = {}
            for line in result.stdout.strip().split('\n'):
                if ':' in line:
                    parts = line.rsplit(':', 1)
                    if len(parts) == 2:
                        try:
                            counts[parts[0]] = int(parts[1])
                        except ValueError:
                            pass
            return SearchResult(counts=counts, total_count=sum(counts.values()))

        else:
            # Parse content matches and context lines.
            # rg match lines: "file:lineno:content" (colon separator)
            # rg context lines: "file-lineno-content" (dash separator)
            # rg group seps: "--"
            # Note: on Windows, paths contain drive letters (e.g. C:\path),
            # so naive split(":") breaks. Use regex to handle both platforms.
            _match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
            _ctx_re = re.compile(r'^([A-Za-z]:)?(.*?)-(\d+)-(.*)$')
            matches = []
            for line in result.stdout.strip().split('\n'):
                if not line or line == "--":
                    continue

                # Try match line first (colon-separated: file:line:content)
                m = _match_re.match(line)
                if m:
                    matches.append(SearchMatch(
                        path=(m.group(1) or '') + m.group(2),
                        line_number=int(m.group(3)),
                        content=m.group(4)[:500]
                    ))
                    continue

                # Try context line (dash-separated: file-line-content)
                # Only attempt if context was requested to avoid false positives
                if context > 0:
                    m = _ctx_re.match(line)
                    if m:
                        matches.append(SearchMatch(
                            path=(m.group(1) or '') + m.group(2),
                            line_number=int(m.group(3)),
                            content=m.group(4)[:500]
                        ))

            total = len(matches)
            page = matches[offset:offset + limit]
            return SearchResult(
                matches=page,
                total_count=total,
                truncated=total > offset + limit
            )

    def _search_with_grep(self, pattern: str, path: str, file_glob: Optional[str],
                          limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
        """Fallback search using grep."""
        cmd_parts = ["grep", "-rnH"]  # -H forces filename even for single-file searches

        # Exclude hidden directories (matching ripgrep's default behavior).
        # This prevents searching inside .hub/index-cache/, .git/, etc.
        cmd_parts.append("--exclude-dir='.*'")

        # Add context if requested
        if context > 0:
            cmd_parts.extend(["-C", str(context)])

        # Add file pattern filter (must be quoted to prevent shell expansion)
        if file_glob:
            cmd_parts.extend(["--include", self._escape_shell_arg(file_glob)])

        # Output mode handling
        if output_mode == "files_only":
            cmd_parts.append("-l")
        elif output_mode == "count":
            cmd_parts.append("-c")

        # Add pattern and path
        cmd_parts.append(self._escape_shell_arg(pattern))
        cmd_parts.append(self._escape_shell_arg(path))

        # Fetch generously so we can compute total before slicing
        fetch_limit = limit + offset + (200 if context > 0 else 0)
        cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])

        cmd = " ".join(cmd_parts)
        result = self._exec(cmd, timeout=60)

        # grep exit codes: 0=matches found, 1=no matches, 2=error
        if result.exit_code == 2 and not result.stdout.strip():
            error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
            return SearchResult(error=f"Search failed: {error_msg}", total_count=0)

        if output_mode == "files_only":
            all_files = [f for f in result.stdout.strip().split('\n') if f]
            total = len(all_files)
            page = all_files[offset:offset + limit]
            return SearchResult(files=page, total_count=total)

        elif output_mode == "count":
            counts = {}
            for line in result.stdout.strip().split('\n'):
                if ':' in line:
                    parts = line.rsplit(':', 1)
                    if len(parts) == 2:
                        try:
                            counts[parts[0]] = int(parts[1])
                        except ValueError:
                            pass
            return SearchResult(counts=counts, total_count=sum(counts.values()))

        else:
            # grep match lines: "file:lineno:content" (colon)
            # grep context lines: "file-lineno-content" (dash)
            # grep group seps: "--"
            # Note: on Windows, paths contain drive letters (e.g. C:\path),
            # so naive split(":") breaks. Use regex to handle both platforms.
            _match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
            _ctx_re = re.compile(r'^([A-Za-z]:)?(.*?)-(\d+)-(.*)$')
            matches = []
            for line in result.stdout.strip().split('\n'):
                if not line or line == "--":
                    continue

                m = _match_re.match(line)
                if m:
                    matches.append(SearchMatch(
                        path=(m.group(1) or '') + m.group(2),
                        line_number=int(m.group(3)),
                        content=m.group(4)[:500]
                    ))
                    continue

                if context > 0:
                    m = _ctx_re.match(line)
                    if m:
                        matches.append(SearchMatch(
                            path=(m.group(1) or '') + m.group(2),
                            line_number=int(m.group(3)),
                            content=m.group(4)[:500]
                        ))

            total = len(matches)
            page = matches[offset:offset + limit]
            return SearchResult(
                matches=page,
                total_count=total,
                truncated=total > offset + limit
            )