diff --git a/tests/tools/test_approval.py b/tests/tools/test_approval.py index 0694dbcdc91..942d27cbe13 100644 --- a/tests/tools/test_approval.py +++ b/tests/tools/test_approval.py @@ -1,6 +1,9 @@ """Tests for the dangerous command approval module.""" import ast +import os +import threading +import time from pathlib import Path from types import SimpleNamespace from unittest.mock import patch as mock_patch @@ -1305,3 +1308,165 @@ class TestEtcPatternsUnaffectedByRefactor: def test_grep_etc_passwd_is_safe(self): dangerous, _, _ = detect_dangerous_command("grep root /etc/passwd") assert dangerous is False + + +# ========================================================================= +# Gateway approval timeout = deny, NOT consent (#24912) +# +# A Slack user walked away mid-conversation; the agent requested approval +# to run `rm -rf .git`; the prompt timed out; the agent ran the command +# anyway. Reported by @tofalck on 2026-05-13, corroborated by +# @angry-programmer on Telegram. Silence is not consent. +# +# These tests pin: +# 1. Gateway timeout → approved=False, with a message strong enough that +# a downstream agent reading "BLOCKED: ... Silence is not consent." +# treats it as a hard halt, not an invitation to rephrase. +# 2. The structured outcome / user_consent fields are present so +# plugins, hooks, and audit pipelines can act on the timeout without +# string-parsing the message. +# 3. Explicit /deny carries the same shape (treat-as-not-consented). +# ========================================================================= + + +class TestApprovalTimeoutIsNotConsent: + """The gateway approval contract: silence is not consent (#24912).""" + + SESSION_KEY = "test-no-consent-session" + + def setup_method(self): + """Reset module state and force tight gateway_timeout for fast tests.""" + from tools import approval as mod + mod._gateway_queues.clear() + mod._gateway_notify_cbs.clear() + mod._session_approved.clear() + mod._permanent_approved.clear() + mod._pending.clear() + + self._saved_env = { + k: os.environ.get(k) + for k in ("HERMES_GATEWAY_SESSION", "HERMES_YOLO_MODE", + "HERMES_SESSION_KEY", "HERMES_INTERACTIVE") + } + os.environ.pop("HERMES_YOLO_MODE", None) + os.environ.pop("HERMES_INTERACTIVE", None) + os.environ["HERMES_GATEWAY_SESSION"] = "1" + os.environ["HERMES_SESSION_KEY"] = self.SESSION_KEY + + def teardown_method(self): + from tools import approval as mod + mod._gateway_queues.clear() + mod._gateway_notify_cbs.clear() + for k, v in self._saved_env.items(): + if v is None: + os.environ.pop(k, None) + else: + os.environ[k] = v + + def _force_short_timeout(self, monkeypatch, seconds=1): + from tools import approval as mod + monkeypatch.setattr( + mod, "_get_approval_config", + lambda: {"mode": "manual", "gateway_timeout": seconds, "timeout": seconds}, + ) + + def test_timeout_returns_approved_false_with_no_consent(self, monkeypatch): + """The reported #24912 scenario — user never responds, agent must see BLOCKED.""" + from tools import approval as mod + + self._force_short_timeout(monkeypatch, seconds=1) + + # Slack-shaped: notify_cb registered, but user doesn't respond. + notified = [] + mod.register_gateway_notify(self.SESSION_KEY, lambda data: notified.append(data)) + + result = mod.check_all_command_guards("rm -rf .git", "local") + + assert result["approved"] is False + assert result.get("user_consent") is False + assert result.get("outcome") == "timeout" + # The notify_cb DID fire — we did try to ask the user. + assert len(notified) == 1 + + def test_timeout_message_is_emphatic_against_retry_and_rephrase(self, monkeypatch): + """The BLOCKED message must explicitly tell the agent not to rephrase. + + Without this, the agent treats 'Do NOT retry this command' as + permission to try a different command achieving the same outcome. + """ + from tools import approval as mod + self._force_short_timeout(monkeypatch, seconds=1) + mod.register_gateway_notify(self.SESSION_KEY, lambda data: None) + + result = mod.check_all_command_guards("rm -rf .git", "local") + + msg = result["message"] + # Explicit halt signals — these are the model-facing contract. + assert "BLOCKED" in msg + assert "NOT consented" in msg + assert "Silence is not consent" in msg + # Both forms of evasion must be named: + assert "do NOT retry" in msg.lower() or "Do NOT retry" in msg + assert "rephrase" in msg.lower() + assert "different command" in msg.lower() + + def test_explicit_deny_carries_same_no_consent_shape(self): + """An explicit /deny must produce the same shape as timeout — + the agent should treat both identically.""" + from tools import approval as mod + + notified = [] + mod.register_gateway_notify(self.SESSION_KEY, lambda data: notified.append(data)) + + # Spawn the approval wait in a thread, then resolve it with "deny". + result_holder = {} + def _check(): + result_holder["r"] = mod.check_all_command_guards("rm -rf .git", "local") + t = threading.Thread(target=_check) + t.start() + + # Wait for the queue entry to appear, then resolve. + for _ in range(50): + if mod._gateway_queues.get(self.SESSION_KEY): + break + time.sleep(0.02) + mod.resolve_gateway_approval(self.SESSION_KEY, "deny") + t.join(timeout=5) + assert "r" in result_holder, "approval wait did not return after deny" + + r = result_holder["r"] + assert r["approved"] is False + assert r.get("user_consent") is False + assert r.get("outcome") == "denied" + assert "Silence is not consent" not in r["message"] # this one IS denied, not timed-out + assert "NOT consented" in r["message"] + assert "rephrase" in r["message"].lower() + + def test_timeout_emits_post_hook_with_timeout_outcome(self, monkeypatch): + """Plugins must be able to distinguish timeout from explicit deny. + + This is what an audit / notification plugin needs to alert + operators on 'agent asked, user never replied' incidents like #24912. + """ + from tools import approval as mod + self._force_short_timeout(monkeypatch, seconds=1) + mod.register_gateway_notify(self.SESSION_KEY, lambda data: None) + + hook_calls = [] + original_fire = mod._fire_approval_hook + + def _capture(event_name, **kwargs): + hook_calls.append((event_name, kwargs)) + return original_fire(event_name, **kwargs) + + monkeypatch.setattr(mod, "_fire_approval_hook", _capture) + + mod.check_all_command_guards("rm -rf .git", "local") + + # post_approval_response must be in the hook log with choice=timeout + posts = [c for c in hook_calls if c[0] == "post_approval_response"] + assert posts, "post_approval_response hook did not fire" + last_post = posts[-1][1] + assert last_post.get("choice") == "timeout", ( + f"hook choice should be 'timeout' on no-response, got {last_post.get('choice')!r}" + ) diff --git a/tools/approval.py b/tools/approval.py index bfc70cd0fb0..399b9d6c2d2 100644 --- a/tools/approval.py +++ b/tools/approval.py @@ -1299,12 +1299,34 @@ def check_all_command_guards(command: str, env_type: str, ) if not resolved or choice is None or choice == "deny": - reason = "timed out" if not resolved else "denied by user" + # Consent contract: silence is NOT consent, and an explicit + # deny is also a hard halt — both produce a BLOCKED outcome + # that names the agent's most common evasion paths (retry, + # rephrase, achieve the same outcome via a different command). + # See issue #24912 for the original incident. + if not resolved: + reason = "timed out without user response" + timeout_addendum = " Silence is not consent." + outcome = "timeout" + else: + reason = "denied by user" + timeout_addendum = "" + outcome = "denied" return { "approved": False, - "message": f"BLOCKED: Command {reason}. Do NOT retry this command.", + "message": ( + f"BLOCKED: Command {reason}. The user has NOT consented " + f"to this action. Do NOT retry this command, do NOT " + f"rephrase it, and do NOT attempt the same outcome via " + f"a different command. Stop the current workflow and " + f"wait for the user to respond before taking any " + f"further destructive or irreversible action." + f"{timeout_addendum}" + ), "pattern_key": primary_key, "description": combined_desc, + "outcome": outcome, + "user_consent": False, } # User approved — persist based on scope (same logic as CLI) @@ -1369,9 +1391,18 @@ def check_all_command_guards(command: str, env_type: str, if choice == "deny": return { "approved": False, - "message": "BLOCKED: User denied. Do NOT retry.", + "message": ( + "BLOCKED: User denied this command. The user has NOT consented " + "to this action. Do NOT retry this command, do NOT rephrase " + "it, and do NOT attempt the same outcome via a different " + "command. Stop the current workflow and wait for the user " + "to respond before taking any further destructive or " + "irreversible action." + ), "pattern_key": primary_key, "description": combined_desc, + "outcome": "denied", + "user_consent": False, } # Persist approval for each warning individually