mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
feat: configurable approval mode for cron jobs (approvals.cron_mode)
Add approvals.cron_mode config option that controls how cron jobs handle
dangerous commands. Previously, cron jobs silently auto-approved all
dangerous commands because there was no user present to approve them.
Now the behavior is configurable:
- deny (default): block dangerous commands and return a message telling
the agent to find an alternative approach. The agent loop continues —
it just can't use that specific command.
- approve: auto-approve all dangerous commands (previous behavior).
When a command is blocked, the agent receives the same response format as
a user denial in the CLI — exit_code=-1, status=blocked, with a message
explaining why and pointing to the config option. This keeps the agent
loop running and encourages it to adapt.
Implementation:
- config.py: add approvals.cron_mode to DEFAULT_CONFIG
- scheduler.py: set HERMES_CRON_SESSION=1 env var before agent runs
- approval.py: both check_command_approval() and check_all_command_guards()
now check for cron sessions and apply the configured mode
- 21 new tests covering config parsing, deny/approve behavior, and
interaction with other bypass mechanisms (yolo, containers)
This commit is contained in:
parent
b02833f32d
commit
762f7e9796
4 changed files with 308 additions and 0 deletions
|
|
@ -681,6 +681,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
|
||||
logger.info("Prompt: %s", prompt[:100])
|
||||
|
||||
# Mark this as a cron session so the approval system can apply cron_mode.
|
||||
# This env var is process-wide and persists for the lifetime of the
|
||||
# scheduler process — every job this process runs is a cron job.
|
||||
os.environ["HERMES_CRON_SESSION"] = "1"
|
||||
|
||||
try:
|
||||
# Inject origin context so the agent's send_message tool knows the chat.
|
||||
# Must be INSIDE the try block so the finally cleanup always runs.
|
||||
|
|
|
|||
|
|
@ -737,9 +737,14 @@ DEFAULT_CONFIG = {
|
|||
# manual — always prompt the user (default)
|
||||
# smart — use auxiliary LLM to auto-approve low-risk commands, prompt for high-risk
|
||||
# off — skip all approval prompts (equivalent to --yolo)
|
||||
#
|
||||
# cron_mode — what to do when a cron job hits a dangerous command:
|
||||
# deny — block the command and let the agent find another way (default, safe)
|
||||
# approve — auto-approve all dangerous commands in cron jobs
|
||||
"approvals": {
|
||||
"mode": "manual",
|
||||
"timeout": 60,
|
||||
"cron_mode": "deny",
|
||||
},
|
||||
|
||||
# Permanently allowed dangerous command patterns (added via "always" approval)
|
||||
|
|
|
|||
256
tests/tools/test_cron_approval_mode.py
Normal file
256
tests/tools/test_cron_approval_mode.py
Normal file
|
|
@ -0,0 +1,256 @@
|
|||
"""Tests for approvals.cron_mode — configurable approval behavior for cron jobs."""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
import tools.approval as approval_module
|
||||
from tools.approval import (
|
||||
_get_cron_approval_mode,
|
||||
check_all_command_guards,
|
||||
check_dangerous_command,
|
||||
detect_dangerous_command,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_approval_state():
|
||||
approval_module._permanent_approved.clear()
|
||||
approval_module.clear_session("default")
|
||||
approval_module.clear_session("test-session")
|
||||
yield
|
||||
approval_module._permanent_approved.clear()
|
||||
approval_module.clear_session("default")
|
||||
approval_module.clear_session("test-session")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _get_cron_approval_mode() config parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCronApprovalModeParsing:
|
||||
def test_default_is_deny(self):
|
||||
"""When no config is set, cron_mode defaults to 'deny'."""
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {}}):
|
||||
assert _get_cron_approval_mode() == "deny"
|
||||
|
||||
def test_explicit_deny(self):
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "deny"}}):
|
||||
assert _get_cron_approval_mode() == "deny"
|
||||
|
||||
def test_explicit_approve(self):
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "approve"}}):
|
||||
assert _get_cron_approval_mode() == "approve"
|
||||
|
||||
def test_off_maps_to_approve(self):
|
||||
"""'off' is an alias for 'approve' (matches --yolo semantics)."""
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "off"}}):
|
||||
assert _get_cron_approval_mode() == "approve"
|
||||
|
||||
def test_allow_maps_to_approve(self):
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "allow"}}):
|
||||
assert _get_cron_approval_mode() == "approve"
|
||||
|
||||
def test_yes_maps_to_approve(self):
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "yes"}}):
|
||||
assert _get_cron_approval_mode() == "approve"
|
||||
|
||||
def test_case_insensitive(self):
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "APPROVE"}}):
|
||||
assert _get_cron_approval_mode() == "approve"
|
||||
|
||||
def test_unknown_value_defaults_to_deny(self):
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "maybe"}}):
|
||||
assert _get_cron_approval_mode() == "deny"
|
||||
|
||||
def test_config_load_failure_defaults_to_deny(self):
|
||||
"""If config loading fails entirely, default to deny (safe)."""
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", side_effect=RuntimeError("config broken")):
|
||||
assert _get_cron_approval_mode() == "deny"
|
||||
|
||||
def test_yaml_boolean_false_maps_to_deny(self):
|
||||
"""YAML 1.1 parses bare 'off' as False. Ensure it maps to deny."""
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": False}}):
|
||||
# str(False) = "False", which is not in the approve set, so deny
|
||||
assert _get_cron_approval_mode() == "deny"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# check_dangerous_command() with cron session
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCronDenyMode:
|
||||
"""When HERMES_CRON_SESSION is set and cron_mode=deny, dangerous commands are blocked."""
|
||||
|
||||
def test_dangerous_command_blocked_in_cron_deny_mode(self, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
||||
assert not result["approved"]
|
||||
assert "BLOCKED" in result["message"]
|
||||
assert "cron_mode" in result["message"]
|
||||
|
||||
def test_safe_command_allowed_in_cron_deny_mode(self, monkeypatch):
|
||||
"""Non-dangerous commands still work even with cron_mode=deny."""
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
result = check_dangerous_command("ls -la", "local")
|
||||
assert result["approved"]
|
||||
|
||||
def test_multiple_dangerous_patterns_blocked(self, monkeypatch):
|
||||
"""All dangerous patterns are blocked, not just rm."""
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
dangerous_commands = [
|
||||
"rm -rf /",
|
||||
"chmod 777 /etc/passwd",
|
||||
"mkfs.ext4 /dev/sda1",
|
||||
"dd if=/dev/zero of=/dev/sda",
|
||||
]
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
for cmd in dangerous_commands:
|
||||
is_dangerous, _, _ = detect_dangerous_command(cmd)
|
||||
if is_dangerous:
|
||||
result = check_dangerous_command(cmd, "local")
|
||||
assert not result["approved"], f"Should be blocked: {cmd}"
|
||||
assert "BLOCKED" in result["message"]
|
||||
|
||||
def test_block_message_includes_description(self, monkeypatch):
|
||||
"""The block message should mention what pattern was matched."""
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
||||
assert not result["approved"]
|
||||
# Should contain the description of what was flagged
|
||||
assert "dangerous" in result["message"].lower() or "delete" in result["message"].lower()
|
||||
|
||||
|
||||
class TestCronApproveMode:
|
||||
"""When HERMES_CRON_SESSION is set and cron_mode=approve, dangerous commands pass through."""
|
||||
|
||||
def test_dangerous_command_allowed_in_cron_approve_mode(self, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"):
|
||||
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
||||
assert result["approved"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# check_all_command_guards() with cron session
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCronDenyModeAllGuards:
|
||||
"""The combined guard function also respects cron_mode."""
|
||||
|
||||
def test_dangerous_command_blocked_in_combined_guard(self, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
result = check_all_command_guards("rm -rf /tmp/stuff", "local")
|
||||
assert not result["approved"]
|
||||
assert "BLOCKED" in result["message"]
|
||||
|
||||
def test_safe_command_allowed_in_combined_guard(self, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
result = check_all_command_guards("echo hello", "local")
|
||||
assert result["approved"]
|
||||
|
||||
def test_combined_guard_approve_mode(self, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"):
|
||||
result = check_all_command_guards("rm -rf /tmp/stuff", "local")
|
||||
assert result["approved"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge cases: cron mode interaction with other approval mechanisms
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCronModeInteractions:
|
||||
"""Cron mode should NOT interfere with other approval bypass mechanisms."""
|
||||
|
||||
def test_container_env_still_auto_approves(self, monkeypatch):
|
||||
"""Docker/sandbox environments bypass approvals regardless of cron_mode."""
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
result = check_dangerous_command("rm -rf /", "docker")
|
||||
assert result["approved"]
|
||||
|
||||
def test_yolo_overrides_cron_deny(self, monkeypatch):
|
||||
"""--yolo still works even if cron_mode=deny."""
|
||||
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
||||
monkeypatch.setenv("HERMES_YOLO_MODE", "1")
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
|
||||
from unittest.mock import patch as mock_patch
|
||||
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
||||
result = check_dangerous_command("rm -rf /", "local")
|
||||
assert result["approved"]
|
||||
|
||||
def test_non_cron_non_interactive_still_auto_approves(self, monkeypatch):
|
||||
"""Non-cron, non-interactive sessions (e.g. scripted usage) still auto-approve."""
|
||||
monkeypatch.delenv("HERMES_CRON_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
||||
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
||||
|
||||
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
||||
assert result["approved"]
|
||||
|
|
@ -532,6 +532,19 @@ def _get_approval_timeout() -> int:
|
|||
return 60
|
||||
|
||||
|
||||
def _get_cron_approval_mode() -> str:
|
||||
"""Read the cron approval mode from config. Returns 'deny' or 'approve'."""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
config = load_config()
|
||||
mode = str(config.get("approvals", {}).get("cron_mode", "deny")).lower().strip()
|
||||
if mode in ("approve", "off", "allow", "yes"):
|
||||
return "approve"
|
||||
return "deny"
|
||||
except Exception:
|
||||
return "deny"
|
||||
|
||||
|
||||
def _smart_approve(command: str, description: str) -> str:
|
||||
"""Use the auxiliary LLM to assess risk and decide approval.
|
||||
|
||||
|
|
@ -614,6 +627,19 @@ def check_dangerous_command(command: str, env_type: str,
|
|||
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
||||
|
||||
if not is_cli and not is_gateway:
|
||||
# Cron sessions: respect cron_mode config
|
||||
if os.getenv("HERMES_CRON_SESSION"):
|
||||
if _get_cron_approval_mode() == "deny":
|
||||
return {
|
||||
"approved": False,
|
||||
"message": (
|
||||
f"BLOCKED: Command flagged as dangerous ({description}) "
|
||||
"but cron jobs run without a user present to approve it. "
|
||||
"Find an alternative approach that avoids this command. "
|
||||
"To allow dangerous commands in cron jobs, set "
|
||||
"approvals.cron_mode: approve in config.yaml."
|
||||
),
|
||||
}
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
|
||||
|
|
@ -712,6 +738,22 @@ def check_all_command_guards(command: str, env_type: str,
|
|||
# Preserve the existing non-interactive behavior: outside CLI/gateway/ask
|
||||
# flows, we do not block on approvals and we skip external guard work.
|
||||
if not is_cli and not is_gateway and not is_ask:
|
||||
# Cron sessions: respect cron_mode config
|
||||
if os.getenv("HERMES_CRON_SESSION"):
|
||||
if _get_cron_approval_mode() == "deny":
|
||||
# Run detection to get a description for the block message
|
||||
is_dangerous, _pk, description = detect_dangerous_command(command)
|
||||
if is_dangerous:
|
||||
return {
|
||||
"approved": False,
|
||||
"message": (
|
||||
f"BLOCKED: Command flagged as dangerous ({description}) "
|
||||
"but cron jobs run without a user present to approve it. "
|
||||
"Find an alternative approach that avoids this command. "
|
||||
"To allow dangerous commands in cron jobs, set "
|
||||
"approvals.cron_mode: approve in config.yaml."
|
||||
),
|
||||
}
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
# --- Phase 1: Gather findings from both checks ---
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue