From 762f7e97965ab9b19d6672724e90660921196569 Mon Sep 17 00:00:00 2001 From: Teknium Date: Sun, 29 Mar 2026 11:37:06 -0700 Subject: [PATCH] feat: configurable approval mode for cron jobs (approvals.cron_mode) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add approvals.cron_mode config option that controls how cron jobs handle dangerous commands. Previously, cron jobs silently auto-approved all dangerous commands because there was no user present to approve them. Now the behavior is configurable: - deny (default): block dangerous commands and return a message telling the agent to find an alternative approach. The agent loop continues — it just can't use that specific command. - approve: auto-approve all dangerous commands (previous behavior). When a command is blocked, the agent receives the same response format as a user denial in the CLI — exit_code=-1, status=blocked, with a message explaining why and pointing to the config option. This keeps the agent loop running and encourages it to adapt. Implementation: - config.py: add approvals.cron_mode to DEFAULT_CONFIG - scheduler.py: set HERMES_CRON_SESSION=1 env var before agent runs - approval.py: both check_command_approval() and check_all_command_guards() now check for cron sessions and apply the configured mode - 21 new tests covering config parsing, deny/approve behavior, and interaction with other bypass mechanisms (yolo, containers) --- cron/scheduler.py | 5 + hermes_cli/config.py | 5 + tests/tools/test_cron_approval_mode.py | 256 +++++++++++++++++++++++++ tools/approval.py | 42 ++++ 4 files changed, 308 insertions(+) create mode 100644 tests/tools/test_cron_approval_mode.py diff --git a/cron/scheduler.py b/cron/scheduler.py index db5991c6f..8938063c7 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -681,6 +681,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) + # Mark this as a cron session so the approval system can apply cron_mode. + # This env var is process-wide and persists for the lifetime of the + # scheduler process — every job this process runs is a cron job. + os.environ["HERMES_CRON_SESSION"] = "1" + try: # Inject origin context so the agent's send_message tool knows the chat. # Must be INSIDE the try block so the finally cleanup always runs. diff --git a/hermes_cli/config.py b/hermes_cli/config.py index dfb6b7210..d53899b13 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -737,9 +737,14 @@ DEFAULT_CONFIG = { # manual — always prompt the user (default) # smart — use auxiliary LLM to auto-approve low-risk commands, prompt for high-risk # off — skip all approval prompts (equivalent to --yolo) + # + # cron_mode — what to do when a cron job hits a dangerous command: + # deny — block the command and let the agent find another way (default, safe) + # approve — auto-approve all dangerous commands in cron jobs "approvals": { "mode": "manual", "timeout": 60, + "cron_mode": "deny", }, # Permanently allowed dangerous command patterns (added via "always" approval) diff --git a/tests/tools/test_cron_approval_mode.py b/tests/tools/test_cron_approval_mode.py new file mode 100644 index 000000000..965d2eaa4 --- /dev/null +++ b/tests/tools/test_cron_approval_mode.py @@ -0,0 +1,256 @@ +"""Tests for approvals.cron_mode — configurable approval behavior for cron jobs.""" + +import os +import pytest + +import tools.approval as approval_module +from tools.approval import ( + _get_cron_approval_mode, + check_all_command_guards, + check_dangerous_command, + detect_dangerous_command, +) + + +@pytest.fixture(autouse=True) +def _clear_approval_state(): + approval_module._permanent_approved.clear() + approval_module.clear_session("default") + approval_module.clear_session("test-session") + yield + approval_module._permanent_approved.clear() + approval_module.clear_session("default") + approval_module.clear_session("test-session") + + +# --------------------------------------------------------------------------- +# _get_cron_approval_mode() config parsing +# --------------------------------------------------------------------------- + +class TestCronApprovalModeParsing: + def test_default_is_deny(self): + """When no config is set, cron_mode defaults to 'deny'.""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {}}): + assert _get_cron_approval_mode() == "deny" + + def test_explicit_deny(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "deny"}}): + assert _get_cron_approval_mode() == "deny" + + def test_explicit_approve(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "approve"}}): + assert _get_cron_approval_mode() == "approve" + + def test_off_maps_to_approve(self): + """'off' is an alias for 'approve' (matches --yolo semantics).""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "off"}}): + assert _get_cron_approval_mode() == "approve" + + def test_allow_maps_to_approve(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "allow"}}): + assert _get_cron_approval_mode() == "approve" + + def test_yes_maps_to_approve(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "yes"}}): + assert _get_cron_approval_mode() == "approve" + + def test_case_insensitive(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "APPROVE"}}): + assert _get_cron_approval_mode() == "approve" + + def test_unknown_value_defaults_to_deny(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "maybe"}}): + assert _get_cron_approval_mode() == "deny" + + def test_config_load_failure_defaults_to_deny(self): + """If config loading fails entirely, default to deny (safe).""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", side_effect=RuntimeError("config broken")): + assert _get_cron_approval_mode() == "deny" + + def test_yaml_boolean_false_maps_to_deny(self): + """YAML 1.1 parses bare 'off' as False. Ensure it maps to deny.""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": False}}): + # str(False) = "False", which is not in the approve set, so deny + assert _get_cron_approval_mode() == "deny" + + +# --------------------------------------------------------------------------- +# check_dangerous_command() with cron session +# --------------------------------------------------------------------------- + +class TestCronDenyMode: + """When HERMES_CRON_SESSION is set and cron_mode=deny, dangerous commands are blocked.""" + + def test_dangerous_command_blocked_in_cron_deny_mode(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert not result["approved"] + assert "BLOCKED" in result["message"] + assert "cron_mode" in result["message"] + + def test_safe_command_allowed_in_cron_deny_mode(self, monkeypatch): + """Non-dangerous commands still work even with cron_mode=deny.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("ls -la", "local") + assert result["approved"] + + def test_multiple_dangerous_patterns_blocked(self, monkeypatch): + """All dangerous patterns are blocked, not just rm.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + dangerous_commands = [ + "rm -rf /", + "chmod 777 /etc/passwd", + "mkfs.ext4 /dev/sda1", + "dd if=/dev/zero of=/dev/sda", + ] + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + for cmd in dangerous_commands: + is_dangerous, _, _ = detect_dangerous_command(cmd) + if is_dangerous: + result = check_dangerous_command(cmd, "local") + assert not result["approved"], f"Should be blocked: {cmd}" + assert "BLOCKED" in result["message"] + + def test_block_message_includes_description(self, monkeypatch): + """The block message should mention what pattern was matched.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert not result["approved"] + # Should contain the description of what was flagged + assert "dangerous" in result["message"].lower() or "delete" in result["message"].lower() + + +class TestCronApproveMode: + """When HERMES_CRON_SESSION is set and cron_mode=approve, dangerous commands pass through.""" + + def test_dangerous_command_allowed_in_cron_approve_mode(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"): + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert result["approved"] + + +# --------------------------------------------------------------------------- +# check_all_command_guards() with cron session +# --------------------------------------------------------------------------- + +class TestCronDenyModeAllGuards: + """The combined guard function also respects cron_mode.""" + + def test_dangerous_command_blocked_in_combined_guard(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_all_command_guards("rm -rf /tmp/stuff", "local") + assert not result["approved"] + assert "BLOCKED" in result["message"] + + def test_safe_command_allowed_in_combined_guard(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_all_command_guards("echo hello", "local") + assert result["approved"] + + def test_combined_guard_approve_mode(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"): + result = check_all_command_guards("rm -rf /tmp/stuff", "local") + assert result["approved"] + + +# --------------------------------------------------------------------------- +# Edge cases: cron mode interaction with other approval mechanisms +# --------------------------------------------------------------------------- + +class TestCronModeInteractions: + """Cron mode should NOT interfere with other approval bypass mechanisms.""" + + def test_container_env_still_auto_approves(self, monkeypatch): + """Docker/sandbox environments bypass approvals regardless of cron_mode.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /", "docker") + assert result["approved"] + + def test_yolo_overrides_cron_deny(self, monkeypatch): + """--yolo still works even if cron_mode=deny.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.setenv("HERMES_YOLO_MODE", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /", "local") + assert result["approved"] + + def test_non_cron_non_interactive_still_auto_approves(self, monkeypatch): + """Non-cron, non-interactive sessions (e.g. scripted usage) still auto-approve.""" + monkeypatch.delenv("HERMES_CRON_SESSION", raising=False) + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert result["approved"] diff --git a/tools/approval.py b/tools/approval.py index 7d8c5b032..fc344bd77 100644 --- a/tools/approval.py +++ b/tools/approval.py @@ -532,6 +532,19 @@ def _get_approval_timeout() -> int: return 60 +def _get_cron_approval_mode() -> str: + """Read the cron approval mode from config. Returns 'deny' or 'approve'.""" + try: + from hermes_cli.config import load_config + config = load_config() + mode = str(config.get("approvals", {}).get("cron_mode", "deny")).lower().strip() + if mode in ("approve", "off", "allow", "yes"): + return "approve" + return "deny" + except Exception: + return "deny" + + def _smart_approve(command: str, description: str) -> str: """Use the auxiliary LLM to assess risk and decide approval. @@ -614,6 +627,19 @@ def check_dangerous_command(command: str, env_type: str, is_gateway = os.getenv("HERMES_GATEWAY_SESSION") if not is_cli and not is_gateway: + # Cron sessions: respect cron_mode config + if os.getenv("HERMES_CRON_SESSION"): + if _get_cron_approval_mode() == "deny": + return { + "approved": False, + "message": ( + f"BLOCKED: Command flagged as dangerous ({description}) " + "but cron jobs run without a user present to approve it. " + "Find an alternative approach that avoids this command. " + "To allow dangerous commands in cron jobs, set " + "approvals.cron_mode: approve in config.yaml." + ), + } return {"approved": True, "message": None} if is_gateway or os.getenv("HERMES_EXEC_ASK"): @@ -712,6 +738,22 @@ def check_all_command_guards(command: str, env_type: str, # Preserve the existing non-interactive behavior: outside CLI/gateway/ask # flows, we do not block on approvals and we skip external guard work. if not is_cli and not is_gateway and not is_ask: + # Cron sessions: respect cron_mode config + if os.getenv("HERMES_CRON_SESSION"): + if _get_cron_approval_mode() == "deny": + # Run detection to get a description for the block message + is_dangerous, _pk, description = detect_dangerous_command(command) + if is_dangerous: + return { + "approved": False, + "message": ( + f"BLOCKED: Command flagged as dangerous ({description}) " + "but cron jobs run without a user present to approve it. " + "Find an alternative approach that avoids this command. " + "To allow dangerous commands in cron jobs, set " + "approvals.cron_mode: approve in config.yaml." + ), + } return {"approved": True, "message": None} # --- Phase 1: Gather findings from both checks ---