#!/usr/bin/env python3 """File Tools Module - LLM agent file manipulation tools.""" import errno import json import logging import os import threading from pathlib import Path from typing import Optional from agent.file_safety import get_read_block_error from tools.binary_extensions import has_binary_extension from tools.file_operations import ( ShellFileOperations, normalize_read_pagination, normalize_search_pagination, ) from tools import file_state from agent.redact import redact_sensitive_text logger = logging.getLogger(__name__) _EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS} # --------------------------------------------------------------------------- # Read-size guard: cap the character count returned to the model. # We're model-agnostic so we can't count tokens; characters are a safe proxy. # 100K chars ≈ 25–35K tokens across typical tokenisers. Files larger than # this in a single read are a context-window hazard — the model should use # offset+limit to read the relevant section. # # Configurable via config.yaml: file_read_max_chars: 200000 # --------------------------------------------------------------------------- _DEFAULT_MAX_READ_CHARS = 100_000 _max_read_chars_cached: int | None = None def _get_max_read_chars() -> int: """Return the configured max characters per file read. Reads ``file_read_max_chars`` from config.yaml on first call, caches the result for the lifetime of the process. Falls back to the built-in default if the config is missing or invalid. """ global _max_read_chars_cached if _max_read_chars_cached is not None: return _max_read_chars_cached try: from hermes_cli.config import load_config cfg = load_config() val = cfg.get("file_read_max_chars") if isinstance(val, (int, float)) and val > 0: _max_read_chars_cached = int(val) return _max_read_chars_cached except Exception: pass _max_read_chars_cached = _DEFAULT_MAX_READ_CHARS return _max_read_chars_cached # If the total file size exceeds this AND the caller didn't specify a narrow # range (limit <= 200), we include a hint encouraging targeted reads. _LARGE_FILE_HINT_BYTES = 512_000 # 512 KB # --------------------------------------------------------------------------- # Device path blocklist — reading these hangs the process (infinite output # or blocking on input). Checked by path only (no I/O). # --------------------------------------------------------------------------- _BLOCKED_DEVICE_PATHS = frozenset({ # Infinite output — never reach EOF "/dev/zero", "/dev/random", "/dev/urandom", "/dev/full", # Blocks waiting for input "/dev/stdin", "/dev/tty", "/dev/console", # Nonsensical to read "/dev/stdout", "/dev/stderr", # fd aliases "/dev/fd/0", "/dev/fd/1", "/dev/fd/2", }) def _resolve_path(filepath: str, task_id: str = "default") -> Path: """Resolve a path relative to TERMINAL_CWD (the worktree base directory) instead of the main repository root. """ return _resolve_path_for_task(filepath, task_id) def _get_live_tracking_cwd(task_id: str = "default") -> str | None: """Return the task's live terminal cwd for bookkeeping when available.""" with _file_ops_lock: cached = _file_ops_cache.get(task_id) if cached is not None: live_cwd = getattr(getattr(cached, "env", None), "cwd", None) or getattr( cached, "cwd", None ) if live_cwd: return live_cwd try: from tools.terminal_tool import _active_environments, _env_lock with _env_lock: env = _active_environments.get(task_id) live_cwd = getattr(env, "cwd", None) if env is not None else None if live_cwd: return live_cwd except Exception: pass return None def _resolve_path_for_task(filepath: str, task_id: str = "default") -> Path: """Resolve *filepath* against the task's live terminal cwd when possible.""" p = Path(filepath).expanduser() if not p.is_absolute(): base = _get_live_tracking_cwd(task_id) or os.environ.get( "TERMINAL_CWD", os.getcwd() ) p = Path(base) / p return p.resolve() def _is_blocked_device(filepath: str) -> bool: """Return True if the path would hang the process (infinite output or blocking input). Uses the *literal* path — no symlink resolution — because the model specifies paths directly and realpath follows symlinks all the way through (e.g. /dev/stdin → /proc/self/fd/0 → /dev/pts/0), defeating the check. """ normalized = os.path.expanduser(filepath) if normalized in _BLOCKED_DEVICE_PATHS: return True # /proc/self/fd/0-2 and /proc//fd/0-2 are Linux aliases for stdio if normalized.startswith("/proc/") and normalized.endswith( ("/fd/0", "/fd/1", "/fd/2") ): return True return False # Paths that file tools should refuse to write to without going through the # terminal tool's approval system. These match prefixes after os.path.realpath. _SENSITIVE_PATH_PREFIXES = ( "/etc/", "/boot/", "/usr/lib/systemd/", "/private/etc/", "/private/var/", ) _SENSITIVE_EXACT_PATHS = {"/var/run/docker.sock", "/run/docker.sock"} def _check_sensitive_path(filepath: str, task_id: str = "default") -> str | None: """Return an error message if the path targets a sensitive system location.""" try: resolved = str(_resolve_path_for_task(filepath, task_id)) except (OSError, ValueError): resolved = filepath normalized = os.path.normpath(os.path.expanduser(filepath)) _err = ( f"Refusing to write to sensitive system path: {filepath}\n" "Use the terminal tool with sudo if you need to modify system files." ) for prefix in _SENSITIVE_PATH_PREFIXES: if resolved.startswith(prefix) or normalized.startswith(prefix): return _err if resolved in _SENSITIVE_EXACT_PATHS or normalized in _SENSITIVE_EXACT_PATHS: return _err return None def _is_expected_write_exception(exc: Exception) -> bool: """Return True for expected write denials that should not hit error logs.""" if isinstance(exc, PermissionError): return True if isinstance(exc, OSError) and exc.errno in _EXPECTED_WRITE_ERRNOS: return True return False _file_ops_lock = threading.Lock() _file_ops_cache: dict = {} # Track files read per task to detect re-read loops and deduplicate reads. # Per task_id we store: # "last_key": the key of the most recent read/search call (or None) # "consecutive": how many times that exact call has been repeated in a row # "read_history": set of (path, offset, limit) tuples for get_read_files_summary # "dedup": dict mapping (resolved_path, offset, limit) → mtime float # Used to skip re-reads of unchanged files. Reset on # context compression (the original content is summarised # away so the model needs the full content again). # "read_timestamps": dict mapping resolved_path → modification-time float # recorded when the file was last read (or written) by # this task. Used by write_file and patch to detect # external changes between the agent's read and write. # Updated after successful writes so consecutive edits # by the same task don't trigger false warnings. _read_tracker_lock = threading.Lock() _read_tracker: dict = {} # Per-task bounds for the containers inside each _read_tracker[task_id]. # A CLI session uses one stable task_id for its lifetime; without these # caps, a 10k-read session would accumulate ~1.5MB of dict/set state that # is never referenced again (only the most recent reads matter for dedup, # loop detection, and external-edit warnings). Hard caps bound the # accretion to a few hundred KB regardless of session length. _READ_HISTORY_CAP = 500 # set; used only by get_read_files_summary _DEDUP_CAP = 1000 # dict; skip-identical-reread guard _READ_TIMESTAMPS_CAP = 1000 # dict; external-edit detection for write/patch def _cap_read_tracker_data(task_data: dict) -> None: """Enforce size caps on the per-task read-tracker sub-containers. Must be called with ``_read_tracker_lock`` held. Eviction policy: * ``read_history`` (set): pop arbitrary entries on overflow. This is fine because the set only feeds diagnostic summaries; losing old entries just trims the summary's tail. * ``dedup`` / ``read_timestamps`` (dict): pop oldest by insertion order (Python 3.7+ dicts). Evicted entries lose their dedup skip on a future re-read (the file gets re-sent once) and external-edit mtime comparison (the write/patch falls back to a non-mtime check). Both are graceful degradations, not bugs. """ rh = task_data.get("read_history") if rh is not None and len(rh) > _READ_HISTORY_CAP: excess = len(rh) - _READ_HISTORY_CAP for _ in range(excess): try: rh.pop() except KeyError: break dedup = task_data.get("dedup") if dedup is not None and len(dedup) > _DEDUP_CAP: excess = len(dedup) - _DEDUP_CAP for _ in range(excess): try: dedup.pop(next(iter(dedup))) except (StopIteration, KeyError): break ts = task_data.get("read_timestamps") if ts is not None and len(ts) > _READ_TIMESTAMPS_CAP: excess = len(ts) - _READ_TIMESTAMPS_CAP for _ in range(excess): try: ts.pop(next(iter(ts))) except (StopIteration, KeyError): break def _get_file_ops(task_id: str = "default") -> ShellFileOperations: """Get or create ShellFileOperations for a terminal environment. Respects the TERMINAL_ENV setting -- if the task_id doesn't have an environment yet, creates one using the configured backend (local, docker, modal, etc.) rather than always defaulting to local. Thread-safe: uses the same per-task creation locks as terminal_tool to prevent duplicate sandbox creation from concurrent tool calls. """ from tools.terminal_tool import ( _active_environments, _env_lock, _create_environment, _get_env_config, _last_activity, _start_cleanup_thread, _creation_locks, _creation_locks_lock, ) import time # Fast path: check cache -- but also verify the underlying environment # is still alive (it may have been killed by the cleanup thread). with _file_ops_lock: cached = _file_ops_cache.get(task_id) if cached is not None: with _env_lock: if task_id in _active_environments: _last_activity[task_id] = time.time() return cached else: # Environment was cleaned up -- invalidate stale cache entry with _file_ops_lock: _file_ops_cache.pop(task_id, None) # Need to ensure the environment exists before building file_ops. # Acquire per-task lock so only one thread creates the sandbox. with _creation_locks_lock: if task_id not in _creation_locks: _creation_locks[task_id] = threading.Lock() task_lock = _creation_locks[task_id] with task_lock: # Double-check: another thread may have created it while we waited with _env_lock: if task_id in _active_environments: _last_activity[task_id] = time.time() terminal_env = _active_environments[task_id] else: terminal_env = None if terminal_env is None: from tools.terminal_tool import _task_env_overrides config = _get_env_config() env_type = config["env_type"] overrides = _task_env_overrides.get(task_id, {}) if env_type == "docker": image = overrides.get("docker_image") or config["docker_image"] elif env_type == "singularity": image = overrides.get("singularity_image") or config["singularity_image"] elif env_type == "modal": image = overrides.get("modal_image") or config["modal_image"] elif env_type == "daytona": image = overrides.get("daytona_image") or config["daytona_image"] else: image = "" cwd = overrides.get("cwd") or config["cwd"] logger.info("Creating new %s environment for task %s...", env_type, task_id[:8]) container_config = None if env_type in ("docker", "singularity", "modal", "daytona"): container_config = { "container_cpu": config.get("container_cpu", 1), "container_memory": config.get("container_memory", 5120), "container_disk": config.get("container_disk", 51200), "container_persistent": config.get("container_persistent", True), "docker_volumes": config.get("docker_volumes", []), "docker_mount_cwd_to_workspace": config.get("docker_mount_cwd_to_workspace", False), "docker_forward_env": config.get("docker_forward_env", []), } ssh_config = None if env_type == "ssh": ssh_config = { "host": config.get("ssh_host", ""), "user": config.get("ssh_user", ""), "port": config.get("ssh_port", 22), "key": config.get("ssh_key", ""), "persistent": config.get("ssh_persistent", False), } local_config = None if env_type == "local": local_config = { "persistent": config.get("local_persistent", False), } terminal_env = _create_environment( env_type=env_type, image=image, cwd=cwd, timeout=config["timeout"], ssh_config=ssh_config, container_config=container_config, local_config=local_config, task_id=task_id, host_cwd=config.get("host_cwd"), ) with _env_lock: _active_environments[task_id] = terminal_env _last_activity[task_id] = time.time() _start_cleanup_thread() logger.info("%s environment ready for task %s", env_type, task_id[:8]) # Build file_ops from the (guaranteed live) environment and cache it file_ops = ShellFileOperations(terminal_env) with _file_ops_lock: _file_ops_cache[task_id] = file_ops return file_ops def clear_file_ops_cache(task_id: str = None): """Clear the file operations cache.""" with _file_ops_lock: if task_id: _file_ops_cache.pop(task_id, None) else: _file_ops_cache.clear() def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str: """Read a file with pagination and line numbers.""" try: offset, limit = normalize_read_pagination(offset, limit) # ── Device path guard ───────────────────────────────────────── # Block paths that would hang the process (infinite output, # blocking on input). Pure path check — no I/O. if _is_blocked_device(path): return json.dumps({ "error": ( f"Cannot read '{path}': this is a device file that would " "block or produce infinite output." ), }) _resolved = _resolve_path_for_task(path, task_id) # ── Binary file guard ───────────────────────────────────────── # Block binary files by extension (no I/O). if has_binary_extension(str(_resolved)): _ext = _resolved.suffix.lower() return json.dumps({ "error": ( f"Cannot read binary file '{path}' ({_ext}). " "Use vision_analyze for images, or terminal to inspect binary files." ), }) # ── Hermes internal path guard ──────────────────────────────── # Prevent prompt injection via catalog or hub metadata files. block_error = get_read_block_error(path) if block_error: return json.dumps({"error": block_error}) # ── Dedup check ─────────────────────────────────────────────── # If we already read this exact (path, offset, limit) and the # file hasn't been modified since, return a lightweight stub # instead of re-sending the same content. Saves context tokens. resolved_str = str(_resolved) dedup_key = (resolved_str, offset, limit) with _read_tracker_lock: task_data = _read_tracker.setdefault(task_id, { "last_key": None, "consecutive": 0, "read_history": set(), "dedup": {}, }) cached_mtime = task_data.get("dedup", {}).get(dedup_key) if cached_mtime is not None: try: current_mtime = os.path.getmtime(resolved_str) if current_mtime == cached_mtime: return json.dumps({ "content": ( "File unchanged since last read. The content from " "the earlier read_file result in this conversation is " "still current — refer to that instead of re-reading." ), "path": path, "dedup": True, }, ensure_ascii=False) except OSError: pass # stat failed — fall through to full read # ── Perform the read ────────────────────────────────────────── file_ops = _get_file_ops(task_id) result = file_ops.read_file(path, offset, limit) result_dict = result.to_dict() # ── Character-count guard ───────────────────────────────────── # We're model-agnostic so we can't count tokens; characters are # the best proxy we have. If the read produced an unreasonable # amount of content, reject it and tell the model to narrow down. # Note: we check the formatted content (with line-number prefixes), # not the raw file size, because that's what actually enters context. # Check BEFORE redaction to avoid expensive regex on huge content. content_len = len(result.content or "") file_size = result_dict.get("file_size", 0) max_chars = _get_max_read_chars() if content_len > max_chars: total_lines = result_dict.get("total_lines", "unknown") return json.dumps({ "error": ( f"Read produced {content_len:,} characters which exceeds " f"the safety limit ({max_chars:,} chars). " "Use offset and limit to read a smaller range. " f"The file has {total_lines} lines total." ), "path": path, "total_lines": total_lines, "file_size": file_size, }, ensure_ascii=False) # ── Redact secrets (after guard check to skip oversized content) ── if result.content: result.content = redact_sensitive_text(result.content) result_dict["content"] = result.content # Large-file hint: if the file is big and the caller didn't ask # for a narrow window, nudge toward targeted reads. if (file_size and file_size > _LARGE_FILE_HINT_BYTES and limit > 200 and result_dict.get("truncated")): result_dict.setdefault("_hint", ( f"This file is large ({file_size:,} bytes). " "Consider reading only the section you need with offset and limit " "to keep context usage efficient." )) # ── Track for consecutive-loop detection ────────────────────── read_key = ("read", path, offset, limit) with _read_tracker_lock: # Ensure "dedup" key exists (backward compat with old tracker state) if "dedup" not in task_data: task_data["dedup"] = {} task_data["read_history"].add((path, offset, limit)) if task_data["last_key"] == read_key: task_data["consecutive"] += 1 else: task_data["last_key"] = read_key task_data["consecutive"] = 1 count = task_data["consecutive"] # Store mtime at read time for two purposes: # 1. Dedup: skip identical re-reads of unchanged files. # 2. Staleness: warn on write/patch if the file changed since # the agent last read it (external edit, concurrent agent, etc.). try: _mtime_now = os.path.getmtime(resolved_str) task_data["dedup"][dedup_key] = _mtime_now task_data.setdefault("read_timestamps", {})[resolved_str] = _mtime_now except OSError: pass # Can't stat — skip tracking for this entry # Bound the per-task containers so a long CLI session doesn't # accumulate megabytes of dict/set state. See _cap_read_tracker_data. _cap_read_tracker_data(task_data) # Cross-agent file-state registry (separate from per-task read # tracker above): records that THIS agent has read this path so # write/patch can detect sibling-subagent writes that happened # after our read. Partial read when offset>1 or the read was # truncated (large file with more content than limit covered). # Outside the _read_tracker_lock so the registry's own locking # isn't nested under ours. try: _partial = (offset > 1) or bool(result_dict.get("truncated")) file_state.record_read(task_id, resolved_str, partial=_partial) except Exception: logger.debug("file_state.record_read failed", exc_info=True) if count >= 4: # Hard block: stop returning content to break the loop return json.dumps({ "error": ( f"BLOCKED: You have read this exact file region {count} times in a row. " "The content has NOT changed. You already have this information. " "STOP re-reading and proceed with your task." ), "path": path, "already_read": count, }, ensure_ascii=False) elif count >= 3: result_dict["_warning"] = ( f"You have read this exact file region {count} times consecutively. " "The content has not changed since your last read. Use the information you already have. " "If you are stuck in a loop, stop reading and proceed with writing or responding." ) return json.dumps(result_dict, ensure_ascii=False) except Exception as e: return tool_error(str(e)) def reset_file_dedup(task_id: str = None): """Clear the deduplication cache for file reads. Called after context compression — the original read content has been summarised away, so the model needs the full content if it reads the same file again. Without this, reads after compression would return a "file unchanged" stub pointing at content that no longer exists in context. Call with a task_id to clear just that task, or without to clear all. """ with _read_tracker_lock: if task_id: task_data = _read_tracker.get(task_id) if task_data and "dedup" in task_data: task_data["dedup"].clear() else: for task_data in _read_tracker.values(): if "dedup" in task_data: task_data["dedup"].clear() def notify_other_tool_call(task_id: str = "default"): """Reset consecutive read/search counter for a task. Called by the tool dispatcher (model_tools.py) whenever a tool OTHER than read_file / search_files is executed. This ensures we only warn or block on *truly consecutive* repeated reads — if the agent does anything else in between (write, patch, terminal, etc.) the counter resets and the next read is treated as fresh. """ with _read_tracker_lock: task_data = _read_tracker.get(task_id) if task_data: task_data["last_key"] = None task_data["consecutive"] = 0 def _update_read_timestamp(filepath: str, task_id: str) -> None: """Record the file's current modification time after a successful write. Called after write_file and patch so that consecutive edits by the same task don't trigger false staleness warnings — each write refreshes the stored timestamp to match the file's new state. """ try: resolved = str(_resolve_path_for_task(filepath, task_id)) current_mtime = os.path.getmtime(resolved) except (OSError, ValueError): return with _read_tracker_lock: task_data = _read_tracker.get(task_id) if task_data is not None: task_data.setdefault("read_timestamps", {})[resolved] = current_mtime _cap_read_tracker_data(task_data) def _check_file_staleness(filepath: str, task_id: str) -> str | None: """Check whether a file was modified since the agent last read it. Returns a warning string if the file is stale (mtime changed since the last read_file call for this task), or None if the file is fresh or was never read. Does not block — the write still proceeds. """ try: resolved = str(_resolve_path_for_task(filepath, task_id)) except (OSError, ValueError): return None with _read_tracker_lock: task_data = _read_tracker.get(task_id) if not task_data: return None read_mtime = task_data.get("read_timestamps", {}).get(resolved) if read_mtime is None: return None # File was never read — nothing to compare against try: current_mtime = os.path.getmtime(resolved) except OSError: return None # Can't stat — file may have been deleted, let write handle it if current_mtime != read_mtime: return ( f"Warning: {filepath} was modified since you last read it " "(external edit or concurrent agent). The content you read may be " "stale. Consider re-reading the file to verify before writing." ) return None def write_file_tool(path: str, content: str, task_id: str = "default") -> str: """Write content to a file.""" sensitive_err = _check_sensitive_path(path, task_id) if sensitive_err: return tool_error(sensitive_err) try: # Resolve once for the registry lock + stale check. Failures here # fall back to the legacy path — write proceeds, per-task staleness # check below still runs. try: _resolved = str(_resolve_path_for_task(path, task_id)) except Exception: _resolved = None if _resolved is None: stale_warning = _check_file_staleness(path, task_id) file_ops = _get_file_ops(task_id) result = file_ops.write_file(path, content) result_dict = result.to_dict() if stale_warning: result_dict["_warning"] = stale_warning _update_read_timestamp(path, task_id) return json.dumps(result_dict, ensure_ascii=False) # Serialize the read→modify→write region per-path so concurrent # subagents can't interleave on the same file. Different paths # remain fully parallel. with file_state.lock_path(_resolved): # Cross-agent staleness wins over per-task warning when both # fire — its message names the sibling subagent. cross_warning = file_state.check_stale(task_id, _resolved) stale_warning = _check_file_staleness(path, task_id) file_ops = _get_file_ops(task_id) result = file_ops.write_file(path, content) result_dict = result.to_dict() effective_warning = cross_warning or stale_warning if effective_warning: result_dict["_warning"] = effective_warning # Refresh stamps after the successful write so consecutive # writes by this task don't trigger false staleness warnings. _update_read_timestamp(path, task_id) if not result_dict.get("error"): file_state.note_write(task_id, _resolved) return json.dumps(result_dict, ensure_ascii=False) except Exception as e: if _is_expected_write_exception(e): logger.debug("write_file expected denial: %s: %s", type(e).__name__, e) else: logger.error("write_file error: %s: %s", type(e).__name__, e, exc_info=True) return tool_error(str(e)) def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, new_string: str = None, replace_all: bool = False, patch: str = None, task_id: str = "default") -> str: """Patch a file using replace mode or V4A patch format.""" # Check sensitive paths for both replace (explicit path) and V4A patch (extract paths) _paths_to_check = [] if path: _paths_to_check.append(path) if mode == "patch" and patch: import re as _re for _m in _re.finditer(r'^\*\*\*\s+(?:Update|Add|Delete)\s+File:\s*(.+)$', patch, _re.MULTILINE): _paths_to_check.append(_m.group(1).strip()) for _p in _paths_to_check: sensitive_err = _check_sensitive_path(_p, task_id) if sensitive_err: return tool_error(sensitive_err) try: # Resolve paths for locking. Ordered + deduplicated so concurrent # callers lock in the same order — prevents deadlock on overlapping # multi-file V4A patches. _resolved_paths: list[str] = [] _seen: set[str] = set() for _p in _paths_to_check: try: _r = str(_resolve_path_for_task(_p, task_id)) except Exception: _r = None if _r and _r not in _seen: _resolved_paths.append(_r) _seen.add(_r) _resolved_paths.sort() # Acquire per-path locks in sorted order via ExitStack. On single # path this degenerates to one lock; on empty list (unresolvable) # it's a no-op and execution falls through unchanged. from contextlib import ExitStack with ExitStack() as _locks: for _r in _resolved_paths: _locks.enter_context(file_state.lock_path(_r)) # Collect warnings — cross-agent registry first (names sibling), # then per-task tracker as a fallback. stale_warnings: list[str] = [] _path_to_resolved: dict[str, str] = {} for _p in _paths_to_check: try: _r = str(_resolve_path_for_task(_p, task_id)) except Exception: _r = None _path_to_resolved[_p] = _r _cross = file_state.check_stale(task_id, _r) if _r else None _sw = _cross or _check_file_staleness(_p, task_id) if _sw: stale_warnings.append(_sw) file_ops = _get_file_ops(task_id) if mode == "replace": if not path: return tool_error("path required") if old_string is None or new_string is None: return tool_error("old_string and new_string required") result = file_ops.patch_replace(path, old_string, new_string, replace_all) elif mode == "patch": if not patch: return tool_error("patch content required") result = file_ops.patch_v4a(patch) else: return tool_error(f"Unknown mode: {mode}") result_dict = result.to_dict() if stale_warnings: result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings) # Refresh stored timestamps for all successfully-patched paths so # consecutive edits by this task don't trigger false warnings. if not result_dict.get("error"): for _p in _paths_to_check: _update_read_timestamp(_p, task_id) _r = _path_to_resolved.get(_p) if _r: file_state.note_write(task_id, _r) # Hint when old_string not found — saves iterations where the agent # retries with stale content instead of re-reading the file. # Suppressed when patch_replace already attached a rich "Did you mean?" # snippet (which is strictly more useful than the generic hint). if result_dict.get("error") and "Could not find" in str(result_dict["error"]): if "Did you mean one of these sections?" not in str(result_dict["error"]): result_dict["_hint"] = ( "old_string not found. Use read_file to verify the current " "content, or search_files to locate the text." ) return json.dumps(result_dict, ensure_ascii=False) except Exception as e: return tool_error(str(e)) def search_tool(pattern: str, target: str = "content", path: str = ".", file_glob: str = None, limit: int = 50, offset: int = 0, output_mode: str = "content", context: int = 0, task_id: str = "default") -> str: """Search for content or files.""" try: offset, limit = normalize_search_pagination(offset, limit) # Track searches to detect *consecutive* repeated search loops. # Include pagination args so users can page through truncated # results without tripping the repeated-search guard. search_key = ( "search", pattern, target, str(path), file_glob or "", limit, offset, ) with _read_tracker_lock: task_data = _read_tracker.setdefault(task_id, { "last_key": None, "consecutive": 0, "read_history": set(), }) if task_data["last_key"] == search_key: task_data["consecutive"] += 1 else: task_data["last_key"] = search_key task_data["consecutive"] = 1 count = task_data["consecutive"] if count >= 4: return json.dumps({ "error": ( f"BLOCKED: You have run this exact search {count} times in a row. " "The results have NOT changed. You already have this information. " "STOP re-searching and proceed with your task." ), "pattern": pattern, "already_searched": count, }, ensure_ascii=False) file_ops = _get_file_ops(task_id) result = file_ops.search( pattern=pattern, path=path, target=target, file_glob=file_glob, limit=limit, offset=offset, output_mode=output_mode, context=context ) if hasattr(result, 'matches'): for m in result.matches: if hasattr(m, 'content') and m.content: m.content = redact_sensitive_text(m.content) result_dict = result.to_dict() if count >= 3: result_dict["_warning"] = ( f"You have run this exact search {count} times consecutively. " "The results have not changed. Use the information you already have." ) result_json = json.dumps(result_dict, ensure_ascii=False) # Hint when results were truncated — explicit next offset is clearer # than relying on the model to infer it from total_count vs match count. if result_dict.get("truncated"): next_offset = offset + limit result_json += f"\n\n[Hint: Results truncated. Use offset={next_offset} to see more, or narrow with a more specific pattern or file_glob.]" return result_json except Exception as e: return tool_error(str(e)) # --------------------------------------------------------------------------- # Schemas + Registry # --------------------------------------------------------------------------- from tools.registry import registry, tool_error def _check_file_reqs(): """Lazy wrapper to avoid circular import with tools/__init__.py.""" from tools import check_file_requirements return check_file_requirements() READ_FILE_SCHEMA = { "name": "read_file", "description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.", "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "Path to the file to read (absolute, relative, or ~/path)"}, "offset": {"type": "integer", "description": "Line number to start reading from (1-indexed, default: 1)", "default": 1, "minimum": 1}, "limit": {"type": "integer", "description": "Maximum number of lines to read (default: 500, max: 2000)", "default": 500, "maximum": 2000} }, "required": ["path"] } } WRITE_FILE_SCHEMA = { "name": "write_file", "description": "Write content to a file, completely replacing existing content. Use this instead of echo/cat heredoc in terminal. Creates parent directories automatically. OVERWRITES the entire file — use 'patch' for targeted edits.", "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "Path to the file to write (will be created if it doesn't exist, overwritten if it does)"}, "content": {"type": "string", "description": "Complete content to write to the file"} }, "required": ["path", "content"] } } PATCH_SCHEMA = { "name": "patch", "description": "Targeted find-and-replace edits in files. Use this instead of sed/awk in terminal. Uses fuzzy matching (9 strategies) so minor whitespace/indentation differences won't break it. Returns a unified diff. Auto-runs syntax checks after editing.\n\nReplace mode (default): find a unique string and replace it.\nPatch mode: apply V4A multi-file patches for bulk changes.", "parameters": { "type": "object", "properties": { "mode": {"type": "string", "enum": ["replace", "patch"], "description": "Edit mode: 'replace' for targeted find-and-replace, 'patch' for V4A multi-file patches", "default": "replace"}, "path": {"type": "string", "description": "File path to edit (required for 'replace' mode)"}, "old_string": {"type": "string", "description": "Text to find in the file (required for 'replace' mode). Must be unique in the file unless replace_all=true. Include enough surrounding context to ensure uniqueness."}, "new_string": {"type": "string", "description": "Replacement text (required for 'replace' mode). Can be empty string to delete the matched text."}, "replace_all": {"type": "boolean", "description": "Replace all occurrences instead of requiring a unique match (default: false)", "default": False}, "patch": {"type": "string", "description": "V4A format patch content (required for 'patch' mode). Format:\n*** Begin Patch\n*** Update File: path/to/file\n@@ context hint @@\n context line\n-removed line\n+added line\n*** End Patch"} }, "required": ["mode"] } } SEARCH_FILES_SCHEMA = { "name": "search_files", "description": "Search file contents or find files by name. Use this instead of grep/rg/find/ls in terminal. Ripgrep-backed, faster than shell equivalents.\n\nContent search (target='content'): Regex search inside files. Output modes: full matches with line numbers, file paths only, or match counts.\n\nFile search (target='files'): Find files by glob pattern (e.g., '*.py', '*config*'). Also use this instead of ls — results sorted by modification time.", "parameters": { "type": "object", "properties": { "pattern": {"type": "string", "description": "Regex pattern for content search, or glob pattern (e.g., '*.py') for file search"}, "target": {"type": "string", "enum": ["content", "files"], "description": "'content' searches inside file contents, 'files' searches for files by name", "default": "content"}, "path": {"type": "string", "description": "Directory or file to search in (default: current working directory)", "default": "."}, "file_glob": {"type": "string", "description": "Filter files by pattern in grep mode (e.g., '*.py' to only search Python files)"}, "limit": {"type": "integer", "description": "Maximum number of results to return (default: 50)", "default": 50}, "offset": {"type": "integer", "description": "Skip first N results for pagination (default: 0)", "default": 0}, "output_mode": {"type": "string", "enum": ["content", "files_only", "count"], "description": "Output format for grep mode: 'content' shows matching lines with line numbers, 'files_only' lists file paths, 'count' shows match counts per file", "default": "content"}, "context": {"type": "integer", "description": "Number of context lines before and after each match (grep mode only)", "default": 0} }, "required": ["pattern"] } } def _handle_read_file(args, **kw): tid = kw.get("task_id") or "default" return read_file_tool(path=args.get("path", ""), offset=args.get("offset", 1), limit=args.get("limit", 500), task_id=tid) def _handle_write_file(args, **kw): tid = kw.get("task_id") or "default" return write_file_tool(path=args.get("path", ""), content=args.get("content", ""), task_id=tid) def _handle_patch(args, **kw): tid = kw.get("task_id") or "default" return patch_tool( mode=args.get("mode", "replace"), path=args.get("path"), old_string=args.get("old_string"), new_string=args.get("new_string"), replace_all=args.get("replace_all", False), patch=args.get("patch"), task_id=tid) def _handle_search_files(args, **kw): tid = kw.get("task_id") or "default" target_map = {"grep": "content", "find": "files"} raw_target = args.get("target", "content") target = target_map.get(raw_target, raw_target) return search_tool( pattern=args.get("pattern", ""), target=target, path=args.get("path", "."), file_glob=args.get("file_glob"), limit=args.get("limit", 50), offset=args.get("offset", 0), output_mode=args.get("output_mode", "content"), context=args.get("context", 0), task_id=tid) registry.register(name="read_file", toolset="file", schema=READ_FILE_SCHEMA, handler=_handle_read_file, check_fn=_check_file_reqs, emoji="📖", max_result_size_chars=float('inf')) registry.register(name="write_file", toolset="file", schema=WRITE_FILE_SCHEMA, handler=_handle_write_file, check_fn=_check_file_reqs, emoji="✍️", max_result_size_chars=100_000) registry.register(name="patch", toolset="file", schema=PATCH_SCHEMA, handler=_handle_patch, check_fn=_check_file_reqs, emoji="🔧", max_result_size_chars=100_000) registry.register(name="search_files", toolset="file", schema=SEARCH_FILES_SCHEMA, handler=_handle_search_files, check_fn=_check_file_reqs, emoji="🔎", max_result_size_chars=100_000)