diff --git a/cron/scheduler.py b/cron/scheduler.py index db5991c6f02..8938063c7ff 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -681,6 +681,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) + # Mark this as a cron session so the approval system can apply cron_mode. + # This env var is process-wide and persists for the lifetime of the + # scheduler process — every job this process runs is a cron job. + os.environ["HERMES_CRON_SESSION"] = "1" + try: # Inject origin context so the agent's send_message tool knows the chat. # Must be INSIDE the try block so the finally cleanup always runs. diff --git a/hermes_cli/config.py b/hermes_cli/config.py index dfb6b7210a4..d53899b135e 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -737,9 +737,14 @@ DEFAULT_CONFIG = { # manual — always prompt the user (default) # smart — use auxiliary LLM to auto-approve low-risk commands, prompt for high-risk # off — skip all approval prompts (equivalent to --yolo) + # + # cron_mode — what to do when a cron job hits a dangerous command: + # deny — block the command and let the agent find another way (default, safe) + # approve — auto-approve all dangerous commands in cron jobs "approvals": { "mode": "manual", "timeout": 60, + "cron_mode": "deny", }, # Permanently allowed dangerous command patterns (added via "always" approval) diff --git a/tests/tools/test_cron_approval_mode.py b/tests/tools/test_cron_approval_mode.py new file mode 100644 index 00000000000..965d2eaa474 --- /dev/null +++ b/tests/tools/test_cron_approval_mode.py @@ -0,0 +1,256 @@ +"""Tests for approvals.cron_mode — configurable approval behavior for cron jobs.""" + +import os +import pytest + +import tools.approval as approval_module +from tools.approval import ( + _get_cron_approval_mode, + check_all_command_guards, + check_dangerous_command, + detect_dangerous_command, +) + + +@pytest.fixture(autouse=True) +def _clear_approval_state(): + approval_module._permanent_approved.clear() + approval_module.clear_session("default") + approval_module.clear_session("test-session") + yield + approval_module._permanent_approved.clear() + approval_module.clear_session("default") + approval_module.clear_session("test-session") + + +# --------------------------------------------------------------------------- +# _get_cron_approval_mode() config parsing +# --------------------------------------------------------------------------- + +class TestCronApprovalModeParsing: + def test_default_is_deny(self): + """When no config is set, cron_mode defaults to 'deny'.""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {}}): + assert _get_cron_approval_mode() == "deny" + + def test_explicit_deny(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "deny"}}): + assert _get_cron_approval_mode() == "deny" + + def test_explicit_approve(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "approve"}}): + assert _get_cron_approval_mode() == "approve" + + def test_off_maps_to_approve(self): + """'off' is an alias for 'approve' (matches --yolo semantics).""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "off"}}): + assert _get_cron_approval_mode() == "approve" + + def test_allow_maps_to_approve(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "allow"}}): + assert _get_cron_approval_mode() == "approve" + + def test_yes_maps_to_approve(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "yes"}}): + assert _get_cron_approval_mode() == "approve" + + def test_case_insensitive(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "APPROVE"}}): + assert _get_cron_approval_mode() == "approve" + + def test_unknown_value_defaults_to_deny(self): + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "maybe"}}): + assert _get_cron_approval_mode() == "deny" + + def test_config_load_failure_defaults_to_deny(self): + """If config loading fails entirely, default to deny (safe).""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", side_effect=RuntimeError("config broken")): + assert _get_cron_approval_mode() == "deny" + + def test_yaml_boolean_false_maps_to_deny(self): + """YAML 1.1 parses bare 'off' as False. Ensure it maps to deny.""" + from unittest.mock import patch as mock_patch + with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": False}}): + # str(False) = "False", which is not in the approve set, so deny + assert _get_cron_approval_mode() == "deny" + + +# --------------------------------------------------------------------------- +# check_dangerous_command() with cron session +# --------------------------------------------------------------------------- + +class TestCronDenyMode: + """When HERMES_CRON_SESSION is set and cron_mode=deny, dangerous commands are blocked.""" + + def test_dangerous_command_blocked_in_cron_deny_mode(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert not result["approved"] + assert "BLOCKED" in result["message"] + assert "cron_mode" in result["message"] + + def test_safe_command_allowed_in_cron_deny_mode(self, monkeypatch): + """Non-dangerous commands still work even with cron_mode=deny.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("ls -la", "local") + assert result["approved"] + + def test_multiple_dangerous_patterns_blocked(self, monkeypatch): + """All dangerous patterns are blocked, not just rm.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + dangerous_commands = [ + "rm -rf /", + "chmod 777 /etc/passwd", + "mkfs.ext4 /dev/sda1", + "dd if=/dev/zero of=/dev/sda", + ] + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + for cmd in dangerous_commands: + is_dangerous, _, _ = detect_dangerous_command(cmd) + if is_dangerous: + result = check_dangerous_command(cmd, "local") + assert not result["approved"], f"Should be blocked: {cmd}" + assert "BLOCKED" in result["message"] + + def test_block_message_includes_description(self, monkeypatch): + """The block message should mention what pattern was matched.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert not result["approved"] + # Should contain the description of what was flagged + assert "dangerous" in result["message"].lower() or "delete" in result["message"].lower() + + +class TestCronApproveMode: + """When HERMES_CRON_SESSION is set and cron_mode=approve, dangerous commands pass through.""" + + def test_dangerous_command_allowed_in_cron_approve_mode(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"): + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert result["approved"] + + +# --------------------------------------------------------------------------- +# check_all_command_guards() with cron session +# --------------------------------------------------------------------------- + +class TestCronDenyModeAllGuards: + """The combined guard function also respects cron_mode.""" + + def test_dangerous_command_blocked_in_combined_guard(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_all_command_guards("rm -rf /tmp/stuff", "local") + assert not result["approved"] + assert "BLOCKED" in result["message"] + + def test_safe_command_allowed_in_combined_guard(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_all_command_guards("echo hello", "local") + assert result["approved"] + + def test_combined_guard_approve_mode(self, monkeypatch): + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"): + result = check_all_command_guards("rm -rf /tmp/stuff", "local") + assert result["approved"] + + +# --------------------------------------------------------------------------- +# Edge cases: cron mode interaction with other approval mechanisms +# --------------------------------------------------------------------------- + +class TestCronModeInteractions: + """Cron mode should NOT interfere with other approval bypass mechanisms.""" + + def test_container_env_still_auto_approves(self, monkeypatch): + """Docker/sandbox environments bypass approvals regardless of cron_mode.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /", "docker") + assert result["approved"] + + def test_yolo_overrides_cron_deny(self, monkeypatch): + """--yolo still works even if cron_mode=deny.""" + monkeypatch.setenv("HERMES_CRON_SESSION", "1") + monkeypatch.setenv("HERMES_YOLO_MODE", "1") + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + + from unittest.mock import patch as mock_patch + with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"): + result = check_dangerous_command("rm -rf /", "local") + assert result["approved"] + + def test_non_cron_non_interactive_still_auto_approves(self, monkeypatch): + """Non-cron, non-interactive sessions (e.g. scripted usage) still auto-approve.""" + monkeypatch.delenv("HERMES_CRON_SESSION", raising=False) + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + + result = check_dangerous_command("rm -rf /tmp/stuff", "local") + assert result["approved"] diff --git a/tools/approval.py b/tools/approval.py index 7d8c5b032e8..fc344bd77b7 100644 --- a/tools/approval.py +++ b/tools/approval.py @@ -532,6 +532,19 @@ def _get_approval_timeout() -> int: return 60 +def _get_cron_approval_mode() -> str: + """Read the cron approval mode from config. Returns 'deny' or 'approve'.""" + try: + from hermes_cli.config import load_config + config = load_config() + mode = str(config.get("approvals", {}).get("cron_mode", "deny")).lower().strip() + if mode in ("approve", "off", "allow", "yes"): + return "approve" + return "deny" + except Exception: + return "deny" + + def _smart_approve(command: str, description: str) -> str: """Use the auxiliary LLM to assess risk and decide approval. @@ -614,6 +627,19 @@ def check_dangerous_command(command: str, env_type: str, is_gateway = os.getenv("HERMES_GATEWAY_SESSION") if not is_cli and not is_gateway: + # Cron sessions: respect cron_mode config + if os.getenv("HERMES_CRON_SESSION"): + if _get_cron_approval_mode() == "deny": + return { + "approved": False, + "message": ( + f"BLOCKED: Command flagged as dangerous ({description}) " + "but cron jobs run without a user present to approve it. " + "Find an alternative approach that avoids this command. " + "To allow dangerous commands in cron jobs, set " + "approvals.cron_mode: approve in config.yaml." + ), + } return {"approved": True, "message": None} if is_gateway or os.getenv("HERMES_EXEC_ASK"): @@ -712,6 +738,22 @@ def check_all_command_guards(command: str, env_type: str, # Preserve the existing non-interactive behavior: outside CLI/gateway/ask # flows, we do not block on approvals and we skip external guard work. if not is_cli and not is_gateway and not is_ask: + # Cron sessions: respect cron_mode config + if os.getenv("HERMES_CRON_SESSION"): + if _get_cron_approval_mode() == "deny": + # Run detection to get a description for the block message + is_dangerous, _pk, description = detect_dangerous_command(command) + if is_dangerous: + return { + "approved": False, + "message": ( + f"BLOCKED: Command flagged as dangerous ({description}) " + "but cron jobs run without a user present to approve it. " + "Find an alternative approach that avoids this command. " + "To allow dangerous commands in cron jobs, set " + "approvals.cron_mode: approve in config.yaml." + ), + } return {"approved": True, "message": None} # --- Phase 1: Gather findings from both checks ---