mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 02:11:48 +00:00
Merge origin/main into atropos-integrations
Merged main's latest changes including: - New hermes_cli/ unified CLI commands - File operations tools, fuzzy match, patch parser - RL training tools and tinker-atropos submodule - Enhanced batch_runner and run_agent - Gateway improvements (Telegram, Discord) - Cron job management - Installation scripts Preserved our branch-specific features: - Modal backend (atropos/backends/modal_backend.py) - Modal terminal tool integration (ModalProfile, _ModalSandboxPool, etc.) - Singularity/Apptainer support - Atropos AgentEnv Modal config fields - Combined pyproject.toml extras (atropos + messaging + cron + cli) Conflict resolution: - cli.py, model_tools.py, README.md: accepted main (newer features) - pyproject.toml: combined both extras and package lists - tools/terminal_tool.py: accepted main's base + re-inserted Modal integration
This commit is contained in:
commit
36ea883d45
79 changed files with 22673 additions and 2082 deletions
|
|
@ -127,6 +127,50 @@ except ModuleNotFoundError: # pragma: no cover
|
|||
def check_browser_requirements() -> bool: # type: ignore[no-redef]
|
||||
return False
|
||||
|
||||
# Cronjob management tools (CLI-only, hermes-cli toolset)
|
||||
from .cronjob_tools import (
|
||||
schedule_cronjob,
|
||||
list_cronjobs,
|
||||
remove_cronjob,
|
||||
check_cronjob_requirements,
|
||||
get_cronjob_tool_definitions,
|
||||
SCHEDULE_CRONJOB_SCHEMA,
|
||||
LIST_CRONJOBS_SCHEMA,
|
||||
REMOVE_CRONJOB_SCHEMA
|
||||
)
|
||||
|
||||
# RL Training tools (Tinker-Atropos)
|
||||
from .rl_training_tool import (
|
||||
rl_list_environments,
|
||||
rl_select_environment,
|
||||
rl_get_current_config,
|
||||
rl_edit_config,
|
||||
rl_start_training,
|
||||
rl_check_status,
|
||||
rl_stop_training,
|
||||
rl_get_results,
|
||||
rl_list_runs,
|
||||
rl_test_inference,
|
||||
check_rl_api_keys,
|
||||
get_missing_keys,
|
||||
)
|
||||
|
||||
# File manipulation tools (read, write, patch, search)
|
||||
from .file_tools import (
|
||||
read_file_tool,
|
||||
write_file_tool,
|
||||
patch_tool,
|
||||
search_tool,
|
||||
get_file_tools,
|
||||
clear_file_ops_cache,
|
||||
)
|
||||
|
||||
# File tools have no external requirements - they use the terminal backend
|
||||
def check_file_requirements():
|
||||
"""File tools only require terminal backend to be available."""
|
||||
from .terminal_tool import check_terminal_requirements
|
||||
return check_terminal_requirements()
|
||||
|
||||
__all__ = [
|
||||
# Web tools
|
||||
'web_search_tool',
|
||||
|
|
@ -175,4 +219,34 @@ __all__ = [
|
|||
'get_active_browser_sessions',
|
||||
'check_browser_requirements',
|
||||
'BROWSER_TOOL_SCHEMAS',
|
||||
# Cronjob management tools (CLI-only)
|
||||
'schedule_cronjob',
|
||||
'list_cronjobs',
|
||||
'remove_cronjob',
|
||||
'check_cronjob_requirements',
|
||||
'get_cronjob_tool_definitions',
|
||||
'SCHEDULE_CRONJOB_SCHEMA',
|
||||
'LIST_CRONJOBS_SCHEMA',
|
||||
'REMOVE_CRONJOB_SCHEMA',
|
||||
# RL Training tools
|
||||
'rl_list_environments',
|
||||
'rl_select_environment',
|
||||
'rl_get_current_config',
|
||||
'rl_edit_config',
|
||||
'rl_start_training',
|
||||
'rl_check_status',
|
||||
'rl_stop_training',
|
||||
'rl_get_results',
|
||||
'rl_list_runs',
|
||||
'rl_test_inference',
|
||||
'check_rl_api_keys',
|
||||
'get_missing_keys',
|
||||
# File manipulation tools
|
||||
'read_file_tool',
|
||||
'write_file_tool',
|
||||
'patch_tool',
|
||||
'search_tool',
|
||||
'get_file_tools',
|
||||
'clear_file_ops_cache',
|
||||
'check_file_requirements',
|
||||
]
|
||||
|
|
|
|||
374
tools/cronjob_tools.py
Normal file
374
tools/cronjob_tools.py
Normal file
|
|
@ -0,0 +1,374 @@
|
|||
"""
|
||||
Cron job management tools for Hermes Agent.
|
||||
|
||||
These tools allow the agent to schedule, list, and remove automated tasks.
|
||||
Only available when running via CLI (hermes-cli toolset).
|
||||
|
||||
IMPORTANT: Cronjobs run in isolated sessions with NO prior context.
|
||||
The prompt must contain ALL necessary information.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
# Import from cron module (will be available when properly installed)
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from cron.jobs import create_job, get_job, list_jobs, remove_job
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tool: schedule_cronjob
|
||||
# =============================================================================
|
||||
|
||||
def schedule_cronjob(
|
||||
prompt: str,
|
||||
schedule: str,
|
||||
name: Optional[str] = None,
|
||||
repeat: Optional[int] = None,
|
||||
deliver: Optional[str] = None,
|
||||
task_id: str = None
|
||||
) -> str:
|
||||
"""
|
||||
Schedule an automated task to run the agent on a schedule.
|
||||
|
||||
IMPORTANT: When the cronjob runs, it starts a COMPLETELY FRESH session.
|
||||
The agent will have NO memory of this conversation or any prior context.
|
||||
Therefore, the prompt MUST contain ALL necessary information:
|
||||
- Full context of what needs to be done
|
||||
- Specific file paths, URLs, or identifiers
|
||||
- Clear success criteria
|
||||
- Any relevant background information
|
||||
|
||||
BAD prompt: "Check on that server issue"
|
||||
GOOD prompt: "SSH into server 192.168.1.100 as user 'deploy', check if nginx
|
||||
is running with 'systemctl status nginx', and verify the site
|
||||
https://example.com returns HTTP 200. Report any issues found."
|
||||
|
||||
Args:
|
||||
prompt: Complete, self-contained instructions for the future agent.
|
||||
Must include ALL context needed - the agent won't remember anything.
|
||||
schedule: When to run. Either:
|
||||
- Duration for one-shot: "30m", "2h", "1d" (runs once)
|
||||
- Interval: "every 30m", "every 2h" (recurring)
|
||||
- Cron expression: "0 9 * * *" (daily at 9am)
|
||||
- ISO timestamp: "2026-02-03T14:00:00" (one-shot at specific time)
|
||||
name: Optional human-friendly name for the job (for listing/management)
|
||||
repeat: How many times to run. Omit for default behavior:
|
||||
- One-shot schedules default to repeat=1 (run once)
|
||||
- Intervals/cron default to forever
|
||||
- Set repeat=5 to run 5 times then auto-delete
|
||||
deliver: Where to send the output. Options:
|
||||
- "origin": Back to where this job was created (default)
|
||||
- "local": Save to local files only (~/.hermes/cron/output/)
|
||||
- "telegram": Send to Telegram home channel
|
||||
- "discord": Send to Discord home channel
|
||||
- "telegram:123456": Send to specific chat ID
|
||||
|
||||
Returns:
|
||||
JSON with job_id, next_run time, and confirmation
|
||||
"""
|
||||
# Get origin info from environment if available
|
||||
origin = None
|
||||
origin_platform = os.getenv("HERMES_SESSION_PLATFORM")
|
||||
origin_chat_id = os.getenv("HERMES_SESSION_CHAT_ID")
|
||||
if origin_platform and origin_chat_id:
|
||||
origin = {
|
||||
"platform": origin_platform,
|
||||
"chat_id": origin_chat_id,
|
||||
"chat_name": os.getenv("HERMES_SESSION_CHAT_NAME"),
|
||||
}
|
||||
|
||||
try:
|
||||
job = create_job(
|
||||
prompt=prompt,
|
||||
schedule=schedule,
|
||||
name=name,
|
||||
repeat=repeat,
|
||||
deliver=deliver,
|
||||
origin=origin
|
||||
)
|
||||
|
||||
# Format repeat info for display
|
||||
times = job["repeat"].get("times")
|
||||
if times is None:
|
||||
repeat_display = "forever"
|
||||
elif times == 1:
|
||||
repeat_display = "once"
|
||||
else:
|
||||
repeat_display = f"{times} times"
|
||||
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"job_id": job["id"],
|
||||
"name": job["name"],
|
||||
"schedule": job["schedule_display"],
|
||||
"repeat": repeat_display,
|
||||
"deliver": job.get("deliver", "local"),
|
||||
"next_run_at": job["next_run_at"],
|
||||
"message": f"Cronjob '{job['name']}' created. It will run {repeat_display}, deliver to {job.get('deliver', 'local')}, next at {job['next_run_at']}."
|
||||
}, indent=2)
|
||||
|
||||
except Exception as e:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}, indent=2)
|
||||
|
||||
|
||||
SCHEDULE_CRONJOB_SCHEMA = {
|
||||
"name": "schedule_cronjob",
|
||||
"description": """Schedule an automated task to run the agent on a schedule.
|
||||
|
||||
⚠️ CRITICAL: The cronjob runs in a FRESH SESSION with NO CONTEXT from this conversation.
|
||||
The prompt must be COMPLETELY SELF-CONTAINED with ALL necessary information including:
|
||||
- Full context and background
|
||||
- Specific file paths, URLs, server addresses
|
||||
- Clear instructions and success criteria
|
||||
- Any credentials or configuration details
|
||||
|
||||
The future agent will NOT remember anything from the current conversation.
|
||||
|
||||
SCHEDULE FORMATS:
|
||||
- One-shot: "30m", "2h", "1d" (runs once after delay)
|
||||
- Interval: "every 30m", "every 2h" (recurring)
|
||||
- Cron: "0 9 * * *" (cron expression for precise scheduling)
|
||||
- Timestamp: "2026-02-03T14:00:00" (specific date/time)
|
||||
|
||||
REPEAT BEHAVIOR:
|
||||
- One-shot schedules: run once by default
|
||||
- Intervals/cron: run forever by default
|
||||
- Set repeat=N to run exactly N times then auto-delete
|
||||
|
||||
DELIVERY OPTIONS (where output goes):
|
||||
- "origin": Back to current chat (default if in messaging platform)
|
||||
- "local": Save to local files only (default if in CLI)
|
||||
- "telegram": Send to Telegram home channel
|
||||
- "discord": Send to Discord home channel
|
||||
- "telegram:123456": Send to specific chat (if user provides ID)
|
||||
|
||||
Use for: reminders, periodic checks, scheduled reports, automated maintenance.""",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"prompt": {
|
||||
"type": "string",
|
||||
"description": "Complete, self-contained instructions. Must include ALL context - the future agent will have NO memory of this conversation."
|
||||
},
|
||||
"schedule": {
|
||||
"type": "string",
|
||||
"description": "When to run: '30m' (once in 30min), 'every 30m' (recurring), '0 9 * * *' (cron), or ISO timestamp"
|
||||
},
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": "Optional human-friendly name for the job"
|
||||
},
|
||||
"repeat": {
|
||||
"type": "integer",
|
||||
"description": "How many times to run. Omit for default (once for one-shot, forever for recurring). Set to N for exactly N runs."
|
||||
},
|
||||
"deliver": {
|
||||
"type": "string",
|
||||
"description": "Where to send output: 'origin' (back to this chat), 'local' (files only), 'telegram', 'discord', or 'platform:chat_id'"
|
||||
}
|
||||
},
|
||||
"required": ["prompt", "schedule"]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tool: list_cronjobs
|
||||
# =============================================================================
|
||||
|
||||
def list_cronjobs(include_disabled: bool = False, task_id: str = None) -> str:
|
||||
"""
|
||||
List all scheduled cronjobs.
|
||||
|
||||
Returns information about each job including:
|
||||
- Job ID (needed for removal)
|
||||
- Name
|
||||
- Schedule (human-readable)
|
||||
- Repeat status (completed/total or 'forever')
|
||||
- Next scheduled run time
|
||||
- Last run time and status (if any)
|
||||
|
||||
Args:
|
||||
include_disabled: Whether to include disabled/completed jobs
|
||||
|
||||
Returns:
|
||||
JSON array of all scheduled jobs
|
||||
"""
|
||||
try:
|
||||
jobs = list_jobs(include_disabled=include_disabled)
|
||||
|
||||
formatted_jobs = []
|
||||
for job in jobs:
|
||||
# Format repeat status
|
||||
times = job["repeat"].get("times")
|
||||
completed = job["repeat"].get("completed", 0)
|
||||
if times is None:
|
||||
repeat_status = "forever"
|
||||
else:
|
||||
repeat_status = f"{completed}/{times}"
|
||||
|
||||
formatted_jobs.append({
|
||||
"job_id": job["id"],
|
||||
"name": job["name"],
|
||||
"prompt_preview": job["prompt"][:100] + "..." if len(job["prompt"]) > 100 else job["prompt"],
|
||||
"schedule": job["schedule_display"],
|
||||
"repeat": repeat_status,
|
||||
"deliver": job.get("deliver", "local"),
|
||||
"next_run_at": job.get("next_run_at"),
|
||||
"last_run_at": job.get("last_run_at"),
|
||||
"last_status": job.get("last_status"),
|
||||
"enabled": job.get("enabled", True)
|
||||
})
|
||||
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"count": len(formatted_jobs),
|
||||
"jobs": formatted_jobs
|
||||
}, indent=2)
|
||||
|
||||
except Exception as e:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}, indent=2)
|
||||
|
||||
|
||||
LIST_CRONJOBS_SCHEMA = {
|
||||
"name": "list_cronjobs",
|
||||
"description": """List all scheduled cronjobs with their IDs, schedules, and status.
|
||||
|
||||
Use this to:
|
||||
- See what jobs are currently scheduled
|
||||
- Find job IDs for removal with remove_cronjob
|
||||
- Check job status and next run times
|
||||
|
||||
Returns job_id, name, schedule, repeat status, next/last run times.""",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"include_disabled": {
|
||||
"type": "boolean",
|
||||
"description": "Include disabled/completed jobs in the list (default: false)"
|
||||
}
|
||||
},
|
||||
"required": []
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tool: remove_cronjob
|
||||
# =============================================================================
|
||||
|
||||
def remove_cronjob(job_id: str, task_id: str = None) -> str:
|
||||
"""
|
||||
Remove a scheduled cronjob by its ID.
|
||||
|
||||
Use list_cronjobs first to find the job_id of the job you want to remove.
|
||||
|
||||
Args:
|
||||
job_id: The ID of the job to remove (from list_cronjobs output)
|
||||
|
||||
Returns:
|
||||
JSON confirmation of removal
|
||||
"""
|
||||
try:
|
||||
job = get_job(job_id)
|
||||
if not job:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": f"Job with ID '{job_id}' not found. Use list_cronjobs to see available jobs."
|
||||
}, indent=2)
|
||||
|
||||
removed = remove_job(job_id)
|
||||
if removed:
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"message": f"Cronjob '{job['name']}' (ID: {job_id}) has been removed.",
|
||||
"removed_job": {
|
||||
"id": job_id,
|
||||
"name": job["name"],
|
||||
"schedule": job["schedule_display"]
|
||||
}
|
||||
}, indent=2)
|
||||
else:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": f"Failed to remove job '{job_id}'"
|
||||
}, indent=2)
|
||||
|
||||
except Exception as e:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}, indent=2)
|
||||
|
||||
|
||||
REMOVE_CRONJOB_SCHEMA = {
|
||||
"name": "remove_cronjob",
|
||||
"description": """Remove a scheduled cronjob by its ID.
|
||||
|
||||
Use list_cronjobs first to find the job_id of the job you want to remove.
|
||||
Jobs that have completed their repeat count are auto-removed, but you can
|
||||
use this to cancel a job before it completes.""",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"job_id": {
|
||||
"type": "string",
|
||||
"description": "The ID of the cronjob to remove (from list_cronjobs output)"
|
||||
}
|
||||
},
|
||||
"required": ["job_id"]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Requirements check
|
||||
# =============================================================================
|
||||
|
||||
def check_cronjob_requirements() -> bool:
|
||||
"""
|
||||
Check if cronjob tools can be used.
|
||||
|
||||
Only available in interactive CLI mode (HERMES_INTERACTIVE=1).
|
||||
"""
|
||||
return os.getenv("HERMES_INTERACTIVE") == "1"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Exports
|
||||
# =============================================================================
|
||||
|
||||
def get_cronjob_tool_definitions():
|
||||
"""Return tool definitions for cronjob management."""
|
||||
return [
|
||||
SCHEDULE_CRONJOB_SCHEMA,
|
||||
LIST_CRONJOBS_SCHEMA,
|
||||
REMOVE_CRONJOB_SCHEMA
|
||||
]
|
||||
|
||||
|
||||
# For direct testing
|
||||
if __name__ == "__main__":
|
||||
# Test the tools
|
||||
print("Testing schedule_cronjob:")
|
||||
result = schedule_cronjob(
|
||||
prompt="Test prompt for cron job",
|
||||
schedule="5m",
|
||||
name="Test Job"
|
||||
)
|
||||
print(result)
|
||||
|
||||
print("\nTesting list_cronjobs:")
|
||||
result = list_cronjobs()
|
||||
print(result)
|
||||
937
tools/file_operations.py
Normal file
937
tools/file_operations.py
Normal file
|
|
@ -0,0 +1,937 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
File Operations Module
|
||||
|
||||
Provides file manipulation capabilities (read, write, patch, search) that work
|
||||
across all terminal backends (local, docker, singularity, ssh, modal).
|
||||
|
||||
The key insight is that all file operations can be expressed as shell commands,
|
||||
so we wrap the terminal backend's execute() interface to provide a unified file API.
|
||||
|
||||
Usage:
|
||||
from tools.file_operations import ShellFileOperations
|
||||
from tools.terminal_tool import _active_environments
|
||||
|
||||
# Get file operations for a terminal environment
|
||||
file_ops = ShellFileOperations(terminal_env)
|
||||
|
||||
# Read a file
|
||||
result = file_ops.read_file("/path/to/file.py")
|
||||
|
||||
# Write a file
|
||||
result = file_ops.write_file("/path/to/new.py", "print('hello')")
|
||||
|
||||
# Search for content
|
||||
result = file_ops.search("TODO", path=".", file_glob="*.py")
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import uuid
|
||||
import difflib
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Result Data Classes
|
||||
# =============================================================================
|
||||
|
||||
@dataclass
|
||||
class ReadResult:
|
||||
"""Result from reading a file."""
|
||||
content: str = ""
|
||||
total_lines: int = 0
|
||||
file_size: int = 0
|
||||
truncated: bool = False
|
||||
hint: Optional[str] = None
|
||||
is_binary: bool = False
|
||||
is_image: bool = False
|
||||
base64_content: Optional[str] = None
|
||||
mime_type: Optional[str] = None
|
||||
dimensions: Optional[str] = None # For images: "WIDTHxHEIGHT"
|
||||
error: Optional[str] = None
|
||||
similar_files: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {k: v for k, v in self.__dict__.items() if v is not None and v != [] and v != ""}
|
||||
|
||||
|
||||
@dataclass
|
||||
class WriteResult:
|
||||
"""Result from writing a file."""
|
||||
bytes_written: int = 0
|
||||
dirs_created: bool = False
|
||||
error: Optional[str] = None
|
||||
warning: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {k: v for k, v in self.__dict__.items() if v is not None}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PatchResult:
|
||||
"""Result from patching a file."""
|
||||
success: bool = False
|
||||
diff: str = ""
|
||||
files_modified: List[str] = field(default_factory=list)
|
||||
files_created: List[str] = field(default_factory=list)
|
||||
files_deleted: List[str] = field(default_factory=list)
|
||||
lint: Optional[Dict[str, Any]] = None
|
||||
error: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
result = {"success": self.success}
|
||||
if self.diff:
|
||||
result["diff"] = self.diff
|
||||
if self.files_modified:
|
||||
result["files_modified"] = self.files_modified
|
||||
if self.files_created:
|
||||
result["files_created"] = self.files_created
|
||||
if self.files_deleted:
|
||||
result["files_deleted"] = self.files_deleted
|
||||
if self.lint:
|
||||
result["lint"] = self.lint
|
||||
if self.error:
|
||||
result["error"] = self.error
|
||||
return result
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchMatch:
|
||||
"""A single search match."""
|
||||
path: str
|
||||
line_number: int
|
||||
content: str
|
||||
mtime: float = 0.0 # Modification time for sorting
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchResult:
|
||||
"""Result from searching."""
|
||||
matches: List[SearchMatch] = field(default_factory=list)
|
||||
files: List[str] = field(default_factory=list)
|
||||
counts: Dict[str, int] = field(default_factory=dict)
|
||||
total_count: int = 0
|
||||
truncated: bool = False
|
||||
error: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
result = {"total_count": self.total_count}
|
||||
if self.matches:
|
||||
result["matches"] = [
|
||||
{"path": m.path, "line": m.line_number, "content": m.content}
|
||||
for m in self.matches
|
||||
]
|
||||
if self.files:
|
||||
result["files"] = self.files
|
||||
if self.counts:
|
||||
result["counts"] = self.counts
|
||||
if self.truncated:
|
||||
result["truncated"] = True
|
||||
if self.error:
|
||||
result["error"] = self.error
|
||||
return result
|
||||
|
||||
|
||||
@dataclass
|
||||
class LintResult:
|
||||
"""Result from linting a file."""
|
||||
success: bool = True
|
||||
skipped: bool = False
|
||||
output: str = ""
|
||||
message: str = ""
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
if self.skipped:
|
||||
return {"status": "skipped", "message": self.message}
|
||||
return {
|
||||
"status": "ok" if self.success else "error",
|
||||
"output": self.output
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExecuteResult:
|
||||
"""Result from executing a shell command."""
|
||||
stdout: str = ""
|
||||
exit_code: int = 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Abstract Interface
|
||||
# =============================================================================
|
||||
|
||||
class FileOperations(ABC):
|
||||
"""Abstract interface for file operations across terminal backends."""
|
||||
|
||||
@abstractmethod
|
||||
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
|
||||
"""Read a file with pagination support."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def write_file(self, path: str, content: str) -> WriteResult:
|
||||
"""Write content to a file, creating directories as needed."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def patch_replace(self, path: str, old_string: str, new_string: str,
|
||||
replace_all: bool = False) -> PatchResult:
|
||||
"""Replace text in a file using fuzzy matching."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def patch_v4a(self, patch_content: str) -> PatchResult:
|
||||
"""Apply a V4A format patch."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def search(self, pattern: str, path: str = ".", target: str = "content",
|
||||
file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
|
||||
output_mode: str = "content", context: int = 0) -> SearchResult:
|
||||
"""Search for content or files."""
|
||||
...
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Shell-based Implementation
|
||||
# =============================================================================
|
||||
|
||||
# Binary file extensions (fast path check)
|
||||
BINARY_EXTENSIONS = {
|
||||
# Images
|
||||
'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico', '.tiff', '.tif',
|
||||
'.svg', # SVG is text but often treated as binary
|
||||
# Audio/Video
|
||||
'.mp3', '.mp4', '.wav', '.avi', '.mov', '.mkv', '.flac', '.ogg', '.webm',
|
||||
# Archives
|
||||
'.zip', '.tar', '.gz', '.bz2', '.xz', '.7z', '.rar',
|
||||
# Documents
|
||||
'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
|
||||
# Compiled/Binary
|
||||
'.exe', '.dll', '.so', '.dylib', '.o', '.a', '.pyc', '.pyo', '.class',
|
||||
'.wasm', '.bin',
|
||||
# Fonts
|
||||
'.ttf', '.otf', '.woff', '.woff2', '.eot',
|
||||
# Other
|
||||
'.db', '.sqlite', '.sqlite3',
|
||||
}
|
||||
|
||||
# Image extensions (subset of binary that we can return as base64)
|
||||
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'}
|
||||
|
||||
# Linters by file extension
|
||||
LINTERS = {
|
||||
'.py': 'python -m py_compile {file} 2>&1',
|
||||
'.js': 'node --check {file} 2>&1',
|
||||
'.ts': 'npx tsc --noEmit {file} 2>&1',
|
||||
'.go': 'go vet {file} 2>&1',
|
||||
'.rs': 'rustfmt --check {file} 2>&1',
|
||||
}
|
||||
|
||||
# Max limits for read operations
|
||||
MAX_LINES = 2000
|
||||
MAX_LINE_LENGTH = 2000
|
||||
MAX_FILE_SIZE = 50 * 1024 # 50KB
|
||||
|
||||
|
||||
class ShellFileOperations(FileOperations):
|
||||
"""
|
||||
File operations implemented via shell commands.
|
||||
|
||||
Works with ANY terminal backend that has execute(command, cwd) method.
|
||||
This includes local, docker, singularity, ssh, and modal environments.
|
||||
"""
|
||||
|
||||
def __init__(self, terminal_env, cwd: str = None):
|
||||
"""
|
||||
Initialize file operations with a terminal environment.
|
||||
|
||||
Args:
|
||||
terminal_env: Any object with execute(command, cwd) method.
|
||||
Returns {"output": str, "returncode": int}
|
||||
cwd: Working directory (defaults to env's cwd or current directory)
|
||||
"""
|
||||
self.env = terminal_env
|
||||
# Determine cwd from various possible sources
|
||||
self.cwd = cwd or getattr(terminal_env, 'cwd', None) or \
|
||||
getattr(getattr(terminal_env, 'config', None), 'cwd', None) or os.getcwd()
|
||||
|
||||
# Cache for command availability checks
|
||||
self._command_cache: Dict[str, bool] = {}
|
||||
|
||||
def _exec(self, command: str, cwd: str = None, timeout: int = None) -> ExecuteResult:
|
||||
"""Execute command via terminal backend."""
|
||||
kwargs = {}
|
||||
if timeout:
|
||||
kwargs['timeout'] = timeout
|
||||
|
||||
result = self.env.execute(command, cwd=cwd or self.cwd, **kwargs)
|
||||
return ExecuteResult(
|
||||
stdout=result.get("output", ""),
|
||||
exit_code=result.get("returncode", 0)
|
||||
)
|
||||
|
||||
def _has_command(self, cmd: str) -> bool:
|
||||
"""Check if a command exists in the environment (cached)."""
|
||||
if cmd not in self._command_cache:
|
||||
result = self._exec(f"command -v {cmd} >/dev/null 2>&1 && echo 'yes'")
|
||||
self._command_cache[cmd] = result.stdout.strip() == 'yes'
|
||||
return self._command_cache[cmd]
|
||||
|
||||
def _is_likely_binary(self, path: str, content_sample: str = None) -> bool:
|
||||
"""
|
||||
Check if a file is likely binary.
|
||||
|
||||
Uses extension check (fast) + content analysis (fallback).
|
||||
"""
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
if ext in BINARY_EXTENSIONS:
|
||||
return True
|
||||
|
||||
# Content analysis: >30% non-printable chars = binary
|
||||
if content_sample:
|
||||
if not content_sample:
|
||||
return False
|
||||
non_printable = sum(1 for c in content_sample[:1000]
|
||||
if ord(c) < 32 and c not in '\n\r\t')
|
||||
return non_printable / min(len(content_sample), 1000) > 0.30
|
||||
|
||||
return False
|
||||
|
||||
def _is_image(self, path: str) -> bool:
|
||||
"""Check if file is an image we can return as base64."""
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
return ext in IMAGE_EXTENSIONS
|
||||
|
||||
def _add_line_numbers(self, content: str, start_line: int = 1) -> str:
|
||||
"""Add line numbers to content in LINE_NUM|CONTENT format."""
|
||||
lines = content.split('\n')
|
||||
numbered = []
|
||||
for i, line in enumerate(lines, start=start_line):
|
||||
# Truncate long lines
|
||||
if len(line) > MAX_LINE_LENGTH:
|
||||
line = line[:MAX_LINE_LENGTH] + "... [truncated]"
|
||||
numbered.append(f"{i:6d}|{line}")
|
||||
return '\n'.join(numbered)
|
||||
|
||||
def _expand_path(self, path: str) -> str:
|
||||
"""
|
||||
Expand shell-style paths like ~ and ~user to absolute paths.
|
||||
|
||||
This must be done BEFORE shell escaping, since ~ doesn't expand
|
||||
inside single quotes.
|
||||
"""
|
||||
if not path:
|
||||
return path
|
||||
|
||||
# Handle ~ and ~user
|
||||
if path.startswith('~'):
|
||||
# Get home directory via the terminal environment
|
||||
result = self._exec("echo $HOME")
|
||||
if result.exit_code == 0 and result.stdout.strip():
|
||||
home = result.stdout.strip()
|
||||
if path == '~':
|
||||
return home
|
||||
elif path.startswith('~/'):
|
||||
return home + path[1:] # Replace ~ with home
|
||||
# ~username format - let shell expand it
|
||||
expand_result = self._exec(f"echo {path}")
|
||||
if expand_result.exit_code == 0:
|
||||
return expand_result.stdout.strip()
|
||||
|
||||
return path
|
||||
|
||||
def _escape_shell_arg(self, arg: str) -> str:
|
||||
"""Escape a string for safe use in shell commands."""
|
||||
# Use single quotes and escape any single quotes in the string
|
||||
return "'" + arg.replace("'", "'\"'\"'") + "'"
|
||||
|
||||
def _unified_diff(self, old_content: str, new_content: str, filename: str) -> str:
|
||||
"""Generate unified diff between old and new content."""
|
||||
old_lines = old_content.splitlines(keepends=True)
|
||||
new_lines = new_content.splitlines(keepends=True)
|
||||
diff = difflib.unified_diff(
|
||||
old_lines, new_lines,
|
||||
fromfile=f"a/{filename}",
|
||||
tofile=f"b/{filename}"
|
||||
)
|
||||
return ''.join(diff)
|
||||
|
||||
# =========================================================================
|
||||
# READ Implementation
|
||||
# =========================================================================
|
||||
|
||||
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
|
||||
"""
|
||||
Read a file with pagination, binary detection, and line numbers.
|
||||
|
||||
Args:
|
||||
path: File path (absolute or relative to cwd)
|
||||
offset: Line number to start from (1-indexed, default 1)
|
||||
limit: Maximum lines to return (default 500, max 2000)
|
||||
|
||||
Returns:
|
||||
ReadResult with content, metadata, or error info
|
||||
"""
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
# Clamp limit
|
||||
limit = min(limit, MAX_LINES)
|
||||
|
||||
# Check if file exists and get metadata
|
||||
stat_cmd = f"stat -c '%s' {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
stat_result = self._exec(stat_cmd)
|
||||
|
||||
if stat_result.exit_code != 0:
|
||||
# File not found - try to suggest similar files
|
||||
return self._suggest_similar_files(path)
|
||||
|
||||
try:
|
||||
file_size = int(stat_result.stdout.strip())
|
||||
except ValueError:
|
||||
file_size = 0
|
||||
|
||||
# Check if file is too large
|
||||
if file_size > MAX_FILE_SIZE:
|
||||
# Still try to read, but warn
|
||||
pass
|
||||
|
||||
# Check if it's an image - return base64
|
||||
if self._is_image(path):
|
||||
return self._read_image(path)
|
||||
|
||||
# Read a sample to check for binary content
|
||||
sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
sample_result = self._exec(sample_cmd)
|
||||
|
||||
if self._is_likely_binary(path, sample_result.stdout):
|
||||
return ReadResult(
|
||||
is_binary=True,
|
||||
file_size=file_size,
|
||||
error="Binary file - cannot display as text. Use appropriate tools to handle this file type."
|
||||
)
|
||||
|
||||
# Read with pagination using sed
|
||||
end_line = offset + limit - 1
|
||||
read_cmd = f"sed -n '{offset},{end_line}p' {self._escape_shell_arg(path)}"
|
||||
read_result = self._exec(read_cmd)
|
||||
|
||||
if read_result.exit_code != 0:
|
||||
return ReadResult(error=f"Failed to read file: {read_result.stdout}")
|
||||
|
||||
# Get total line count
|
||||
wc_cmd = f"wc -l < {self._escape_shell_arg(path)}"
|
||||
wc_result = self._exec(wc_cmd)
|
||||
try:
|
||||
total_lines = int(wc_result.stdout.strip())
|
||||
except ValueError:
|
||||
total_lines = 0
|
||||
|
||||
# Check if truncated
|
||||
truncated = total_lines > end_line
|
||||
hint = None
|
||||
if truncated:
|
||||
hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)"
|
||||
|
||||
return ReadResult(
|
||||
content=self._add_line_numbers(read_result.stdout, offset),
|
||||
total_lines=total_lines,
|
||||
file_size=file_size,
|
||||
truncated=truncated,
|
||||
hint=hint
|
||||
)
|
||||
|
||||
def _read_image(self, path: str) -> ReadResult:
|
||||
"""Read an image file, returning base64 content."""
|
||||
# Get file size
|
||||
stat_cmd = f"stat -c '%s' {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
stat_result = self._exec(stat_cmd)
|
||||
try:
|
||||
file_size = int(stat_result.stdout.strip())
|
||||
except ValueError:
|
||||
file_size = 0
|
||||
|
||||
# Get base64 content
|
||||
b64_cmd = f"base64 -w 0 {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
b64_result = self._exec(b64_cmd, timeout=30)
|
||||
|
||||
if b64_result.exit_code != 0:
|
||||
return ReadResult(
|
||||
is_image=True,
|
||||
is_binary=True,
|
||||
file_size=file_size,
|
||||
error=f"Failed to read image: {b64_result.stdout}"
|
||||
)
|
||||
|
||||
# Try to get dimensions (requires ImageMagick)
|
||||
dimensions = None
|
||||
if self._has_command('identify'):
|
||||
dim_cmd = f"identify -format '%wx%h' {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
dim_result = self._exec(dim_cmd)
|
||||
if dim_result.exit_code == 0:
|
||||
dimensions = dim_result.stdout.strip()
|
||||
|
||||
# Determine MIME type from extension
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
mime_types = {
|
||||
'.png': 'image/png',
|
||||
'.jpg': 'image/jpeg',
|
||||
'.jpeg': 'image/jpeg',
|
||||
'.gif': 'image/gif',
|
||||
'.webp': 'image/webp',
|
||||
'.bmp': 'image/bmp',
|
||||
'.ico': 'image/x-icon',
|
||||
}
|
||||
mime_type = mime_types.get(ext, 'application/octet-stream')
|
||||
|
||||
return ReadResult(
|
||||
is_image=True,
|
||||
is_binary=True,
|
||||
file_size=file_size,
|
||||
base64_content=b64_result.stdout,
|
||||
mime_type=mime_type,
|
||||
dimensions=dimensions
|
||||
)
|
||||
|
||||
def _suggest_similar_files(self, path: str) -> ReadResult:
|
||||
"""Suggest similar files when the requested file is not found."""
|
||||
# Get directory and filename
|
||||
dir_path = os.path.dirname(path) or "."
|
||||
filename = os.path.basename(path)
|
||||
|
||||
# List files in directory
|
||||
ls_cmd = f"ls -1 {self._escape_shell_arg(dir_path)} 2>/dev/null | head -20"
|
||||
ls_result = self._exec(ls_cmd)
|
||||
|
||||
similar = []
|
||||
if ls_result.exit_code == 0 and ls_result.stdout.strip():
|
||||
files = ls_result.stdout.strip().split('\n')
|
||||
# Simple similarity: files that share some characters with the target
|
||||
for f in files:
|
||||
# Check if filenames share significant overlap
|
||||
common = set(filename.lower()) & set(f.lower())
|
||||
if len(common) >= len(filename) * 0.5: # 50% character overlap
|
||||
similar.append(os.path.join(dir_path, f))
|
||||
|
||||
return ReadResult(
|
||||
error=f"File not found: {path}",
|
||||
similar_files=similar[:5] # Limit to 5 suggestions
|
||||
)
|
||||
|
||||
# =========================================================================
|
||||
# WRITE Implementation
|
||||
# =========================================================================
|
||||
|
||||
def write_file(self, path: str, content: str) -> WriteResult:
|
||||
"""
|
||||
Write content to a file, creating parent directories as needed.
|
||||
|
||||
Uses heredoc with unique marker for safe shell execution.
|
||||
|
||||
Args:
|
||||
path: File path to write
|
||||
content: Content to write
|
||||
|
||||
Returns:
|
||||
WriteResult with bytes written or error
|
||||
"""
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
# Create parent directories
|
||||
parent = os.path.dirname(path)
|
||||
dirs_created = False
|
||||
|
||||
if parent:
|
||||
mkdir_cmd = f"mkdir -p {self._escape_shell_arg(parent)}"
|
||||
mkdir_result = self._exec(mkdir_cmd)
|
||||
if mkdir_result.exit_code == 0:
|
||||
dirs_created = True
|
||||
|
||||
# Generate unique marker for heredoc that won't appear in content
|
||||
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
|
||||
while marker in content:
|
||||
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Write using heredoc with single-quoted marker (prevents all expansion)
|
||||
# The single quotes around the marker prevent variable expansion
|
||||
write_cmd = f"cat > {self._escape_shell_arg(path)} << '{marker}'\n{content}\n{marker}"
|
||||
write_result = self._exec(write_cmd)
|
||||
|
||||
if write_result.exit_code != 0:
|
||||
return WriteResult(error=f"Failed to write file: {write_result.stdout}")
|
||||
|
||||
# Get bytes written
|
||||
stat_cmd = f"stat -c '%s' {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
stat_result = self._exec(stat_cmd)
|
||||
|
||||
try:
|
||||
bytes_written = int(stat_result.stdout.strip())
|
||||
except ValueError:
|
||||
bytes_written = len(content.encode('utf-8'))
|
||||
|
||||
return WriteResult(
|
||||
bytes_written=bytes_written,
|
||||
dirs_created=dirs_created
|
||||
)
|
||||
|
||||
# =========================================================================
|
||||
# PATCH Implementation (Replace Mode)
|
||||
# =========================================================================
|
||||
|
||||
def patch_replace(self, path: str, old_string: str, new_string: str,
|
||||
replace_all: bool = False) -> PatchResult:
|
||||
"""
|
||||
Replace text in a file using fuzzy matching.
|
||||
|
||||
Args:
|
||||
path: File path to modify
|
||||
old_string: Text to find (must be unique unless replace_all=True)
|
||||
new_string: Replacement text
|
||||
replace_all: If True, replace all occurrences
|
||||
|
||||
Returns:
|
||||
PatchResult with diff and lint results
|
||||
"""
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
# Read current content
|
||||
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
read_result = self._exec(read_cmd)
|
||||
|
||||
if read_result.exit_code != 0:
|
||||
return PatchResult(error=f"Failed to read file: {path}")
|
||||
|
||||
content = read_result.stdout
|
||||
|
||||
# Import and use fuzzy matching
|
||||
from tools.fuzzy_match import fuzzy_find_and_replace
|
||||
|
||||
new_content, match_count, error = fuzzy_find_and_replace(
|
||||
content, old_string, new_string, replace_all
|
||||
)
|
||||
|
||||
if error:
|
||||
return PatchResult(error=error)
|
||||
|
||||
if match_count == 0:
|
||||
return PatchResult(error=f"Could not find match for old_string in {path}")
|
||||
|
||||
# Write back
|
||||
write_result = self.write_file(path, new_content)
|
||||
if write_result.error:
|
||||
return PatchResult(error=f"Failed to write changes: {write_result.error}")
|
||||
|
||||
# Generate diff
|
||||
diff = self._unified_diff(content, new_content, path)
|
||||
|
||||
# Auto-lint
|
||||
lint_result = self._check_lint(path)
|
||||
|
||||
return PatchResult(
|
||||
success=True,
|
||||
diff=diff,
|
||||
files_modified=[path],
|
||||
lint=lint_result.to_dict() if lint_result else None
|
||||
)
|
||||
|
||||
def patch_v4a(self, patch_content: str) -> PatchResult:
|
||||
"""
|
||||
Apply a V4A format patch.
|
||||
|
||||
V4A format:
|
||||
*** Begin Patch
|
||||
*** Update File: path/to/file.py
|
||||
@@ context hint @@
|
||||
context line
|
||||
-removed line
|
||||
+added line
|
||||
*** End Patch
|
||||
|
||||
Args:
|
||||
patch_content: V4A format patch string
|
||||
|
||||
Returns:
|
||||
PatchResult with changes made
|
||||
"""
|
||||
# Import patch parser
|
||||
from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
|
||||
|
||||
operations, parse_error = parse_v4a_patch(patch_content)
|
||||
if parse_error:
|
||||
return PatchResult(error=f"Failed to parse patch: {parse_error}")
|
||||
|
||||
# Apply operations
|
||||
result = apply_v4a_operations(operations, self)
|
||||
return result
|
||||
|
||||
def _check_lint(self, path: str) -> LintResult:
|
||||
"""
|
||||
Run syntax check on a file after editing.
|
||||
|
||||
Args:
|
||||
path: File path to lint
|
||||
|
||||
Returns:
|
||||
LintResult with status and any errors
|
||||
"""
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
|
||||
if ext not in LINTERS:
|
||||
return LintResult(skipped=True, message=f"No linter for {ext} files")
|
||||
|
||||
# Check if linter command is available
|
||||
linter_cmd = LINTERS[ext]
|
||||
# Extract the base command (first word)
|
||||
base_cmd = linter_cmd.split()[0]
|
||||
|
||||
if not self._has_command(base_cmd):
|
||||
return LintResult(skipped=True, message=f"{base_cmd} not available")
|
||||
|
||||
# Run linter
|
||||
cmd = linter_cmd.format(file=self._escape_shell_arg(path))
|
||||
result = self._exec(cmd, timeout=30)
|
||||
|
||||
return LintResult(
|
||||
success=result.exit_code == 0,
|
||||
output=result.stdout.strip() if result.stdout.strip() else ""
|
||||
)
|
||||
|
||||
# =========================================================================
|
||||
# SEARCH Implementation
|
||||
# =========================================================================
|
||||
|
||||
def search(self, pattern: str, path: str = ".", target: str = "content",
|
||||
file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
|
||||
output_mode: str = "content", context: int = 0) -> SearchResult:
|
||||
"""
|
||||
Search for content or files.
|
||||
|
||||
Args:
|
||||
pattern: Regex (for content) or glob pattern (for files)
|
||||
path: Directory/file to search (default: cwd)
|
||||
target: "content" (grep) or "files" (glob)
|
||||
file_glob: File pattern filter for content search (e.g., "*.py")
|
||||
limit: Max results (default 50)
|
||||
offset: Skip first N results
|
||||
output_mode: "content", "files_only", or "count"
|
||||
context: Lines of context around matches
|
||||
|
||||
Returns:
|
||||
SearchResult with matches or file list
|
||||
"""
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
if target == "files":
|
||||
return self._search_files(pattern, path, limit, offset)
|
||||
else:
|
||||
return self._search_content(pattern, path, file_glob, limit, offset,
|
||||
output_mode, context)
|
||||
|
||||
def _search_files(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
|
||||
"""Search for files by name pattern (glob-like)."""
|
||||
# Check if find is available (not on Windows without Git Bash/WSL)
|
||||
if not self._has_command('find'):
|
||||
return SearchResult(
|
||||
error="File search requires 'find' command. "
|
||||
"On Windows, use Git Bash, WSL, or install Unix tools."
|
||||
)
|
||||
|
||||
# Auto-prepend **/ for recursive search if not already present
|
||||
if not pattern.startswith('**/') and '/' not in pattern:
|
||||
search_pattern = pattern
|
||||
else:
|
||||
search_pattern = pattern.split('/')[-1]
|
||||
|
||||
# Use find with modification time sorting
|
||||
# -printf '%T@ %p\n' outputs: timestamp path
|
||||
# sort -rn sorts by timestamp descending (newest first)
|
||||
cmd = f"find {self._escape_shell_arg(path)} -type f -name {self._escape_shell_arg(search_pattern)} " \
|
||||
f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn | tail -n +{offset + 1} | head -n {limit}"
|
||||
|
||||
result = self._exec(cmd, timeout=60)
|
||||
|
||||
if result.exit_code != 0 and not result.stdout.strip():
|
||||
# Try without -printf (BSD find compatibility)
|
||||
cmd_simple = f"find {self._escape_shell_arg(path)} -type f -name {self._escape_shell_arg(search_pattern)} " \
|
||||
f"2>/dev/null | head -n {limit + offset} | tail -n +{offset + 1}"
|
||||
result = self._exec(cmd_simple, timeout=60)
|
||||
|
||||
files = []
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
if not line:
|
||||
continue
|
||||
# Parse "timestamp path" format
|
||||
parts = line.split(' ', 1)
|
||||
if len(parts) == 2 and parts[0].replace('.', '').isdigit():
|
||||
files.append(parts[1])
|
||||
else:
|
||||
files.append(line)
|
||||
|
||||
return SearchResult(
|
||||
files=files,
|
||||
total_count=len(files)
|
||||
)
|
||||
|
||||
def _search_content(self, pattern: str, path: str, file_glob: Optional[str],
|
||||
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
|
||||
"""Search for content inside files (grep-like)."""
|
||||
# Try ripgrep first (fast), fallback to grep (slower but works)
|
||||
if self._has_command('rg'):
|
||||
return self._search_with_rg(pattern, path, file_glob, limit, offset,
|
||||
output_mode, context)
|
||||
elif self._has_command('grep'):
|
||||
return self._search_with_grep(pattern, path, file_glob, limit, offset,
|
||||
output_mode, context)
|
||||
else:
|
||||
# Neither rg nor grep available (Windows without Git Bash, etc.)
|
||||
return SearchResult(
|
||||
error="Content search requires ripgrep (rg) or grep. "
|
||||
"Install ripgrep: https://github.com/BurntSushi/ripgrep#installation"
|
||||
)
|
||||
|
||||
def _search_with_rg(self, pattern: str, path: str, file_glob: Optional[str],
|
||||
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
|
||||
"""Search using ripgrep."""
|
||||
cmd_parts = ["rg", "--line-number", "--no-heading"]
|
||||
|
||||
# Add context if requested
|
||||
if context > 0:
|
||||
cmd_parts.extend(["-C", str(context)])
|
||||
|
||||
# Add file glob filter
|
||||
if file_glob:
|
||||
cmd_parts.extend(["--glob", file_glob])
|
||||
|
||||
# Output mode handling
|
||||
if output_mode == "files_only":
|
||||
cmd_parts.append("-l") # Files only
|
||||
elif output_mode == "count":
|
||||
cmd_parts.append("-c") # Count per file
|
||||
|
||||
# Add pattern and path
|
||||
cmd_parts.append(self._escape_shell_arg(pattern))
|
||||
cmd_parts.append(self._escape_shell_arg(path))
|
||||
|
||||
# Limit results
|
||||
cmd_parts.extend(["|", "head", "-n", str(limit + offset)])
|
||||
|
||||
cmd = " ".join(cmd_parts)
|
||||
result = self._exec(cmd, timeout=60)
|
||||
|
||||
# Parse results based on output mode
|
||||
if output_mode == "files_only":
|
||||
files = [f for f in result.stdout.strip().split('\n') if f][offset:]
|
||||
return SearchResult(files=files[:limit], total_count=len(files))
|
||||
|
||||
elif output_mode == "count":
|
||||
counts = {}
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
if ':' in line:
|
||||
parts = line.rsplit(':', 1)
|
||||
if len(parts) == 2:
|
||||
try:
|
||||
counts[parts[0]] = int(parts[1])
|
||||
except ValueError:
|
||||
pass
|
||||
return SearchResult(counts=counts, total_count=sum(counts.values()))
|
||||
|
||||
else:
|
||||
# Parse content matches
|
||||
matches = []
|
||||
for line in result.stdout.strip().split('\n')[offset:]:
|
||||
if not line:
|
||||
continue
|
||||
# Format: file:line:content
|
||||
parts = line.split(':', 2)
|
||||
if len(parts) >= 3:
|
||||
try:
|
||||
matches.append(SearchMatch(
|
||||
path=parts[0],
|
||||
line_number=int(parts[1]),
|
||||
content=parts[2][:500] # Truncate long lines
|
||||
))
|
||||
except ValueError:
|
||||
# Line number not an int, skip
|
||||
pass
|
||||
|
||||
return SearchResult(
|
||||
matches=matches[:limit],
|
||||
total_count=len(matches),
|
||||
truncated=len(matches) > limit
|
||||
)
|
||||
|
||||
def _search_with_grep(self, pattern: str, path: str, file_glob: Optional[str],
|
||||
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
|
||||
"""Fallback search using grep."""
|
||||
cmd_parts = ["grep", "-rn"]
|
||||
|
||||
# Add context if requested
|
||||
if context > 0:
|
||||
cmd_parts.extend(["-C", str(context)])
|
||||
|
||||
# Add file pattern filter
|
||||
if file_glob:
|
||||
cmd_parts.extend(["--include", file_glob])
|
||||
|
||||
# Output mode handling
|
||||
if output_mode == "files_only":
|
||||
cmd_parts.append("-l")
|
||||
elif output_mode == "count":
|
||||
cmd_parts.append("-c")
|
||||
|
||||
# Add pattern and path
|
||||
cmd_parts.append(self._escape_shell_arg(pattern))
|
||||
cmd_parts.append(self._escape_shell_arg(path))
|
||||
|
||||
# Limit and offset
|
||||
cmd_parts.extend(["|", "tail", "-n", f"+{offset + 1}", "|", "head", "-n", str(limit)])
|
||||
|
||||
cmd = " ".join(cmd_parts)
|
||||
result = self._exec(cmd, timeout=60)
|
||||
|
||||
# Parse results (same format as rg)
|
||||
if output_mode == "files_only":
|
||||
files = [f for f in result.stdout.strip().split('\n') if f]
|
||||
return SearchResult(files=files, total_count=len(files))
|
||||
|
||||
elif output_mode == "count":
|
||||
counts = {}
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
if ':' in line:
|
||||
parts = line.rsplit(':', 1)
|
||||
if len(parts) == 2:
|
||||
try:
|
||||
counts[parts[0]] = int(parts[1])
|
||||
except ValueError:
|
||||
pass
|
||||
return SearchResult(counts=counts, total_count=sum(counts.values()))
|
||||
|
||||
else:
|
||||
matches = []
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
if not line:
|
||||
continue
|
||||
parts = line.split(':', 2)
|
||||
if len(parts) >= 3:
|
||||
try:
|
||||
matches.append(SearchMatch(
|
||||
path=parts[0],
|
||||
line_number=int(parts[1]),
|
||||
content=parts[2][:500]
|
||||
))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return SearchResult(
|
||||
matches=matches,
|
||||
total_count=len(matches)
|
||||
)
|
||||
177
tools/file_tools.py
Normal file
177
tools/file_tools.py
Normal file
|
|
@ -0,0 +1,177 @@
|
|||
#!/usr/bin/env python3
|
||||
"""File Tools Module - LLM agent file manipulation tools."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
from typing import Optional
|
||||
from tools.file_operations import ShellFileOperations
|
||||
|
||||
_file_ops_lock = threading.Lock()
|
||||
_file_ops_cache: dict = {}
|
||||
|
||||
|
||||
def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
|
||||
"""Get or create ShellFileOperations for a terminal environment.
|
||||
|
||||
Respects the TERMINAL_ENV setting -- if the task_id doesn't have an
|
||||
environment yet, creates one using the configured backend (local, docker,
|
||||
modal, etc.) rather than always defaulting to local.
|
||||
"""
|
||||
from tools.terminal_tool import (
|
||||
_active_environments, _env_lock, _create_environment,
|
||||
_get_env_config, _last_activity, _start_cleanup_thread,
|
||||
_check_disk_usage_warning,
|
||||
)
|
||||
import time
|
||||
|
||||
# Fast path: check cache without heavy locks
|
||||
with _file_ops_lock:
|
||||
if task_id in _file_ops_cache:
|
||||
return _file_ops_cache[task_id]
|
||||
|
||||
# Check if we need to create a new environment
|
||||
needs_creation = False
|
||||
with _env_lock:
|
||||
if task_id not in _active_environments:
|
||||
needs_creation = True
|
||||
|
||||
# Create environment OUTSIDE locks so we don't block other rollouts
|
||||
# during slow Modal/Docker startup (~10s)
|
||||
if needs_creation:
|
||||
config = _get_env_config()
|
||||
env_type = config["env_type"]
|
||||
|
||||
if env_type == "docker":
|
||||
image = config["docker_image"]
|
||||
elif env_type == "singularity":
|
||||
image = config["singularity_image"]
|
||||
elif env_type == "modal":
|
||||
image = config["modal_image"]
|
||||
else:
|
||||
image = ""
|
||||
|
||||
cwd = config["cwd"]
|
||||
_check_disk_usage_warning()
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[FileTools] Creating new {env_type} environment for task {task_id[:8]}...", flush=True)
|
||||
|
||||
new_env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=config["timeout"],
|
||||
)
|
||||
|
||||
# Store under lock (brief) -- do NOT call _start_cleanup_thread inside
|
||||
# the lock because it also acquires _env_lock (non-reentrant = deadlock)
|
||||
created = False
|
||||
with _env_lock:
|
||||
if task_id not in _active_environments:
|
||||
_active_environments[task_id] = new_env
|
||||
created = True
|
||||
else:
|
||||
try:
|
||||
if hasattr(new_env, 'stop'):
|
||||
new_env.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if created:
|
||||
_start_cleanup_thread()
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[FileTools] {env_type} environment ready for task {task_id[:8]}", flush=True)
|
||||
|
||||
# Now get the environment and build file_ops
|
||||
with _env_lock:
|
||||
_last_activity[task_id] = time.time()
|
||||
terminal_env = _active_environments[task_id]
|
||||
|
||||
file_ops = ShellFileOperations(terminal_env)
|
||||
with _file_ops_lock:
|
||||
_file_ops_cache[task_id] = file_ops
|
||||
return file_ops
|
||||
|
||||
|
||||
def clear_file_ops_cache(task_id: str = None):
|
||||
"""Clear the file operations cache."""
|
||||
with _file_ops_lock:
|
||||
if task_id:
|
||||
_file_ops_cache.pop(task_id, None)
|
||||
else:
|
||||
_file_ops_cache.clear()
|
||||
|
||||
|
||||
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
|
||||
"""Read a file with pagination and line numbers."""
|
||||
try:
|
||||
file_ops = _get_file_ops(task_id)
|
||||
result = file_ops.read_file(path, offset, limit)
|
||||
return json.dumps(result.to_dict(), ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": str(e)}, ensure_ascii=False)
|
||||
|
||||
|
||||
def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
|
||||
"""Write content to a file."""
|
||||
try:
|
||||
file_ops = _get_file_ops(task_id)
|
||||
result = file_ops.write_file(path, content)
|
||||
return json.dumps(result.to_dict(), ensure_ascii=False)
|
||||
except Exception as e:
|
||||
print(f"[FileTools] write_file error: {type(e).__name__}: {e}", flush=True)
|
||||
return json.dumps({"error": str(e)}, ensure_ascii=False)
|
||||
|
||||
|
||||
def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
|
||||
new_string: str = None, replace_all: bool = False, patch: str = None,
|
||||
task_id: str = "default") -> str:
|
||||
"""Patch a file using replace mode or V4A patch format."""
|
||||
try:
|
||||
file_ops = _get_file_ops(task_id)
|
||||
|
||||
if mode == "replace":
|
||||
if not path:
|
||||
return json.dumps({"error": "path required"})
|
||||
if old_string is None or new_string is None:
|
||||
return json.dumps({"error": "old_string and new_string required"})
|
||||
result = file_ops.patch_replace(path, old_string, new_string, replace_all)
|
||||
elif mode == "patch":
|
||||
if not patch:
|
||||
return json.dumps({"error": "patch content required"})
|
||||
result = file_ops.patch_v4a(patch)
|
||||
else:
|
||||
return json.dumps({"error": f"Unknown mode: {mode}"})
|
||||
|
||||
return json.dumps(result.to_dict(), ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": str(e)}, ensure_ascii=False)
|
||||
|
||||
|
||||
def search_tool(pattern: str, target: str = "content", path: str = ".",
|
||||
file_glob: str = None, limit: int = 50, offset: int = 0,
|
||||
output_mode: str = "content", context: int = 0,
|
||||
task_id: str = "default") -> str:
|
||||
"""Search for content or files."""
|
||||
try:
|
||||
file_ops = _get_file_ops(task_id)
|
||||
result = file_ops.search(
|
||||
pattern=pattern, path=path, target=target, file_glob=file_glob,
|
||||
limit=limit, offset=offset, output_mode=output_mode, context=context
|
||||
)
|
||||
return json.dumps(result.to_dict(), ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": str(e)}, ensure_ascii=False)
|
||||
|
||||
|
||||
FILE_TOOLS = [
|
||||
{"name": "read_file", "function": read_file_tool},
|
||||
{"name": "write_file", "function": write_file_tool},
|
||||
{"name": "patch", "function": patch_tool},
|
||||
{"name": "search", "function": search_tool}
|
||||
]
|
||||
|
||||
|
||||
def get_file_tools():
|
||||
"""Get the list of file tool definitions."""
|
||||
return FILE_TOOLS
|
||||
478
tools/fuzzy_match.py
Normal file
478
tools/fuzzy_match.py
Normal file
|
|
@ -0,0 +1,478 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fuzzy Matching Module for File Operations
|
||||
|
||||
Implements a multi-strategy matching chain to robustly find and replace text,
|
||||
accommodating variations in whitespace, indentation, and escaping common
|
||||
in LLM-generated code.
|
||||
|
||||
The 9-strategy chain (inspired by OpenCode):
|
||||
1. Exact match - Direct string comparison
|
||||
2. Line-trimmed - Strip leading/trailing whitespace per line
|
||||
3. Block anchor - Match first+last lines, use similarity for middle
|
||||
4. Whitespace normalized - Collapse multiple spaces/tabs to single space
|
||||
5. Indentation flexible - Ignore indentation differences entirely
|
||||
6. Escape normalized - Convert \\n literals to actual newlines
|
||||
7. Trimmed boundary - Trim first/last line whitespace only
|
||||
8. Context-aware - 50% line similarity threshold
|
||||
9. Multi-occurrence - For replace_all flag
|
||||
|
||||
Usage:
|
||||
from tools.fuzzy_match import fuzzy_find_and_replace
|
||||
|
||||
new_content, match_count, error = fuzzy_find_and_replace(
|
||||
content="def foo():\\n pass",
|
||||
old_string="def foo():",
|
||||
new_string="def bar():",
|
||||
replace_all=False
|
||||
)
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Tuple, Optional, List, Callable
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
|
||||
def fuzzy_find_and_replace(content: str, old_string: str, new_string: str,
|
||||
replace_all: bool = False) -> Tuple[str, int, Optional[str]]:
|
||||
"""
|
||||
Find and replace text using a chain of increasingly fuzzy matching strategies.
|
||||
|
||||
Args:
|
||||
content: The file content to search in
|
||||
old_string: The text to find
|
||||
new_string: The replacement text
|
||||
replace_all: If True, replace all occurrences; if False, require uniqueness
|
||||
|
||||
Returns:
|
||||
Tuple of (new_content, match_count, error_message)
|
||||
- If successful: (modified_content, number_of_replacements, None)
|
||||
- If failed: (original_content, 0, error_description)
|
||||
"""
|
||||
if not old_string:
|
||||
return content, 0, "old_string cannot be empty"
|
||||
|
||||
if old_string == new_string:
|
||||
return content, 0, "old_string and new_string are identical"
|
||||
|
||||
# Try each matching strategy in order
|
||||
strategies: List[Tuple[str, Callable]] = [
|
||||
("exact", _strategy_exact),
|
||||
("line_trimmed", _strategy_line_trimmed),
|
||||
("whitespace_normalized", _strategy_whitespace_normalized),
|
||||
("indentation_flexible", _strategy_indentation_flexible),
|
||||
("escape_normalized", _strategy_escape_normalized),
|
||||
("trimmed_boundary", _strategy_trimmed_boundary),
|
||||
("block_anchor", _strategy_block_anchor),
|
||||
("context_aware", _strategy_context_aware),
|
||||
]
|
||||
|
||||
for strategy_name, strategy_fn in strategies:
|
||||
matches = strategy_fn(content, old_string)
|
||||
|
||||
if matches:
|
||||
# Found matches with this strategy
|
||||
if len(matches) > 1 and not replace_all:
|
||||
return content, 0, (
|
||||
f"Found {len(matches)} matches for old_string. "
|
||||
f"Provide more context to make it unique, or use replace_all=True."
|
||||
)
|
||||
|
||||
# Perform replacement
|
||||
new_content = _apply_replacements(content, matches, new_string)
|
||||
return new_content, len(matches), None
|
||||
|
||||
# No strategy found a match
|
||||
return content, 0, "Could not find a match for old_string in the file"
|
||||
|
||||
|
||||
def _apply_replacements(content: str, matches: List[Tuple[int, int]], new_string: str) -> str:
|
||||
"""
|
||||
Apply replacements at the given positions.
|
||||
|
||||
Args:
|
||||
content: Original content
|
||||
matches: List of (start, end) positions to replace
|
||||
new_string: Replacement text
|
||||
|
||||
Returns:
|
||||
Content with replacements applied
|
||||
"""
|
||||
# Sort matches by position (descending) to replace from end to start
|
||||
# This preserves positions of earlier matches
|
||||
sorted_matches = sorted(matches, key=lambda x: x[0], reverse=True)
|
||||
|
||||
result = content
|
||||
for start, end in sorted_matches:
|
||||
result = result[:start] + new_string + result[end:]
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Matching Strategies
|
||||
# =============================================================================
|
||||
|
||||
def _strategy_exact(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""Strategy 1: Exact string match."""
|
||||
matches = []
|
||||
start = 0
|
||||
while True:
|
||||
pos = content.find(pattern, start)
|
||||
if pos == -1:
|
||||
break
|
||||
matches.append((pos, pos + len(pattern)))
|
||||
start = pos + 1
|
||||
return matches
|
||||
|
||||
|
||||
def _strategy_line_trimmed(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Strategy 2: Match with line-by-line whitespace trimming.
|
||||
|
||||
Strips leading/trailing whitespace from each line before matching.
|
||||
"""
|
||||
# Normalize pattern and content by trimming each line
|
||||
pattern_lines = [line.strip() for line in pattern.split('\n')]
|
||||
pattern_normalized = '\n'.join(pattern_lines)
|
||||
|
||||
content_lines = content.split('\n')
|
||||
content_normalized_lines = [line.strip() for line in content_lines]
|
||||
|
||||
# Build mapping from normalized positions back to original positions
|
||||
return _find_normalized_matches(
|
||||
content, content_lines, content_normalized_lines,
|
||||
pattern, pattern_normalized
|
||||
)
|
||||
|
||||
|
||||
def _strategy_whitespace_normalized(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Strategy 3: Collapse multiple whitespace to single space.
|
||||
"""
|
||||
def normalize(s):
|
||||
# Collapse multiple spaces/tabs to single space, preserve newlines
|
||||
return re.sub(r'[ \t]+', ' ', s)
|
||||
|
||||
pattern_normalized = normalize(pattern)
|
||||
content_normalized = normalize(content)
|
||||
|
||||
# Find in normalized, map back to original
|
||||
matches_in_normalized = _strategy_exact(content_normalized, pattern_normalized)
|
||||
|
||||
if not matches_in_normalized:
|
||||
return []
|
||||
|
||||
# Map positions back to original content
|
||||
return _map_normalized_positions(content, content_normalized, matches_in_normalized)
|
||||
|
||||
|
||||
def _strategy_indentation_flexible(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Strategy 4: Ignore indentation differences entirely.
|
||||
|
||||
Strips all leading whitespace from lines before matching.
|
||||
"""
|
||||
def strip_indent(s):
|
||||
return '\n'.join(line.lstrip() for line in s.split('\n'))
|
||||
|
||||
pattern_stripped = strip_indent(pattern)
|
||||
|
||||
content_lines = content.split('\n')
|
||||
content_stripped_lines = [line.lstrip() for line in content_lines]
|
||||
pattern_lines = [line.lstrip() for line in pattern.split('\n')]
|
||||
|
||||
return _find_normalized_matches(
|
||||
content, content_lines, content_stripped_lines,
|
||||
pattern, '\n'.join(pattern_lines)
|
||||
)
|
||||
|
||||
|
||||
def _strategy_escape_normalized(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Strategy 5: Convert escape sequences to actual characters.
|
||||
|
||||
Handles \\n -> newline, \\t -> tab, etc.
|
||||
"""
|
||||
def unescape(s):
|
||||
# Convert common escape sequences
|
||||
return s.replace('\\n', '\n').replace('\\t', '\t').replace('\\r', '\r')
|
||||
|
||||
pattern_unescaped = unescape(pattern)
|
||||
|
||||
if pattern_unescaped == pattern:
|
||||
# No escapes to convert, skip this strategy
|
||||
return []
|
||||
|
||||
return _strategy_exact(content, pattern_unescaped)
|
||||
|
||||
|
||||
def _strategy_trimmed_boundary(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Strategy 6: Trim whitespace from first and last lines only.
|
||||
|
||||
Useful when the pattern boundaries have whitespace differences.
|
||||
"""
|
||||
pattern_lines = pattern.split('\n')
|
||||
if not pattern_lines:
|
||||
return []
|
||||
|
||||
# Trim only first and last lines
|
||||
pattern_lines[0] = pattern_lines[0].strip()
|
||||
if len(pattern_lines) > 1:
|
||||
pattern_lines[-1] = pattern_lines[-1].strip()
|
||||
|
||||
modified_pattern = '\n'.join(pattern_lines)
|
||||
|
||||
content_lines = content.split('\n')
|
||||
|
||||
# Search through content for matching block
|
||||
matches = []
|
||||
pattern_line_count = len(pattern_lines)
|
||||
|
||||
for i in range(len(content_lines) - pattern_line_count + 1):
|
||||
block_lines = content_lines[i:i + pattern_line_count]
|
||||
|
||||
# Trim first and last of this block
|
||||
check_lines = block_lines.copy()
|
||||
check_lines[0] = check_lines[0].strip()
|
||||
if len(check_lines) > 1:
|
||||
check_lines[-1] = check_lines[-1].strip()
|
||||
|
||||
if '\n'.join(check_lines) == modified_pattern:
|
||||
# Found match - calculate original positions
|
||||
start_pos = sum(len(line) + 1 for line in content_lines[:i])
|
||||
end_pos = sum(len(line) + 1 for line in content_lines[:i + pattern_line_count]) - 1
|
||||
if end_pos >= len(content):
|
||||
end_pos = len(content)
|
||||
matches.append((start_pos, end_pos))
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
def _strategy_block_anchor(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Strategy 7: Match by anchoring on first and last lines.
|
||||
|
||||
If first and last lines match exactly, accept middle with 70% similarity.
|
||||
"""
|
||||
pattern_lines = pattern.split('\n')
|
||||
if len(pattern_lines) < 2:
|
||||
return [] # Need at least 2 lines for anchoring
|
||||
|
||||
first_line = pattern_lines[0].strip()
|
||||
last_line = pattern_lines[-1].strip()
|
||||
|
||||
content_lines = content.split('\n')
|
||||
matches = []
|
||||
|
||||
pattern_line_count = len(pattern_lines)
|
||||
|
||||
for i in range(len(content_lines) - pattern_line_count + 1):
|
||||
# Check if first and last lines match
|
||||
if (content_lines[i].strip() == first_line and
|
||||
content_lines[i + pattern_line_count - 1].strip() == last_line):
|
||||
|
||||
# Check middle similarity
|
||||
if pattern_line_count <= 2:
|
||||
# Only first and last, they match
|
||||
similarity = 1.0
|
||||
else:
|
||||
content_middle = '\n'.join(content_lines[i+1:i+pattern_line_count-1])
|
||||
pattern_middle = '\n'.join(pattern_lines[1:-1])
|
||||
similarity = SequenceMatcher(None, content_middle, pattern_middle).ratio()
|
||||
|
||||
if similarity >= 0.70:
|
||||
# Calculate positions
|
||||
start_pos = sum(len(line) + 1 for line in content_lines[:i])
|
||||
end_pos = sum(len(line) + 1 for line in content_lines[:i + pattern_line_count]) - 1
|
||||
if end_pos >= len(content):
|
||||
end_pos = len(content)
|
||||
matches.append((start_pos, end_pos))
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
def _strategy_context_aware(content: str, pattern: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Strategy 8: Line-by-line similarity with 50% threshold.
|
||||
|
||||
Finds blocks where at least 50% of lines have high similarity.
|
||||
"""
|
||||
pattern_lines = pattern.split('\n')
|
||||
content_lines = content.split('\n')
|
||||
|
||||
if not pattern_lines:
|
||||
return []
|
||||
|
||||
matches = []
|
||||
pattern_line_count = len(pattern_lines)
|
||||
|
||||
for i in range(len(content_lines) - pattern_line_count + 1):
|
||||
block_lines = content_lines[i:i + pattern_line_count]
|
||||
|
||||
# Calculate line-by-line similarity
|
||||
high_similarity_count = 0
|
||||
for p_line, c_line in zip(pattern_lines, block_lines):
|
||||
sim = SequenceMatcher(None, p_line.strip(), c_line.strip()).ratio()
|
||||
if sim >= 0.80:
|
||||
high_similarity_count += 1
|
||||
|
||||
# Need at least 50% of lines to have high similarity
|
||||
if high_similarity_count >= len(pattern_lines) * 0.5:
|
||||
start_pos = sum(len(line) + 1 for line in content_lines[:i])
|
||||
end_pos = sum(len(line) + 1 for line in content_lines[:i + pattern_line_count]) - 1
|
||||
if end_pos >= len(content):
|
||||
end_pos = len(content)
|
||||
matches.append((start_pos, end_pos))
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Helper Functions
|
||||
# =============================================================================
|
||||
|
||||
def _find_normalized_matches(content: str, content_lines: List[str],
|
||||
content_normalized_lines: List[str],
|
||||
pattern: str, pattern_normalized: str) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Find matches in normalized content and map back to original positions.
|
||||
|
||||
Args:
|
||||
content: Original content string
|
||||
content_lines: Original content split by lines
|
||||
content_normalized_lines: Normalized content lines
|
||||
pattern: Original pattern
|
||||
pattern_normalized: Normalized pattern
|
||||
|
||||
Returns:
|
||||
List of (start, end) positions in the original content
|
||||
"""
|
||||
pattern_norm_lines = pattern_normalized.split('\n')
|
||||
num_pattern_lines = len(pattern_norm_lines)
|
||||
|
||||
matches = []
|
||||
|
||||
for i in range(len(content_normalized_lines) - num_pattern_lines + 1):
|
||||
# Check if this block matches
|
||||
block = '\n'.join(content_normalized_lines[i:i + num_pattern_lines])
|
||||
|
||||
if block == pattern_normalized:
|
||||
# Found a match - calculate original positions
|
||||
start_pos = sum(len(line) + 1 for line in content_lines[:i])
|
||||
end_pos = sum(len(line) + 1 for line in content_lines[:i + num_pattern_lines]) - 1
|
||||
|
||||
# Handle case where end is past content
|
||||
if end_pos >= len(content):
|
||||
end_pos = len(content)
|
||||
|
||||
matches.append((start_pos, end_pos))
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
def _map_normalized_positions(original: str, normalized: str,
|
||||
normalized_matches: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
|
||||
"""
|
||||
Map positions from normalized string back to original.
|
||||
|
||||
This is a best-effort mapping that works for whitespace normalization.
|
||||
"""
|
||||
if not normalized_matches:
|
||||
return []
|
||||
|
||||
# Build character mapping from normalized to original
|
||||
orig_to_norm = [] # orig_to_norm[i] = position in normalized
|
||||
|
||||
orig_idx = 0
|
||||
norm_idx = 0
|
||||
|
||||
while orig_idx < len(original) and norm_idx < len(normalized):
|
||||
if original[orig_idx] == normalized[norm_idx]:
|
||||
orig_to_norm.append(norm_idx)
|
||||
orig_idx += 1
|
||||
norm_idx += 1
|
||||
elif original[orig_idx] in ' \t' and normalized[norm_idx] == ' ':
|
||||
# Original has space/tab, normalized collapsed to space
|
||||
orig_to_norm.append(norm_idx)
|
||||
orig_idx += 1
|
||||
# Don't advance norm_idx yet - wait until all whitespace consumed
|
||||
if orig_idx < len(original) and original[orig_idx] not in ' \t':
|
||||
norm_idx += 1
|
||||
elif original[orig_idx] in ' \t':
|
||||
# Extra whitespace in original
|
||||
orig_to_norm.append(norm_idx)
|
||||
orig_idx += 1
|
||||
else:
|
||||
# Mismatch - shouldn't happen with our normalization
|
||||
orig_to_norm.append(norm_idx)
|
||||
orig_idx += 1
|
||||
|
||||
# Fill remaining
|
||||
while orig_idx < len(original):
|
||||
orig_to_norm.append(len(normalized))
|
||||
orig_idx += 1
|
||||
|
||||
# Reverse mapping: for each normalized position, find original range
|
||||
norm_to_orig_start = {}
|
||||
norm_to_orig_end = {}
|
||||
|
||||
for orig_pos, norm_pos in enumerate(orig_to_norm):
|
||||
if norm_pos not in norm_to_orig_start:
|
||||
norm_to_orig_start[norm_pos] = orig_pos
|
||||
norm_to_orig_end[norm_pos] = orig_pos
|
||||
|
||||
# Map matches
|
||||
original_matches = []
|
||||
for norm_start, norm_end in normalized_matches:
|
||||
# Find original start
|
||||
if norm_start in norm_to_orig_start:
|
||||
orig_start = norm_to_orig_start[norm_start]
|
||||
else:
|
||||
# Find nearest
|
||||
orig_start = min(i for i, n in enumerate(orig_to_norm) if n >= norm_start)
|
||||
|
||||
# Find original end
|
||||
if norm_end - 1 in norm_to_orig_end:
|
||||
orig_end = norm_to_orig_end[norm_end - 1] + 1
|
||||
else:
|
||||
orig_end = orig_start + (norm_end - norm_start)
|
||||
|
||||
# Expand to include trailing whitespace that was normalized
|
||||
while orig_end < len(original) and original[orig_end] in ' \t':
|
||||
orig_end += 1
|
||||
|
||||
original_matches.append((orig_start, min(orig_end, len(original))))
|
||||
|
||||
return original_matches
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Utility Functions
|
||||
# =============================================================================
|
||||
|
||||
def find_best_match(content: str, pattern: str) -> Optional[Tuple[int, int, str]]:
|
||||
"""
|
||||
Find the best match for a pattern and return the strategy name.
|
||||
|
||||
Returns:
|
||||
Tuple of (start, end, strategy_name) or None if no match
|
||||
"""
|
||||
strategies = [
|
||||
("exact", _strategy_exact),
|
||||
("line_trimmed", _strategy_line_trimmed),
|
||||
("whitespace_normalized", _strategy_whitespace_normalized),
|
||||
("indentation_flexible", _strategy_indentation_flexible),
|
||||
("escape_normalized", _strategy_escape_normalized),
|
||||
("trimmed_boundary", _strategy_trimmed_boundary),
|
||||
("block_anchor", _strategy_block_anchor),
|
||||
("context_aware", _strategy_context_aware),
|
||||
]
|
||||
|
||||
for strategy_name, strategy_fn in strategies:
|
||||
matches = strategy_fn(content, pattern)
|
||||
if matches:
|
||||
return (matches[0][0], matches[0][1], strategy_name)
|
||||
|
||||
return None
|
||||
439
tools/patch_parser.py
Normal file
439
tools/patch_parser.py
Normal file
|
|
@ -0,0 +1,439 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
V4A Patch Format Parser
|
||||
|
||||
Parses the V4A patch format used by codex, cline, and other coding agents.
|
||||
|
||||
V4A Format:
|
||||
*** Begin Patch
|
||||
*** Update File: path/to/file.py
|
||||
@@ optional context hint @@
|
||||
context line (space prefix)
|
||||
-removed line (minus prefix)
|
||||
+added line (plus prefix)
|
||||
*** Add File: path/to/new.py
|
||||
+new file content
|
||||
+line 2
|
||||
*** Delete File: path/to/old.py
|
||||
*** Move File: old/path.py -> new/path.py
|
||||
*** End Patch
|
||||
|
||||
Usage:
|
||||
from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
|
||||
|
||||
operations, error = parse_v4a_patch(patch_content)
|
||||
if error:
|
||||
print(f"Parse error: {error}")
|
||||
else:
|
||||
result = apply_v4a_operations(operations, file_ops)
|
||||
"""
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Tuple, Any
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class OperationType(Enum):
|
||||
ADD = "add"
|
||||
UPDATE = "update"
|
||||
DELETE = "delete"
|
||||
MOVE = "move"
|
||||
|
||||
|
||||
@dataclass
|
||||
class HunkLine:
|
||||
"""A single line in a patch hunk."""
|
||||
prefix: str # ' ', '-', or '+'
|
||||
content: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Hunk:
|
||||
"""A group of changes within a file."""
|
||||
context_hint: Optional[str] = None
|
||||
lines: List[HunkLine] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PatchOperation:
|
||||
"""A single operation in a V4A patch."""
|
||||
operation: OperationType
|
||||
file_path: str
|
||||
new_path: Optional[str] = None # For move operations
|
||||
hunks: List[Hunk] = field(default_factory=list)
|
||||
content: Optional[str] = None # For add file operations
|
||||
|
||||
|
||||
def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[str]]:
|
||||
"""
|
||||
Parse a V4A format patch.
|
||||
|
||||
Args:
|
||||
patch_content: The patch text in V4A format
|
||||
|
||||
Returns:
|
||||
Tuple of (operations, error_message)
|
||||
- If successful: (list_of_operations, None)
|
||||
- If failed: ([], error_description)
|
||||
"""
|
||||
lines = patch_content.split('\n')
|
||||
operations: List[PatchOperation] = []
|
||||
|
||||
# Find patch boundaries
|
||||
start_idx = None
|
||||
end_idx = None
|
||||
|
||||
for i, line in enumerate(lines):
|
||||
if '*** Begin Patch' in line or '***Begin Patch' in line:
|
||||
start_idx = i
|
||||
elif '*** End Patch' in line or '***End Patch' in line:
|
||||
end_idx = i
|
||||
break
|
||||
|
||||
if start_idx is None:
|
||||
# Try to parse without explicit begin marker
|
||||
start_idx = -1
|
||||
|
||||
if end_idx is None:
|
||||
end_idx = len(lines)
|
||||
|
||||
# Parse operations between boundaries
|
||||
i = start_idx + 1
|
||||
current_op: Optional[PatchOperation] = None
|
||||
current_hunk: Optional[Hunk] = None
|
||||
|
||||
while i < end_idx:
|
||||
line = lines[i]
|
||||
|
||||
# Check for file operation markers
|
||||
update_match = re.match(r'\*\*\*\s*Update\s+File:\s*(.+)', line)
|
||||
add_match = re.match(r'\*\*\*\s*Add\s+File:\s*(.+)', line)
|
||||
delete_match = re.match(r'\*\*\*\s*Delete\s+File:\s*(.+)', line)
|
||||
move_match = re.match(r'\*\*\*\s*Move\s+File:\s*(.+?)\s*->\s*(.+)', line)
|
||||
|
||||
if update_match:
|
||||
# Save previous operation
|
||||
if current_op:
|
||||
if current_hunk and current_hunk.lines:
|
||||
current_op.hunks.append(current_hunk)
|
||||
operations.append(current_op)
|
||||
|
||||
current_op = PatchOperation(
|
||||
operation=OperationType.UPDATE,
|
||||
file_path=update_match.group(1).strip()
|
||||
)
|
||||
current_hunk = None
|
||||
|
||||
elif add_match:
|
||||
if current_op:
|
||||
if current_hunk and current_hunk.lines:
|
||||
current_op.hunks.append(current_hunk)
|
||||
operations.append(current_op)
|
||||
|
||||
current_op = PatchOperation(
|
||||
operation=OperationType.ADD,
|
||||
file_path=add_match.group(1).strip()
|
||||
)
|
||||
current_hunk = Hunk()
|
||||
|
||||
elif delete_match:
|
||||
if current_op:
|
||||
if current_hunk and current_hunk.lines:
|
||||
current_op.hunks.append(current_hunk)
|
||||
operations.append(current_op)
|
||||
|
||||
current_op = PatchOperation(
|
||||
operation=OperationType.DELETE,
|
||||
file_path=delete_match.group(1).strip()
|
||||
)
|
||||
operations.append(current_op)
|
||||
current_op = None
|
||||
current_hunk = None
|
||||
|
||||
elif move_match:
|
||||
if current_op:
|
||||
if current_hunk and current_hunk.lines:
|
||||
current_op.hunks.append(current_hunk)
|
||||
operations.append(current_op)
|
||||
|
||||
current_op = PatchOperation(
|
||||
operation=OperationType.MOVE,
|
||||
file_path=move_match.group(1).strip(),
|
||||
new_path=move_match.group(2).strip()
|
||||
)
|
||||
operations.append(current_op)
|
||||
current_op = None
|
||||
current_hunk = None
|
||||
|
||||
elif line.startswith('@@'):
|
||||
# Context hint / hunk marker
|
||||
if current_op:
|
||||
if current_hunk and current_hunk.lines:
|
||||
current_op.hunks.append(current_hunk)
|
||||
|
||||
# Extract context hint
|
||||
hint_match = re.match(r'@@\s*(.+?)\s*@@', line)
|
||||
hint = hint_match.group(1) if hint_match else None
|
||||
current_hunk = Hunk(context_hint=hint)
|
||||
|
||||
elif current_op and line:
|
||||
# Parse hunk line
|
||||
if current_hunk is None:
|
||||
current_hunk = Hunk()
|
||||
|
||||
if line.startswith('+'):
|
||||
current_hunk.lines.append(HunkLine('+', line[1:]))
|
||||
elif line.startswith('-'):
|
||||
current_hunk.lines.append(HunkLine('-', line[1:]))
|
||||
elif line.startswith(' '):
|
||||
current_hunk.lines.append(HunkLine(' ', line[1:]))
|
||||
elif line.startswith('\\'):
|
||||
# "\ No newline at end of file" marker - skip
|
||||
pass
|
||||
else:
|
||||
# Treat as context line (implicit space prefix)
|
||||
current_hunk.lines.append(HunkLine(' ', line))
|
||||
|
||||
i += 1
|
||||
|
||||
# Don't forget the last operation
|
||||
if current_op:
|
||||
if current_hunk and current_hunk.lines:
|
||||
current_op.hunks.append(current_hunk)
|
||||
operations.append(current_op)
|
||||
|
||||
return operations, None
|
||||
|
||||
|
||||
def apply_v4a_operations(operations: List[PatchOperation],
|
||||
file_ops: Any) -> 'PatchResult':
|
||||
"""
|
||||
Apply V4A patch operations using a file operations interface.
|
||||
|
||||
Args:
|
||||
operations: List of PatchOperation from parse_v4a_patch
|
||||
file_ops: Object with read_file, write_file methods
|
||||
|
||||
Returns:
|
||||
PatchResult with results of all operations
|
||||
"""
|
||||
# Import here to avoid circular imports
|
||||
from tools.file_operations import PatchResult
|
||||
|
||||
files_modified = []
|
||||
files_created = []
|
||||
files_deleted = []
|
||||
all_diffs = []
|
||||
errors = []
|
||||
|
||||
for op in operations:
|
||||
try:
|
||||
if op.operation == OperationType.ADD:
|
||||
result = _apply_add(op, file_ops)
|
||||
if result[0]:
|
||||
files_created.append(op.file_path)
|
||||
all_diffs.append(result[1])
|
||||
else:
|
||||
errors.append(f"Failed to add {op.file_path}: {result[1]}")
|
||||
|
||||
elif op.operation == OperationType.DELETE:
|
||||
result = _apply_delete(op, file_ops)
|
||||
if result[0]:
|
||||
files_deleted.append(op.file_path)
|
||||
all_diffs.append(result[1])
|
||||
else:
|
||||
errors.append(f"Failed to delete {op.file_path}: {result[1]}")
|
||||
|
||||
elif op.operation == OperationType.MOVE:
|
||||
result = _apply_move(op, file_ops)
|
||||
if result[0]:
|
||||
files_modified.append(f"{op.file_path} -> {op.new_path}")
|
||||
all_diffs.append(result[1])
|
||||
else:
|
||||
errors.append(f"Failed to move {op.file_path}: {result[1]}")
|
||||
|
||||
elif op.operation == OperationType.UPDATE:
|
||||
result = _apply_update(op, file_ops)
|
||||
if result[0]:
|
||||
files_modified.append(op.file_path)
|
||||
all_diffs.append(result[1])
|
||||
else:
|
||||
errors.append(f"Failed to update {op.file_path}: {result[1]}")
|
||||
|
||||
except Exception as e:
|
||||
errors.append(f"Error processing {op.file_path}: {str(e)}")
|
||||
|
||||
# Run lint on all modified/created files
|
||||
lint_results = {}
|
||||
for f in files_modified + files_created:
|
||||
if hasattr(file_ops, '_check_lint'):
|
||||
lint_result = file_ops._check_lint(f)
|
||||
lint_results[f] = lint_result.to_dict()
|
||||
|
||||
combined_diff = '\n'.join(all_diffs)
|
||||
|
||||
if errors:
|
||||
return PatchResult(
|
||||
success=False,
|
||||
diff=combined_diff,
|
||||
files_modified=files_modified,
|
||||
files_created=files_created,
|
||||
files_deleted=files_deleted,
|
||||
lint=lint_results if lint_results else None,
|
||||
error='; '.join(errors)
|
||||
)
|
||||
|
||||
return PatchResult(
|
||||
success=True,
|
||||
diff=combined_diff,
|
||||
files_modified=files_modified,
|
||||
files_created=files_created,
|
||||
files_deleted=files_deleted,
|
||||
lint=lint_results if lint_results else None
|
||||
)
|
||||
|
||||
|
||||
def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
|
||||
"""Apply an add file operation."""
|
||||
# Extract content from hunks (all + lines)
|
||||
content_lines = []
|
||||
for hunk in op.hunks:
|
||||
for line in hunk.lines:
|
||||
if line.prefix == '+':
|
||||
content_lines.append(line.content)
|
||||
|
||||
content = '\n'.join(content_lines)
|
||||
|
||||
result = file_ops.write_file(op.file_path, content)
|
||||
if result.error:
|
||||
return False, result.error
|
||||
|
||||
diff = f"--- /dev/null\n+++ b/{op.file_path}\n"
|
||||
diff += '\n'.join(f"+{line}" for line in content_lines)
|
||||
|
||||
return True, diff
|
||||
|
||||
|
||||
def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
|
||||
"""Apply a delete file operation."""
|
||||
# Read file first for diff
|
||||
read_result = file_ops.read_file(op.file_path)
|
||||
|
||||
if read_result.error and "not found" in read_result.error.lower():
|
||||
# File doesn't exist, nothing to delete
|
||||
return True, f"# {op.file_path} already deleted or doesn't exist"
|
||||
|
||||
# Delete by writing empty and then removing
|
||||
# Use shell command via the underlying environment
|
||||
rm_result = file_ops._exec(f"rm -f {file_ops._escape_shell_arg(op.file_path)}")
|
||||
|
||||
if rm_result.exit_code != 0:
|
||||
return False, rm_result.stdout
|
||||
|
||||
diff = f"--- a/{op.file_path}\n+++ /dev/null\n# File deleted"
|
||||
return True, diff
|
||||
|
||||
|
||||
def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
|
||||
"""Apply a move file operation."""
|
||||
# Use shell mv command
|
||||
mv_result = file_ops._exec(
|
||||
f"mv {file_ops._escape_shell_arg(op.file_path)} {file_ops._escape_shell_arg(op.new_path)}"
|
||||
)
|
||||
|
||||
if mv_result.exit_code != 0:
|
||||
return False, mv_result.stdout
|
||||
|
||||
diff = f"# Moved: {op.file_path} -> {op.new_path}"
|
||||
return True, diff
|
||||
|
||||
|
||||
def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
|
||||
"""Apply an update file operation."""
|
||||
# Read current content
|
||||
read_result = file_ops.read_file(op.file_path, limit=10000)
|
||||
|
||||
if read_result.error:
|
||||
return False, f"Cannot read file: {read_result.error}"
|
||||
|
||||
# Parse content (remove line numbers)
|
||||
current_lines = []
|
||||
for line in read_result.content.split('\n'):
|
||||
if '|' in line:
|
||||
# Line format: " 123|content"
|
||||
parts = line.split('|', 1)
|
||||
if len(parts) == 2:
|
||||
current_lines.append(parts[1])
|
||||
else:
|
||||
current_lines.append(line)
|
||||
else:
|
||||
current_lines.append(line)
|
||||
|
||||
current_content = '\n'.join(current_lines)
|
||||
|
||||
# Apply each hunk
|
||||
new_content = current_content
|
||||
|
||||
for hunk in op.hunks:
|
||||
# Build search pattern from context and removed lines
|
||||
search_lines = []
|
||||
replace_lines = []
|
||||
|
||||
for line in hunk.lines:
|
||||
if line.prefix == ' ':
|
||||
search_lines.append(line.content)
|
||||
replace_lines.append(line.content)
|
||||
elif line.prefix == '-':
|
||||
search_lines.append(line.content)
|
||||
elif line.prefix == '+':
|
||||
replace_lines.append(line.content)
|
||||
|
||||
if search_lines:
|
||||
search_pattern = '\n'.join(search_lines)
|
||||
replacement = '\n'.join(replace_lines)
|
||||
|
||||
# Use fuzzy matching
|
||||
from tools.fuzzy_match import fuzzy_find_and_replace
|
||||
new_content, count, error = fuzzy_find_and_replace(
|
||||
new_content, search_pattern, replacement, replace_all=False
|
||||
)
|
||||
|
||||
if error and count == 0:
|
||||
# Try with context hint if available
|
||||
if hunk.context_hint:
|
||||
# Find the context hint location and search nearby
|
||||
hint_pos = new_content.find(hunk.context_hint)
|
||||
if hint_pos != -1:
|
||||
# Search in a window around the hint
|
||||
window_start = max(0, hint_pos - 500)
|
||||
window_end = min(len(new_content), hint_pos + 2000)
|
||||
window = new_content[window_start:window_end]
|
||||
|
||||
window_new, count, error = fuzzy_find_and_replace(
|
||||
window, search_pattern, replacement, replace_all=False
|
||||
)
|
||||
|
||||
if count > 0:
|
||||
new_content = new_content[:window_start] + window_new + new_content[window_end:]
|
||||
error = None
|
||||
|
||||
if error:
|
||||
return False, f"Could not apply hunk: {error}"
|
||||
|
||||
# Write new content
|
||||
write_result = file_ops.write_file(op.file_path, new_content)
|
||||
if write_result.error:
|
||||
return False, write_result.error
|
||||
|
||||
# Generate diff
|
||||
import difflib
|
||||
diff_lines = difflib.unified_diff(
|
||||
current_content.splitlines(keepends=True),
|
||||
new_content.splitlines(keepends=True),
|
||||
fromfile=f"a/{op.file_path}",
|
||||
tofile=f"b/{op.file_path}"
|
||||
)
|
||||
diff = ''.join(diff_lines)
|
||||
|
||||
return True, diff
|
||||
1339
tools/rl_training_tool.py
Normal file
1339
tools/rl_training_tool.py
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -349,7 +349,7 @@ def _load_category_description(category_dir: Path) -> Optional[str]:
|
|||
return None
|
||||
|
||||
|
||||
def skills_categories(task_id: str = None) -> str:
|
||||
def skills_categories(verbose: bool = False, task_id: str = None) -> str:
|
||||
"""
|
||||
List available skill categories with descriptions (progressive disclosure tier 0).
|
||||
|
||||
|
|
@ -358,6 +358,7 @@ def skills_categories(task_id: str = None) -> str:
|
|||
or first paragraph to explain what skills are in that category.
|
||||
|
||||
Args:
|
||||
verbose: If True, include skill counts per category (default: False, but currently always included)
|
||||
task_id: Optional task identifier (unused, for API consistency)
|
||||
|
||||
Returns:
|
||||
|
|
|
|||
|
|
@ -40,7 +40,10 @@ from dataclasses import dataclass, field
|
|||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, ClassVar, List
|
||||
|
||||
import yaml
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
yaml = None
|
||||
|
||||
# Add mini-swe-agent to path if not installed
|
||||
mini_swe_path = Path(__file__).parent.parent / "mini-swe-agent" / "src"
|
||||
|
|
@ -210,6 +213,234 @@ def _check_disk_usage_warning():
|
|||
# Session-cached sudo password (persists until CLI exits)
|
||||
_cached_sudo_password: str = ""
|
||||
|
||||
# =============================================================================
|
||||
# Dangerous Command Approval System
|
||||
# =============================================================================
|
||||
|
||||
# Session-cached dangerous command approvals (pattern -> approved)
|
||||
_session_approved_patterns: set = set()
|
||||
|
||||
# Dangerous command patterns (regex, description)
|
||||
DANGEROUS_PATTERNS = [
|
||||
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
|
||||
(r'\brm\s+(-[^\s]*)?r', "recursive delete"),
|
||||
(r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"),
|
||||
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
|
||||
(r'\bmkfs\b', "format filesystem"),
|
||||
(r'\bdd\s+.*if=', "disk copy"),
|
||||
(r'>\s*/dev/sd', "write to block device"),
|
||||
(r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
|
||||
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
|
||||
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
|
||||
(r'>\s*/etc/', "overwrite system config"),
|
||||
(r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
|
||||
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
|
||||
(r'\bpkill\s+-9\b', "force kill processes"),
|
||||
(r':()\s*{\s*:\s*\|\s*:&\s*}\s*;:', "fork bomb"),
|
||||
]
|
||||
|
||||
|
||||
def _load_permanent_allowlist() -> set:
|
||||
"""Load permanently allowed command patterns from config."""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
config = load_config()
|
||||
patterns = config.get("command_allowlist", [])
|
||||
return set(patterns) if patterns else set()
|
||||
except Exception:
|
||||
return set()
|
||||
|
||||
|
||||
def _save_permanent_allowlist(patterns: set):
|
||||
"""Save permanently allowed command patterns to config."""
|
||||
try:
|
||||
from hermes_cli.config import load_config, save_config
|
||||
config = load_config()
|
||||
config["command_allowlist"] = list(patterns)
|
||||
save_config(config)
|
||||
except Exception as e:
|
||||
print(f" ⚠️ Could not save allowlist: {e}")
|
||||
|
||||
|
||||
def _detect_dangerous_command(command: str) -> tuple:
|
||||
"""
|
||||
Check if command matches any dangerous patterns.
|
||||
|
||||
Returns:
|
||||
(is_dangerous, pattern_key, description) or (False, None, None)
|
||||
"""
|
||||
import re
|
||||
command_lower = command.lower()
|
||||
|
||||
for pattern, description in DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, command_lower, re.IGNORECASE):
|
||||
# Use a simplified pattern key for caching (first word + key chars)
|
||||
pattern_key = pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
||||
return (True, pattern_key, description)
|
||||
|
||||
return (False, None, None)
|
||||
|
||||
|
||||
def _is_command_approved(pattern_key: str) -> bool:
|
||||
"""Check if a pattern is approved (session or permanent)."""
|
||||
if pattern_key in _session_approved_patterns:
|
||||
return True
|
||||
|
||||
permanent = _load_permanent_allowlist()
|
||||
if pattern_key in permanent:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _prompt_dangerous_approval(command: str, description: str, timeout_seconds: int = 60) -> str:
|
||||
"""
|
||||
Prompt user to approve a dangerous command (CLI only).
|
||||
|
||||
Returns: 'once', 'session', 'always', or 'deny'
|
||||
"""
|
||||
import sys
|
||||
import threading
|
||||
|
||||
# Pause spinner if one is running
|
||||
os.environ["HERMES_SPINNER_PAUSE"] = "1"
|
||||
|
||||
try:
|
||||
# Use simple ASCII art for compatibility (no ANSI color codes)
|
||||
print()
|
||||
print(f" ⚠️ DANGEROUS COMMAND: {description}")
|
||||
print(f" {command[:80]}{'...' if len(command) > 80 else ''}")
|
||||
print()
|
||||
print(f" [o]nce | [s]ession | [a]lways | [d]eny")
|
||||
print()
|
||||
sys.stdout.flush()
|
||||
|
||||
result = {"choice": ""}
|
||||
|
||||
def get_input():
|
||||
try:
|
||||
result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower()
|
||||
except:
|
||||
result["choice"] = ""
|
||||
|
||||
thread = threading.Thread(target=get_input, daemon=True)
|
||||
thread.start()
|
||||
thread.join(timeout=timeout_seconds)
|
||||
|
||||
if thread.is_alive():
|
||||
print("\n ⏱ Timeout - denying command")
|
||||
return "deny"
|
||||
|
||||
choice = result["choice"]
|
||||
|
||||
if choice in ('o', 'once'):
|
||||
print(" ✓ Allowed once")
|
||||
return "once"
|
||||
elif choice in ('s', 'session'):
|
||||
print(" ✓ Allowed for this session")
|
||||
return "session"
|
||||
elif choice in ('a', 'always'):
|
||||
print(" ✓ Added to permanent allowlist")
|
||||
return "always"
|
||||
else:
|
||||
print(" ✗ Denied")
|
||||
return "deny"
|
||||
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print("\n ✗ Cancelled")
|
||||
return "deny"
|
||||
finally:
|
||||
if "HERMES_SPINNER_PAUSE" in os.environ:
|
||||
del os.environ["HERMES_SPINNER_PAUSE"]
|
||||
print()
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def _check_dangerous_command(command: str, env_type: str) -> dict:
|
||||
"""
|
||||
Check if command is dangerous and handle approval.
|
||||
|
||||
Only applies to local/ssh backends in interactive contexts.
|
||||
|
||||
Args:
|
||||
command: The command to check
|
||||
env_type: The terminal backend type
|
||||
|
||||
Returns:
|
||||
{"approved": True/False, "message": str or None}
|
||||
"""
|
||||
# Skip check for isolated environments (containers are disposable)
|
||||
if env_type in ("docker", "singularity", "modal"):
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
# Detect dangerous command
|
||||
is_dangerous, pattern_key, description = _detect_dangerous_command(command)
|
||||
|
||||
if not is_dangerous:
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
# Check if already approved
|
||||
if _is_command_approved(pattern_key):
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
# Check context - only prompt in interactive modes
|
||||
is_cli = os.getenv("HERMES_INTERACTIVE")
|
||||
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
||||
|
||||
if not is_cli and not is_gateway:
|
||||
# Programmatic use - allow (user opted into local backend)
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
if is_gateway:
|
||||
# Messaging context - return informative denial, agent should ask user
|
||||
return {
|
||||
"approved": False,
|
||||
"pattern_key": pattern_key,
|
||||
"message": f"BLOCKED: This command is potentially dangerous ({description}). Tell the user and ask if they want to add this command pattern to their allowlist. They can do this via 'hermes config edit' or by running the command directly on their machine."
|
||||
}
|
||||
|
||||
# CLI context - prompt user
|
||||
choice = _prompt_dangerous_approval(command, description)
|
||||
|
||||
if choice == "deny":
|
||||
return {"approved": False, "message": "BLOCKED: User denied this potentially dangerous command. Do NOT retry this command - the user has explicitly rejected it."}
|
||||
|
||||
# Handle approval
|
||||
if choice == "session":
|
||||
_session_approved_patterns.add(pattern_key)
|
||||
elif choice == "always":
|
||||
_session_approved_patterns.add(pattern_key)
|
||||
permanent = _load_permanent_allowlist()
|
||||
permanent.add(pattern_key)
|
||||
_save_permanent_allowlist(permanent)
|
||||
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
|
||||
def _handle_sudo_failure(output: str, env_type: str) -> str:
|
||||
"""
|
||||
Check for sudo failure and add helpful message for messaging contexts.
|
||||
|
||||
Returns enhanced output if sudo failed in messaging context, else original.
|
||||
"""
|
||||
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
||||
|
||||
if not is_gateway:
|
||||
return output
|
||||
|
||||
# Check for sudo failure indicators
|
||||
sudo_failures = [
|
||||
"sudo: a password is required",
|
||||
"sudo: no tty present",
|
||||
"sudo: a terminal is required",
|
||||
]
|
||||
|
||||
for failure in sudo_failures:
|
||||
if failure in output:
|
||||
return output + "\n\n💡 Tip: To enable sudo over messaging, add SUDO_PASSWORD to ~/.hermes/.env on the agent machine."
|
||||
|
||||
return output
|
||||
|
||||
|
||||
def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
|
||||
"""
|
||||
|
|
@ -726,6 +957,9 @@ class _DockerEnvironment:
|
|||
pass
|
||||
|
||||
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModalProfile:
|
||||
|
|
@ -1315,7 +1549,7 @@ class _ModalSandboxEnvironment:
|
|||
TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment.
|
||||
|
||||
**Environment:**
|
||||
- Isolated execution environment (local, Docker, Singularity, or Modal cloud based on configuration)
|
||||
- Isolated execution environment (local, Docker, or Modal cloud based on configuration)
|
||||
- Filesystem persists between tool calls within the same task
|
||||
- Internet access available
|
||||
|
||||
|
|
@ -1323,20 +1557,17 @@ TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment.
|
|||
- Simple commands: Just provide the 'command' parameter
|
||||
- Background processes: Set 'background': True for servers/long-running tasks
|
||||
- Command timeout: Optional 'timeout' parameter in seconds
|
||||
- Modal profiles: Use 'profile' parameter for specialized environments (e.g., GPU)
|
||||
|
||||
**Examples:**
|
||||
- Run command: `{"command": "ls -la"}`
|
||||
- Background task: `{"command": "source venv/bin/activate && python server.py", "background": True}`
|
||||
- With timeout: `{"command": "long_task.sh", "timeout": 300}`
|
||||
- GPU task (Modal): `{"command": "python train.py", "profile": "pytorch-gpu"}`
|
||||
|
||||
**Best Practices:**
|
||||
- Run servers/long processes in background
|
||||
- Monitor disk usage for large tasks
|
||||
- Install whatever tools you need with apt-get or pip
|
||||
- Do not be afraid to run pip with --break-system-packages
|
||||
- For ML/GPU tasks with Modal, use the appropriate profile
|
||||
|
||||
**Things to avoid:**
|
||||
- Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck.
|
||||
|
|
@ -1354,12 +1585,27 @@ _cleanup_running = False
|
|||
# Configuration from environment variables
|
||||
def _get_env_config() -> Dict[str, Any]:
|
||||
"""Get terminal environment configuration from environment variables."""
|
||||
# Default image with Python and Node.js for maximum compatibility
|
||||
default_image = "nikolaik/python-nodejs:python3.11-nodejs20"
|
||||
env_type = os.getenv("TERMINAL_ENV", "local")
|
||||
|
||||
# Default cwd depends on backend:
|
||||
# - local/ssh: current working directory (CLI resolves "." before we get here)
|
||||
# - docker/singularity: /tmp inside the container (singularity bind-mounts /scratch there)
|
||||
# - modal: /root (ephemeral cloud container, full filesystem access)
|
||||
if env_type == "modal":
|
||||
default_cwd = "/root"
|
||||
elif env_type in ("docker", "singularity"):
|
||||
default_cwd = "/tmp"
|
||||
else:
|
||||
default_cwd = os.getcwd()
|
||||
|
||||
return {
|
||||
"env_type": os.getenv("TERMINAL_ENV", "local"), # local, docker, singularity, modal, or ssh
|
||||
"docker_image": os.getenv("TERMINAL_DOCKER_IMAGE", "python:3.11"),
|
||||
"singularity_image": os.getenv("TERMINAL_SINGULARITY_IMAGE", "docker://python:3.11"),
|
||||
"modal_image": os.getenv("TERMINAL_MODAL_IMAGE", "python:3.11"),
|
||||
"cwd": os.getenv("TERMINAL_CWD", "/tmp"),
|
||||
"env_type": env_type,
|
||||
"docker_image": os.getenv("TERMINAL_DOCKER_IMAGE", default_image),
|
||||
"singularity_image": os.getenv("TERMINAL_SINGULARITY_IMAGE", f"docker://{default_image}"),
|
||||
"modal_image": os.getenv("TERMINAL_MODAL_IMAGE", default_image),
|
||||
"cwd": os.getenv("TERMINAL_CWD", default_cwd),
|
||||
"timeout": int(os.getenv("TERMINAL_TIMEOUT", "60")),
|
||||
"lifetime_seconds": int(os.getenv("TERMINAL_LIFETIME_SECONDS", "300")),
|
||||
# SSH-specific config
|
||||
|
|
@ -1370,17 +1616,9 @@ def _get_env_config() -> Dict[str, Any]:
|
|||
}
|
||||
|
||||
|
||||
def _create_environment(
|
||||
env_type: str,
|
||||
image: str,
|
||||
cwd: str,
|
||||
timeout: int,
|
||||
ssh_config: dict = None,
|
||||
task_id: str = "",
|
||||
profile: Optional[str] = None,
|
||||
):
|
||||
def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: dict = None):
|
||||
"""
|
||||
Create an execution environment.
|
||||
Create an execution environment from mini-swe-agent.
|
||||
|
||||
Args:
|
||||
env_type: One of "local", "docker", "singularity", "modal", "ssh"
|
||||
|
|
@ -1388,8 +1626,6 @@ def _create_environment(
|
|||
cwd: Working directory
|
||||
timeout: Default command timeout
|
||||
ssh_config: SSH connection config (for env_type="ssh")
|
||||
task_id: Unique task identifier (used for Modal pool management)
|
||||
profile: Modal profile name (e.g., "pytorch-gpu") - only used for modal
|
||||
|
||||
Returns:
|
||||
Environment instance with execute() method
|
||||
|
|
@ -1409,8 +1645,8 @@ def _create_environment(
|
|||
elif env_type == "modal":
|
||||
# Use native Modal Sandbox with auto-scaling pool and profile support
|
||||
return _ModalSandboxEnvironment(
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=timeout,
|
||||
task_id=task_id,
|
||||
profile=profile,
|
||||
|
|
@ -1609,7 +1845,6 @@ def cleanup_vm(task_id: str):
|
|||
|
||||
atexit.register(_stop_cleanup_thread)
|
||||
|
||||
|
||||
def _shutdown_modal_pools():
|
||||
"""Shutdown Modal pool manager on exit (silently, as interpreter is shutting down)."""
|
||||
try:
|
||||
|
|
@ -1626,18 +1861,18 @@ def terminal_tool(
|
|||
background: bool = False,
|
||||
timeout: Optional[int] = None,
|
||||
task_id: Optional[str] = None,
|
||||
force: bool = False,
|
||||
profile: Optional[str] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Execute a command using configured execution environments.
|
||||
Execute a command using mini-swe-agent's execution environments.
|
||||
|
||||
Args:
|
||||
command: The command to execute
|
||||
background: Whether to run in background (default: False)
|
||||
timeout: Command timeout in seconds (default: from config)
|
||||
task_id: Unique identifier for environment isolation (optional)
|
||||
profile: Modal profile name for heterogeneous workloads (e.g., "pytorch-gpu")
|
||||
Only used when TERMINAL_ENV=modal. If not specified, uses default profile.
|
||||
force: If True, skip dangerous command check (use after user confirms)
|
||||
|
||||
Returns:
|
||||
str: JSON string with output, exit_code, and error fields
|
||||
|
|
@ -1652,8 +1887,8 @@ def terminal_tool(
|
|||
# With custom timeout
|
||||
>>> result = terminal_tool(command="long_task.sh", timeout=300)
|
||||
|
||||
# Use GPU profile for ML tasks (Modal only)
|
||||
>>> result = terminal_tool(command="python train.py", profile="pytorch-gpu")
|
||||
# Force run after user confirmation
|
||||
# Note: force parameter is internal only, not exposed to model API
|
||||
"""
|
||||
global _active_environments, _last_activity
|
||||
|
||||
|
|
@ -1695,43 +1930,74 @@ def terminal_tool(
|
|||
_start_cleanup_thread()
|
||||
|
||||
# Get or create environment
|
||||
# Check under lock, but create OUTSIDE lock so we don't block
|
||||
# other concurrent rollouts during slow Modal/Docker startup
|
||||
needs_creation = False
|
||||
with _env_lock:
|
||||
if effective_task_id not in _active_environments:
|
||||
# Check disk usage before creating new environment (Singularity only)
|
||||
if env_type == "singularity":
|
||||
_check_disk_usage_warning()
|
||||
|
||||
try:
|
||||
# Build SSH config if using SSH environment
|
||||
ssh_config = None
|
||||
if env_type == "ssh":
|
||||
ssh_config = {
|
||||
"host": config.get("ssh_host", ""),
|
||||
"user": config.get("ssh_user", ""),
|
||||
"port": config.get("ssh_port", 22),
|
||||
"key": config.get("ssh_key", ""),
|
||||
}
|
||||
|
||||
_active_environments[effective_task_id] = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=effective_timeout,
|
||||
ssh_config=ssh_config,
|
||||
task_id=effective_task_id,
|
||||
profile=profile,
|
||||
)
|
||||
except ImportError as e:
|
||||
return json.dumps({
|
||||
"output": "",
|
||||
"exit_code": -1,
|
||||
"error": f"Terminal tool disabled: mini-swe-agent not available ({e})",
|
||||
"status": "disabled"
|
||||
}, ensure_ascii=False)
|
||||
needs_creation = True
|
||||
else:
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
env = _active_environments[effective_task_id]
|
||||
|
||||
# Update last activity time
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
env = _active_environments[effective_task_id]
|
||||
if needs_creation:
|
||||
_check_disk_usage_warning()
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[Terminal] Creating new {env_type} environment for task {effective_task_id[:8]}...", flush=True)
|
||||
try:
|
||||
ssh_config = None
|
||||
if env_type == "ssh":
|
||||
ssh_config = {
|
||||
"host": config.get("ssh_host", ""),
|
||||
"user": config.get("ssh_user", ""),
|
||||
"port": config.get("ssh_port", 22),
|
||||
"key": config.get("ssh_key", ""),
|
||||
}
|
||||
|
||||
new_env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=effective_timeout,
|
||||
ssh_config=ssh_config
|
||||
)
|
||||
except ImportError as e:
|
||||
return json.dumps({
|
||||
"output": "",
|
||||
"exit_code": -1,
|
||||
"error": f"Terminal tool disabled: mini-swe-agent not available ({e})",
|
||||
"status": "disabled"
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Store under lock (brief)
|
||||
with _env_lock:
|
||||
if effective_task_id not in _active_environments:
|
||||
_active_environments[effective_task_id] = new_env
|
||||
else:
|
||||
# Another thread created it while we were building -- clean up ours
|
||||
try:
|
||||
if hasattr(new_env, 'stop'):
|
||||
new_env.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
env = _active_environments[effective_task_id]
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[Terminal] {env_type} environment ready for task {effective_task_id[:8]}", flush=True)
|
||||
|
||||
# Check for dangerous commands (only for local/ssh in interactive modes)
|
||||
# Skip check if force=True (user has confirmed they want to run it)
|
||||
if not force:
|
||||
approval = _check_dangerous_command(command, env_type)
|
||||
if not approval["approved"]:
|
||||
# Command was blocked - return informative message
|
||||
return json.dumps({
|
||||
"output": "",
|
||||
"exit_code": -1,
|
||||
"error": approval.get("message", "Command denied - potentially dangerous operation"),
|
||||
"status": "blocked"
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Prepare command for execution
|
||||
if background:
|
||||
|
|
@ -1773,13 +2039,20 @@ def terminal_tool(
|
|||
retry_count += 1
|
||||
wait_time = 2 ** retry_count
|
||||
print(f"⚠️ Terminal: execution error, retrying in {wait_time}s (attempt {retry_count}/{max_retries})")
|
||||
print(f" Command: {command[:200]}")
|
||||
print(f" Error: {type(e).__name__}: {e}")
|
||||
print(f" Task ID: {effective_task_id}, Backend: {env_type}")
|
||||
time.sleep(wait_time)
|
||||
continue
|
||||
|
||||
print(f"❌ Terminal: execution failed after {max_retries} retries")
|
||||
print(f" Command: {command[:200]}")
|
||||
print(f" Error: {type(e).__name__}: {e}")
|
||||
print(f" Task ID: {effective_task_id}, Backend: {env_type}")
|
||||
return json.dumps({
|
||||
"output": "",
|
||||
"exit_code": -1,
|
||||
"error": f"Command execution failed: {str(e)}"
|
||||
"error": f"Command execution failed: {type(e).__name__}: {str(e)}"
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Got a result
|
||||
|
|
@ -1789,6 +2062,9 @@ def terminal_tool(
|
|||
output = result.get("output", "")
|
||||
returncode = result.get("returncode", 0)
|
||||
|
||||
# Add helpful message for sudo failures in messaging context
|
||||
output = _handle_sudo_failure(output, env_type)
|
||||
|
||||
# Truncate output if too long
|
||||
MAX_OUTPUT_CHARS = 50000
|
||||
if len(output) > MAX_OUTPUT_CHARS:
|
||||
|
|
@ -1817,16 +2093,12 @@ def check_terminal_requirements() -> bool:
|
|||
|
||||
try:
|
||||
if env_type == "local":
|
||||
# Prefer mini-swe-agent when available, but allow a subprocess fallback.
|
||||
try:
|
||||
from minisweagent.environments.local import LocalEnvironment
|
||||
|
||||
return True
|
||||
except ImportError:
|
||||
return True
|
||||
from minisweagent.environments.local import LocalEnvironment
|
||||
return True
|
||||
elif env_type == "docker":
|
||||
from minisweagent.environments.docker import DockerEnvironment
|
||||
# Check if docker is available
|
||||
import subprocess
|
||||
result = subprocess.run(["docker", "version"], capture_output=True, timeout=5)
|
||||
return result.returncode == 0
|
||||
elif env_type == "singularity":
|
||||
|
|
@ -1880,9 +2152,11 @@ if __name__ == "__main__":
|
|||
print(" result = terminal_tool(command='python server.py', background=True)")
|
||||
|
||||
print("\nEnvironment Variables:")
|
||||
print(f" TERMINAL_ENV: {os.getenv('TERMINAL_ENV', 'local')} (local/docker/modal)")
|
||||
print(f" TERMINAL_DOCKER_IMAGE: {os.getenv('TERMINAL_DOCKER_IMAGE', 'python:3.11-slim')}")
|
||||
print(f" TERMINAL_MODAL_IMAGE: {os.getenv('TERMINAL_MODAL_IMAGE', 'python:3.11-slim')}")
|
||||
print(f" TERMINAL_CWD: {os.getenv('TERMINAL_CWD', '/tmp')}")
|
||||
default_img = "nikolaik/python-nodejs:python3.11-nodejs20"
|
||||
print(f" TERMINAL_ENV: {os.getenv('TERMINAL_ENV', 'local')} (local/docker/singularity/modal/ssh)")
|
||||
print(f" TERMINAL_DOCKER_IMAGE: {os.getenv('TERMINAL_DOCKER_IMAGE', default_img)}")
|
||||
print(f" TERMINAL_SINGULARITY_IMAGE: {os.getenv('TERMINAL_SINGULARITY_IMAGE', f'docker://{default_img}')}")
|
||||
print(f" TERMINAL_MODAL_IMAGE: {os.getenv('TERMINAL_MODAL_IMAGE', default_img)}")
|
||||
print(f" TERMINAL_CWD: {os.getenv('TERMINAL_CWD', os.getcwd())}")
|
||||
print(f" TERMINAL_TIMEOUT: {os.getenv('TERMINAL_TIMEOUT', '60')}")
|
||||
print(f" TERMINAL_LIFETIME_SECONDS: {os.getenv('TERMINAL_LIFETIME_SECONDS', '300')}")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue