mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Implement dangerous command approval system for terminal tool
- Added a safety mechanism to detect and approve potentially dangerous commands (e.g., `rm -rf`, `DROP TABLE`). - Introduced an approval flow for local/SSH backends, prompting users for confirmation with options to allow once, for the session, or permanently. - Updated configuration to include a `command_allowlist` for storing approved patterns. - Enhanced messaging for sudo failures in messaging contexts. - Updated relevant documentation in AGENTS.md and TODO.md to reflect these changes.
This commit is contained in:
parent
be91af7551
commit
76d929e177
5 changed files with 306 additions and 2 deletions
29
AGENTS.md
29
AGENTS.md
|
|
@ -262,6 +262,35 @@ Terminal tool configuration (in `~/.hermes/config.yaml`):
|
|||
- `terminal.modal_image` - Image for Modal backend
|
||||
- SSH: `TERMINAL_SSH_HOST`, `TERMINAL_SSH_USER`, `TERMINAL_SSH_KEY` in .env
|
||||
|
||||
### Dangerous Command Approval
|
||||
|
||||
The terminal tool includes safety checks for potentially destructive commands (e.g., `rm -rf`, `DROP TABLE`, `chmod 777`, etc.):
|
||||
|
||||
**Behavior by Backend:**
|
||||
- **Docker/Singularity/Modal**: Commands run unrestricted (isolated containers)
|
||||
- **Local/SSH**: Dangerous commands trigger approval flow
|
||||
|
||||
**Approval Flow (CLI):**
|
||||
```
|
||||
⚠️ Potentially dangerous command detected: recursive delete
|
||||
rm -rf /tmp/test
|
||||
|
||||
[o]nce | [s]ession | [a]lways | [d]eny
|
||||
Choice [o/s/a/D]:
|
||||
```
|
||||
|
||||
**Approval Flow (Messaging):**
|
||||
- Command is blocked with explanation
|
||||
- Agent explains and asks user to confirm
|
||||
- If user says "yes/approve/do it", agent retries with `force=True`
|
||||
|
||||
**Configuration:**
|
||||
- `command_allowlist` in `~/.hermes/config.yaml` stores permanently allowed patterns
|
||||
- Add patterns via "always" approval or edit directly
|
||||
|
||||
**Sudo Handling (Messaging):**
|
||||
- If sudo fails over messaging, output includes tip to add `SUDO_PASSWORD` to `~/.hermes/.env`
|
||||
|
||||
---
|
||||
|
||||
## Adding New Tools
|
||||
|
|
|
|||
18
TODO.md
18
TODO.md
|
|
@ -423,4 +423,22 @@
|
|||
|
||||
---
|
||||
|
||||
## Recently Completed ✅
|
||||
|
||||
### Dangerous Command Approval System
|
||||
**Implemented:** Dangerous command detection and approval for terminal tool.
|
||||
|
||||
**Features:**
|
||||
- Pattern-based detection of dangerous commands (rm -rf, DROP TABLE, chmod 777, etc.)
|
||||
- CLI prompt with options: `[o]nce | [s]ession | [a]lways | [d]eny`
|
||||
- Session caching (approved patterns don't re-prompt)
|
||||
- Permanent allowlist in `~/.hermes/config.yaml`
|
||||
- Force flag for agent to bypass after user confirmation
|
||||
- Skip check for isolated backends (Docker, Singularity, Modal)
|
||||
- Helpful sudo failure messages for messaging platforms
|
||||
|
||||
**Files:** `tools/terminal_tool.py`, `model_tools.py`, `hermes_cli/config.py`
|
||||
|
||||
---
|
||||
|
||||
*Last updated: $(date +%Y-%m-%d)* 🤖
|
||||
|
|
|
|||
|
|
@ -99,6 +99,9 @@ DEFAULT_CONFIG = {
|
|||
"personality": "kawaii",
|
||||
},
|
||||
|
||||
# Permanently allowed dangerous command patterns (added via "always" approval)
|
||||
"command_allowlist": [],
|
||||
|
||||
# Config schema version - bump this when adding new required fields
|
||||
"_config_version": 1,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -274,6 +274,11 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]:
|
|||
"type": "integer",
|
||||
"description": "Command timeout in seconds (optional)",
|
||||
"minimum": 1
|
||||
},
|
||||
"force": {
|
||||
"type": "boolean",
|
||||
"description": "Skip dangerous command safety check. Only use after user explicitly confirms they want to run a blocked command.",
|
||||
"default": False
|
||||
}
|
||||
},
|
||||
"required": ["command"]
|
||||
|
|
@ -776,8 +781,9 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A
|
|||
command = function_args.get("command")
|
||||
background = function_args.get("background", False)
|
||||
timeout = function_args.get("timeout")
|
||||
force = function_args.get("force", False) # Skip dangerous command check if user confirmed
|
||||
|
||||
return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id)
|
||||
return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id, force=force)
|
||||
|
||||
else:
|
||||
return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False)
|
||||
|
|
|
|||
|
|
@ -207,6 +207,233 @@ def _check_disk_usage_warning():
|
|||
# Session-cached sudo password (persists until CLI exits)
|
||||
_cached_sudo_password: str = ""
|
||||
|
||||
# =============================================================================
|
||||
# Dangerous Command Approval System
|
||||
# =============================================================================
|
||||
|
||||
# Session-cached dangerous command approvals (pattern -> approved)
|
||||
_session_approved_patterns: set = set()
|
||||
|
||||
# Dangerous command patterns (regex, description)
|
||||
DANGEROUS_PATTERNS = [
|
||||
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
|
||||
(r'\brm\s+(-[^\s]*)?r', "recursive delete"),
|
||||
(r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"),
|
||||
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
|
||||
(r'\bmkfs\b', "format filesystem"),
|
||||
(r'\bdd\s+.*if=', "disk copy"),
|
||||
(r'>\s*/dev/sd', "write to block device"),
|
||||
(r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
|
||||
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
|
||||
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
|
||||
(r'>\s*/etc/', "overwrite system config"),
|
||||
(r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
|
||||
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
|
||||
(r'\bpkill\s+-9\b', "force kill processes"),
|
||||
(r':()\s*{\s*:\s*\|\s*:&\s*}\s*;:', "fork bomb"),
|
||||
]
|
||||
|
||||
|
||||
def _load_permanent_allowlist() -> set:
|
||||
"""Load permanently allowed command patterns from config."""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
config = load_config()
|
||||
patterns = config.get("command_allowlist", [])
|
||||
return set(patterns) if patterns else set()
|
||||
except Exception:
|
||||
return set()
|
||||
|
||||
|
||||
def _save_permanent_allowlist(patterns: set):
|
||||
"""Save permanently allowed command patterns to config."""
|
||||
try:
|
||||
from hermes_cli.config import load_config, save_config
|
||||
config = load_config()
|
||||
config["command_allowlist"] = list(patterns)
|
||||
save_config(config)
|
||||
except Exception as e:
|
||||
print(f" ⚠️ Could not save allowlist: {e}")
|
||||
|
||||
|
||||
def _detect_dangerous_command(command: str) -> tuple:
|
||||
"""
|
||||
Check if command matches any dangerous patterns.
|
||||
|
||||
Returns:
|
||||
(is_dangerous, pattern_key, description) or (False, None, None)
|
||||
"""
|
||||
import re
|
||||
command_lower = command.lower()
|
||||
|
||||
for pattern, description in DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, command_lower, re.IGNORECASE):
|
||||
# Use a simplified pattern key for caching (first word + key chars)
|
||||
pattern_key = pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
||||
return (True, pattern_key, description)
|
||||
|
||||
return (False, None, None)
|
||||
|
||||
|
||||
def _is_command_approved(pattern_key: str) -> bool:
|
||||
"""Check if a pattern is approved (session or permanent)."""
|
||||
if pattern_key in _session_approved_patterns:
|
||||
return True
|
||||
|
||||
permanent = _load_permanent_allowlist()
|
||||
if pattern_key in permanent:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _prompt_dangerous_approval(command: str, description: str, timeout_seconds: int = 60) -> str:
|
||||
"""
|
||||
Prompt user to approve a dangerous command (CLI only).
|
||||
|
||||
Returns: 'once', 'session', 'always', or 'deny'
|
||||
"""
|
||||
import sys
|
||||
import threading
|
||||
|
||||
# Pause spinner if one is running
|
||||
os.environ["HERMES_SPINNER_PAUSE"] = "1"
|
||||
|
||||
try:
|
||||
print()
|
||||
print(f" ⚠️ \033[33mPotentially dangerous command detected:\033[0m {description}")
|
||||
print(f" \033[2m{command[:100]}{'...' if len(command) > 100 else ''}\033[0m")
|
||||
print()
|
||||
print(f" [\033[32mo\033[0m]nce | [\033[33ms\033[0m]ession | [\033[36ma\033[0m]lways | [\033[31md\033[0m]eny")
|
||||
print()
|
||||
sys.stdout.flush()
|
||||
|
||||
result = {"choice": ""}
|
||||
|
||||
def get_input():
|
||||
try:
|
||||
result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower()
|
||||
except:
|
||||
result["choice"] = ""
|
||||
|
||||
thread = threading.Thread(target=get_input, daemon=True)
|
||||
thread.start()
|
||||
thread.join(timeout=timeout_seconds)
|
||||
|
||||
if thread.is_alive():
|
||||
print("\n ⏱ Timeout - denying command")
|
||||
return "deny"
|
||||
|
||||
choice = result["choice"]
|
||||
|
||||
if choice in ('o', 'once'):
|
||||
print(" ✓ Allowed once")
|
||||
return "once"
|
||||
elif choice in ('s', 'session'):
|
||||
print(" ✓ Allowed for this session")
|
||||
return "session"
|
||||
elif choice in ('a', 'always'):
|
||||
print(" ✓ Added to permanent allowlist")
|
||||
return "always"
|
||||
else:
|
||||
print(" ✗ Denied")
|
||||
return "deny"
|
||||
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print("\n ✗ Cancelled")
|
||||
return "deny"
|
||||
finally:
|
||||
if "HERMES_SPINNER_PAUSE" in os.environ:
|
||||
del os.environ["HERMES_SPINNER_PAUSE"]
|
||||
print()
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def _check_dangerous_command(command: str, env_type: str) -> dict:
|
||||
"""
|
||||
Check if command is dangerous and handle approval.
|
||||
|
||||
Only applies to local/ssh backends in interactive contexts.
|
||||
|
||||
Args:
|
||||
command: The command to check
|
||||
env_type: The terminal backend type
|
||||
|
||||
Returns:
|
||||
{"approved": True/False, "message": str or None}
|
||||
"""
|
||||
# Skip check for isolated environments (containers are disposable)
|
||||
if env_type in ("docker", "singularity", "modal"):
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
# Detect dangerous command
|
||||
is_dangerous, pattern_key, description = _detect_dangerous_command(command)
|
||||
|
||||
if not is_dangerous:
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
# Check if already approved
|
||||
if _is_command_approved(pattern_key):
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
# Check context - only prompt in interactive modes
|
||||
is_cli = os.getenv("HERMES_INTERACTIVE")
|
||||
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
||||
|
||||
if not is_cli and not is_gateway:
|
||||
# Programmatic use - allow (user opted into local backend)
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
if is_gateway:
|
||||
# Messaging context - return informative denial, agent should ask user
|
||||
return {
|
||||
"approved": False,
|
||||
"pattern_key": pattern_key,
|
||||
"message": f"⚠️ This command was blocked because it's potentially dangerous ({description}). If you want me to run it anyway, please confirm by saying 'yes, run it' or 'approve'."
|
||||
}
|
||||
|
||||
# CLI context - prompt user
|
||||
choice = _prompt_dangerous_approval(command, description)
|
||||
|
||||
if choice == "deny":
|
||||
return {"approved": False, "message": "Command denied by user"}
|
||||
|
||||
# Handle approval
|
||||
if choice == "session":
|
||||
_session_approved_patterns.add(pattern_key)
|
||||
elif choice == "always":
|
||||
_session_approved_patterns.add(pattern_key)
|
||||
permanent = _load_permanent_allowlist()
|
||||
permanent.add(pattern_key)
|
||||
_save_permanent_allowlist(permanent)
|
||||
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
|
||||
def _handle_sudo_failure(output: str, env_type: str) -> str:
|
||||
"""
|
||||
Check for sudo failure and add helpful message for messaging contexts.
|
||||
|
||||
Returns enhanced output if sudo failed in messaging context, else original.
|
||||
"""
|
||||
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
||||
|
||||
if not is_gateway:
|
||||
return output
|
||||
|
||||
# Check for sudo failure indicators
|
||||
sudo_failures = [
|
||||
"sudo: a password is required",
|
||||
"sudo: no tty present",
|
||||
"sudo: a terminal is required",
|
||||
]
|
||||
|
||||
for failure in sudo_failures:
|
||||
if failure in output:
|
||||
return output + "\n\n💡 Tip: To enable sudo over messaging, add SUDO_PASSWORD to ~/.hermes/.env on the agent machine."
|
||||
|
||||
return output
|
||||
|
||||
|
||||
def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
|
||||
"""
|
||||
|
|
@ -1050,7 +1277,8 @@ def terminal_tool(
|
|||
command: str,
|
||||
background: bool = False,
|
||||
timeout: Optional[int] = None,
|
||||
task_id: Optional[str] = None
|
||||
task_id: Optional[str] = None,
|
||||
force: bool = False
|
||||
) -> str:
|
||||
"""
|
||||
Execute a command using mini-swe-agent's execution environments.
|
||||
|
|
@ -1060,6 +1288,7 @@ def terminal_tool(
|
|||
background: Whether to run in background (default: False)
|
||||
timeout: Command timeout in seconds (default: from config)
|
||||
task_id: Unique identifier for environment isolation (optional)
|
||||
force: If True, skip dangerous command check (use after user confirms)
|
||||
|
||||
Returns:
|
||||
str: JSON string with output, exit_code, and error fields
|
||||
|
|
@ -1073,6 +1302,9 @@ def terminal_tool(
|
|||
|
||||
# With custom timeout
|
||||
>>> result = terminal_tool(command="long_task.sh", timeout=300)
|
||||
|
||||
# Force run after user confirmation
|
||||
>>> result = terminal_tool(command="rm -rf /tmp/old", force=True)
|
||||
"""
|
||||
global _active_environments, _last_activity
|
||||
|
||||
|
|
@ -1149,6 +1381,19 @@ def terminal_tool(
|
|||
_last_activity[effective_task_id] = time.time()
|
||||
env = _active_environments[effective_task_id]
|
||||
|
||||
# Check for dangerous commands (only for local/ssh in interactive modes)
|
||||
# Skip check if force=True (user has confirmed they want to run it)
|
||||
if not force:
|
||||
approval = _check_dangerous_command(command, env_type)
|
||||
if not approval["approved"]:
|
||||
# Command was blocked - return informative message
|
||||
return json.dumps({
|
||||
"output": "",
|
||||
"exit_code": -1,
|
||||
"error": approval.get("message", "Command denied - potentially dangerous operation"),
|
||||
"status": "blocked"
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Prepare command for execution
|
||||
if background:
|
||||
# Run in background with nohup and redirect output
|
||||
|
|
@ -1205,6 +1450,9 @@ def terminal_tool(
|
|||
output = result.get("output", "")
|
||||
returncode = result.get("returncode", 0)
|
||||
|
||||
# Add helpful message for sudo failures in messaging context
|
||||
output = _handle_sudo_failure(output, env_type)
|
||||
|
||||
# Truncate output if too long
|
||||
MAX_OUTPUT_CHARS = 50000
|
||||
if len(output) > MAX_OUTPUT_CHARS:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue