diff --git a/AGENTS.md b/AGENTS.md index fd59d3b20e..0e3eab5287 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -262,6 +262,35 @@ Terminal tool configuration (in `~/.hermes/config.yaml`): - `terminal.modal_image` - Image for Modal backend - SSH: `TERMINAL_SSH_HOST`, `TERMINAL_SSH_USER`, `TERMINAL_SSH_KEY` in .env +### Dangerous Command Approval + +The terminal tool includes safety checks for potentially destructive commands (e.g., `rm -rf`, `DROP TABLE`, `chmod 777`, etc.): + +**Behavior by Backend:** +- **Docker/Singularity/Modal**: Commands run unrestricted (isolated containers) +- **Local/SSH**: Dangerous commands trigger approval flow + +**Approval Flow (CLI):** +``` +āš ļø Potentially dangerous command detected: recursive delete + rm -rf /tmp/test + + [o]nce | [s]ession | [a]lways | [d]eny + Choice [o/s/a/D]: +``` + +**Approval Flow (Messaging):** +- Command is blocked with explanation +- Agent explains and asks user to confirm +- If user says "yes/approve/do it", agent retries with `force=True` + +**Configuration:** +- `command_allowlist` in `~/.hermes/config.yaml` stores permanently allowed patterns +- Add patterns via "always" approval or edit directly + +**Sudo Handling (Messaging):** +- If sudo fails over messaging, output includes tip to add `SUDO_PASSWORD` to `~/.hermes/.env` + --- ## Adding New Tools diff --git a/TODO.md b/TODO.md index 3f42923acf..dc116539b7 100644 --- a/TODO.md +++ b/TODO.md @@ -423,4 +423,22 @@ --- +## Recently Completed āœ… + +### Dangerous Command Approval System +**Implemented:** Dangerous command detection and approval for terminal tool. + +**Features:** +- Pattern-based detection of dangerous commands (rm -rf, DROP TABLE, chmod 777, etc.) +- CLI prompt with options: `[o]nce | [s]ession | [a]lways | [d]eny` +- Session caching (approved patterns don't re-prompt) +- Permanent allowlist in `~/.hermes/config.yaml` +- Force flag for agent to bypass after user confirmation +- Skip check for isolated backends (Docker, Singularity, Modal) +- Helpful sudo failure messages for messaging platforms + +**Files:** `tools/terminal_tool.py`, `model_tools.py`, `hermes_cli/config.py` + +--- + *Last updated: $(date +%Y-%m-%d)* šŸ¤– diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 6efcaa7f8b..65443d623a 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -99,6 +99,9 @@ DEFAULT_CONFIG = { "personality": "kawaii", }, + # Permanently allowed dangerous command patterns (added via "always" approval) + "command_allowlist": [], + # Config schema version - bump this when adding new required fields "_config_version": 1, } diff --git a/model_tools.py b/model_tools.py index 138860195f..3bdcbf4f06 100644 --- a/model_tools.py +++ b/model_tools.py @@ -274,6 +274,11 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]: "type": "integer", "description": "Command timeout in seconds (optional)", "minimum": 1 + }, + "force": { + "type": "boolean", + "description": "Skip dangerous command safety check. Only use after user explicitly confirms they want to run a blocked command.", + "default": False } }, "required": ["command"] @@ -776,8 +781,9 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A command = function_args.get("command") background = function_args.get("background", False) timeout = function_args.get("timeout") + force = function_args.get("force", False) # Skip dangerous command check if user confirmed - return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id) + return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id, force=force) else: return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False) diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index da2b762b15..81578e7ca9 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -207,6 +207,233 @@ def _check_disk_usage_warning(): # Session-cached sudo password (persists until CLI exits) _cached_sudo_password: str = "" +# ============================================================================= +# Dangerous Command Approval System +# ============================================================================= + +# Session-cached dangerous command approvals (pattern -> approved) +_session_approved_patterns: set = set() + +# Dangerous command patterns (regex, description) +DANGEROUS_PATTERNS = [ + (r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"), + (r'\brm\s+(-[^\s]*)?r', "recursive delete"), + (r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"), + (r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"), + (r'\bmkfs\b', "format filesystem"), + (r'\bdd\s+.*if=', "disk copy"), + (r'>\s*/dev/sd', "write to block device"), + (r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"), + (r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"), + (r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"), + (r'>\s*/etc/', "overwrite system config"), + (r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"), + (r'\bkill\s+-9\s+-1\b', "kill all processes"), + (r'\bpkill\s+-9\b', "force kill processes"), + (r':()\s*{\s*:\s*\|\s*:&\s*}\s*;:', "fork bomb"), +] + + +def _load_permanent_allowlist() -> set: + """Load permanently allowed command patterns from config.""" + try: + from hermes_cli.config import load_config + config = load_config() + patterns = config.get("command_allowlist", []) + return set(patterns) if patterns else set() + except Exception: + return set() + + +def _save_permanent_allowlist(patterns: set): + """Save permanently allowed command patterns to config.""" + try: + from hermes_cli.config import load_config, save_config + config = load_config() + config["command_allowlist"] = list(patterns) + save_config(config) + except Exception as e: + print(f" āš ļø Could not save allowlist: {e}") + + +def _detect_dangerous_command(command: str) -> tuple: + """ + Check if command matches any dangerous patterns. + + Returns: + (is_dangerous, pattern_key, description) or (False, None, None) + """ + import re + command_lower = command.lower() + + for pattern, description in DANGEROUS_PATTERNS: + if re.search(pattern, command_lower, re.IGNORECASE): + # Use a simplified pattern key for caching (first word + key chars) + pattern_key = pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20] + return (True, pattern_key, description) + + return (False, None, None) + + +def _is_command_approved(pattern_key: str) -> bool: + """Check if a pattern is approved (session or permanent).""" + if pattern_key in _session_approved_patterns: + return True + + permanent = _load_permanent_allowlist() + if pattern_key in permanent: + return True + + return False + + +def _prompt_dangerous_approval(command: str, description: str, timeout_seconds: int = 60) -> str: + """ + Prompt user to approve a dangerous command (CLI only). + + Returns: 'once', 'session', 'always', or 'deny' + """ + import sys + import threading + + # Pause spinner if one is running + os.environ["HERMES_SPINNER_PAUSE"] = "1" + + try: + print() + print(f" āš ļø \033[33mPotentially dangerous command detected:\033[0m {description}") + print(f" \033[2m{command[:100]}{'...' if len(command) > 100 else ''}\033[0m") + print() + print(f" [\033[32mo\033[0m]nce | [\033[33ms\033[0m]ession | [\033[36ma\033[0m]lways | [\033[31md\033[0m]eny") + print() + sys.stdout.flush() + + result = {"choice": ""} + + def get_input(): + try: + result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower() + except: + result["choice"] = "" + + thread = threading.Thread(target=get_input, daemon=True) + thread.start() + thread.join(timeout=timeout_seconds) + + if thread.is_alive(): + print("\n ā± Timeout - denying command") + return "deny" + + choice = result["choice"] + + if choice in ('o', 'once'): + print(" āœ“ Allowed once") + return "once" + elif choice in ('s', 'session'): + print(" āœ“ Allowed for this session") + return "session" + elif choice in ('a', 'always'): + print(" āœ“ Added to permanent allowlist") + return "always" + else: + print(" āœ— Denied") + return "deny" + + except (EOFError, KeyboardInterrupt): + print("\n āœ— Cancelled") + return "deny" + finally: + if "HERMES_SPINNER_PAUSE" in os.environ: + del os.environ["HERMES_SPINNER_PAUSE"] + print() + sys.stdout.flush() + + +def _check_dangerous_command(command: str, env_type: str) -> dict: + """ + Check if command is dangerous and handle approval. + + Only applies to local/ssh backends in interactive contexts. + + Args: + command: The command to check + env_type: The terminal backend type + + Returns: + {"approved": True/False, "message": str or None} + """ + # Skip check for isolated environments (containers are disposable) + if env_type in ("docker", "singularity", "modal"): + return {"approved": True, "message": None} + + # Detect dangerous command + is_dangerous, pattern_key, description = _detect_dangerous_command(command) + + if not is_dangerous: + return {"approved": True, "message": None} + + # Check if already approved + if _is_command_approved(pattern_key): + return {"approved": True, "message": None} + + # Check context - only prompt in interactive modes + is_cli = os.getenv("HERMES_INTERACTIVE") + is_gateway = os.getenv("HERMES_GATEWAY_SESSION") + + if not is_cli and not is_gateway: + # Programmatic use - allow (user opted into local backend) + return {"approved": True, "message": None} + + if is_gateway: + # Messaging context - return informative denial, agent should ask user + return { + "approved": False, + "pattern_key": pattern_key, + "message": f"āš ļø This command was blocked because it's potentially dangerous ({description}). If you want me to run it anyway, please confirm by saying 'yes, run it' or 'approve'." + } + + # CLI context - prompt user + choice = _prompt_dangerous_approval(command, description) + + if choice == "deny": + return {"approved": False, "message": "Command denied by user"} + + # Handle approval + if choice == "session": + _session_approved_patterns.add(pattern_key) + elif choice == "always": + _session_approved_patterns.add(pattern_key) + permanent = _load_permanent_allowlist() + permanent.add(pattern_key) + _save_permanent_allowlist(permanent) + + return {"approved": True, "message": None} + + +def _handle_sudo_failure(output: str, env_type: str) -> str: + """ + Check for sudo failure and add helpful message for messaging contexts. + + Returns enhanced output if sudo failed in messaging context, else original. + """ + is_gateway = os.getenv("HERMES_GATEWAY_SESSION") + + if not is_gateway: + return output + + # Check for sudo failure indicators + sudo_failures = [ + "sudo: a password is required", + "sudo: no tty present", + "sudo: a terminal is required", + ] + + for failure in sudo_failures: + if failure in output: + return output + "\n\nšŸ’” Tip: To enable sudo over messaging, add SUDO_PASSWORD to ~/.hermes/.env on the agent machine." + + return output + def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str: """ @@ -1050,7 +1277,8 @@ def terminal_tool( command: str, background: bool = False, timeout: Optional[int] = None, - task_id: Optional[str] = None + task_id: Optional[str] = None, + force: bool = False ) -> str: """ Execute a command using mini-swe-agent's execution environments. @@ -1060,6 +1288,7 @@ def terminal_tool( background: Whether to run in background (default: False) timeout: Command timeout in seconds (default: from config) task_id: Unique identifier for environment isolation (optional) + force: If True, skip dangerous command check (use after user confirms) Returns: str: JSON string with output, exit_code, and error fields @@ -1073,6 +1302,9 @@ def terminal_tool( # With custom timeout >>> result = terminal_tool(command="long_task.sh", timeout=300) + + # Force run after user confirmation + >>> result = terminal_tool(command="rm -rf /tmp/old", force=True) """ global _active_environments, _last_activity @@ -1149,6 +1381,19 @@ def terminal_tool( _last_activity[effective_task_id] = time.time() env = _active_environments[effective_task_id] + # Check for dangerous commands (only for local/ssh in interactive modes) + # Skip check if force=True (user has confirmed they want to run it) + if not force: + approval = _check_dangerous_command(command, env_type) + if not approval["approved"]: + # Command was blocked - return informative message + return json.dumps({ + "output": "", + "exit_code": -1, + "error": approval.get("message", "Command denied - potentially dangerous operation"), + "status": "blocked" + }, ensure_ascii=False) + # Prepare command for execution if background: # Run in background with nohup and redirect output @@ -1205,6 +1450,9 @@ def terminal_tool( output = result.get("output", "") returncode = result.get("returncode", 0) + # Add helpful message for sudo failures in messaging context + output = _handle_sudo_failure(output, env_type) + # Truncate output if too long MAX_OUTPUT_CHARS = 50000 if len(output) > MAX_OUTPUT_CHARS: