mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-09 03:11:58 +00:00
The new _is_gateway_approval_context() widened the gateway classification to any call with HERMES_SESSION_PLATFORM bound via contextvars. But cron/scheduler.py binds that same contextvar for delivery routing on cron jobs that originate from a gateway platform (telegram/discord/etc.), so those jobs were getting routed through submit_pending with no listener — blocking indefinitely instead of honoring approvals.cron_mode. Short-circuit on HERMES_CRON_SESSION before any gateway check. Cron is always governed by cron_mode config, regardless of where the job was scheduled from. Adds regression coverage in TestCronWithGatewayOrigin and records the contributor email mapping for scripts/release.py.
332 lines
16 KiB
Python
332 lines
16 KiB
Python
"""Tests for approvals.cron_mode — configurable approval behavior for cron jobs."""
|
|
|
|
import os
|
|
import pytest
|
|
|
|
import tools.approval as approval_module
|
|
from tools.approval import (
|
|
_get_cron_approval_mode,
|
|
check_all_command_guards,
|
|
check_dangerous_command,
|
|
detect_dangerous_command,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _clear_approval_state():
|
|
approval_module._permanent_approved.clear()
|
|
approval_module.clear_session("default")
|
|
approval_module.clear_session("test-session")
|
|
yield
|
|
approval_module._permanent_approved.clear()
|
|
approval_module.clear_session("default")
|
|
approval_module.clear_session("test-session")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_cron_approval_mode() config parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCronApprovalModeParsing:
|
|
def test_default_is_deny(self):
|
|
"""When no config is set, cron_mode defaults to 'deny'."""
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {}}):
|
|
assert _get_cron_approval_mode() == "deny"
|
|
|
|
def test_explicit_deny(self):
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "deny"}}):
|
|
assert _get_cron_approval_mode() == "deny"
|
|
|
|
def test_explicit_approve(self):
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "approve"}}):
|
|
assert _get_cron_approval_mode() == "approve"
|
|
|
|
def test_off_maps_to_approve(self):
|
|
"""'off' is an alias for 'approve' (matches --yolo semantics)."""
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "off"}}):
|
|
assert _get_cron_approval_mode() == "approve"
|
|
|
|
def test_allow_maps_to_approve(self):
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "allow"}}):
|
|
assert _get_cron_approval_mode() == "approve"
|
|
|
|
def test_yes_maps_to_approve(self):
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "yes"}}):
|
|
assert _get_cron_approval_mode() == "approve"
|
|
|
|
def test_case_insensitive(self):
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "APPROVE"}}):
|
|
assert _get_cron_approval_mode() == "approve"
|
|
|
|
def test_unknown_value_defaults_to_deny(self):
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": "maybe"}}):
|
|
assert _get_cron_approval_mode() == "deny"
|
|
|
|
def test_config_load_failure_defaults_to_deny(self):
|
|
"""If config loading fails entirely, default to deny (safe)."""
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", side_effect=RuntimeError("config broken")):
|
|
assert _get_cron_approval_mode() == "deny"
|
|
|
|
def test_yaml_boolean_false_maps_to_deny(self):
|
|
"""YAML 1.1 parses bare 'off' as False. Ensure it maps to deny."""
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("hermes_cli.config.load_config", return_value={"approvals": {"cron_mode": False}}):
|
|
# str(False) = "False", which is not in the approve set, so deny
|
|
assert _get_cron_approval_mode() == "deny"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_dangerous_command() with cron session
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCronDenyMode:
|
|
"""When HERMES_CRON_SESSION is set and cron_mode=deny, dangerous commands are blocked."""
|
|
|
|
def test_dangerous_command_blocked_in_cron_deny_mode(self, monkeypatch):
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
|
assert not result["approved"]
|
|
assert "BLOCKED" in result["message"]
|
|
assert "cron_mode" in result["message"]
|
|
|
|
def test_safe_command_allowed_in_cron_deny_mode(self, monkeypatch):
|
|
"""Non-dangerous commands still work even with cron_mode=deny."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_dangerous_command("ls -la", "local")
|
|
assert result["approved"]
|
|
|
|
def test_multiple_dangerous_patterns_blocked(self, monkeypatch):
|
|
"""All dangerous patterns are blocked, not just rm."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
dangerous_commands = [
|
|
"rm -rf /",
|
|
"chmod 777 /etc/passwd",
|
|
"mkfs.ext4 /dev/sda1",
|
|
"dd if=/dev/zero of=/dev/sda",
|
|
]
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
for cmd in dangerous_commands:
|
|
is_dangerous, _, _ = detect_dangerous_command(cmd)
|
|
if is_dangerous:
|
|
result = check_dangerous_command(cmd, "local")
|
|
assert not result["approved"], f"Should be blocked: {cmd}"
|
|
assert "BLOCKED" in result["message"]
|
|
|
|
def test_block_message_includes_description(self, monkeypatch):
|
|
"""The block message should mention what pattern was matched."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
|
assert not result["approved"]
|
|
# Should contain the description of what was flagged
|
|
assert "dangerous" in result["message"].lower() or "delete" in result["message"].lower()
|
|
|
|
|
|
class TestCronApproveMode:
|
|
"""When HERMES_CRON_SESSION is set and cron_mode=approve, dangerous commands pass through."""
|
|
|
|
def test_dangerous_command_allowed_in_cron_approve_mode(self, monkeypatch):
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"):
|
|
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
|
assert result["approved"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_all_command_guards() with cron session
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCronDenyModeAllGuards:
|
|
"""The combined guard function also respects cron_mode."""
|
|
|
|
def test_dangerous_command_blocked_in_combined_guard(self, monkeypatch):
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_all_command_guards("rm -rf /tmp/stuff", "local")
|
|
assert not result["approved"]
|
|
assert "BLOCKED" in result["message"]
|
|
|
|
def test_safe_command_allowed_in_combined_guard(self, monkeypatch):
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_all_command_guards("echo hello", "local")
|
|
assert result["approved"]
|
|
|
|
def test_combined_guard_approve_mode(self, monkeypatch):
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"):
|
|
result = check_all_command_guards("rm -rf /tmp/stuff", "local")
|
|
assert result["approved"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edge cases: cron mode interaction with other approval mechanisms
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCronModeInteractions:
|
|
"""Cron mode should NOT interfere with other approval bypass mechanisms."""
|
|
|
|
def test_container_env_still_auto_approves(self, monkeypatch):
|
|
"""Docker/sandbox environments bypass approvals regardless of cron_mode."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_dangerous_command("rm -rf /", "docker")
|
|
assert result["approved"]
|
|
|
|
def test_yolo_overrides_cron_deny(self, monkeypatch):
|
|
"""--yolo still bypasses cron_mode=deny for dangerous (non-hardline) commands."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.setenv("HERMES_YOLO_MODE", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
# Use a dangerous-but-not-hardline command — `rm -rf /` is now
|
|
# hardline-blocked regardless of yolo (see test_hardline_blocklist.py).
|
|
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
|
assert result["approved"]
|
|
|
|
def test_non_cron_non_interactive_still_auto_approves(self, monkeypatch):
|
|
"""Non-cron, non-interactive sessions (e.g. scripted usage) still auto-approve."""
|
|
monkeypatch.delenv("HERMES_CRON_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
|
|
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
|
assert result["approved"]
|
|
|
|
|
|
class TestCronWithGatewayOrigin:
|
|
"""Cron jobs originating from a gateway platform must NOT be treated as gateway.
|
|
|
|
cron/scheduler.py binds HERMES_SESSION_PLATFORM via contextvars for
|
|
delivery routing (so cron output lands back in the origin chat). The
|
|
API-server approvals work (PR #20311) made check_dangerous_command treat
|
|
any contextvar-bound platform as a gateway session. That would route
|
|
cron-from-telegram/discord/etc. through submit_pending with no listener,
|
|
hanging the job instead of respecting approvals.cron_mode.
|
|
"""
|
|
|
|
def test_cron_with_telegram_origin_uses_cron_mode_not_gateway(self, monkeypatch):
|
|
"""Cron + contextvar platform=telegram + cron_mode=deny → BLOCKED, not pending."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
|
|
|
from gateway.session_context import set_session_vars, clear_session_vars
|
|
tokens = set_session_vars(platform="telegram", chat_id="123")
|
|
try:
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
|
# Cron-mode path: BLOCKED message, NOT pending/approval_required.
|
|
assert not result["approved"]
|
|
assert "BLOCKED" in result["message"]
|
|
assert "cron_mode" in result["message"]
|
|
assert result.get("status") != "approval_required"
|
|
finally:
|
|
clear_session_vars(tokens)
|
|
|
|
def test_cron_with_telegram_origin_approve_mode_allows(self, monkeypatch):
|
|
"""Cron + contextvar platform=telegram + cron_mode=approve → allowed via cron path."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
|
|
|
from gateway.session_context import set_session_vars, clear_session_vars
|
|
tokens = set_session_vars(platform="discord", chat_id="456")
|
|
try:
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="approve"):
|
|
result = check_dangerous_command("rm -rf /tmp/stuff", "local")
|
|
assert result["approved"]
|
|
# Should NOT be a gateway-approval response.
|
|
assert result.get("status") != "approval_required"
|
|
finally:
|
|
clear_session_vars(tokens)
|
|
|
|
def test_cron_with_telegram_origin_combined_guard_uses_cron_mode(self, monkeypatch):
|
|
"""check_all_command_guards must also honor cron_mode over gateway classification."""
|
|
monkeypatch.setenv("HERMES_CRON_SESSION", "1")
|
|
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
|
monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
|
|
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
|
|
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
|
|
|
from gateway.session_context import set_session_vars, clear_session_vars
|
|
tokens = set_session_vars(platform="telegram", chat_id="789")
|
|
try:
|
|
from unittest.mock import patch as mock_patch
|
|
with mock_patch("tools.approval._get_cron_approval_mode", return_value="deny"):
|
|
result = check_all_command_guards("rm -rf /tmp/stuff", "local")
|
|
assert not result["approved"]
|
|
assert "BLOCKED" in result["message"]
|
|
assert result.get("status") != "approval_required"
|
|
finally:
|
|
clear_session_vars(tokens)
|