"""Tests for /approve and /deny gateway commands. Verifies that dangerous command approvals require explicit /approve or /deny slash commands, not bare "yes"/"no" text matching. """ import asyncio import time from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig from gateway.platforms.base import MessageEvent from gateway.session import SessionEntry, SessionSource, build_session_key def _make_source() -> SessionSource: return SessionSource( platform=Platform.TELEGRAM, user_id="u1", chat_id="c1", user_name="tester", chat_type="dm", ) def _make_event(text: str) -> MessageEvent: return MessageEvent( text=text, source=_make_source(), message_id="m1", ) def _make_runner(): from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) runner.config = GatewayConfig( platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")} ) adapter = MagicMock() adapter.send = AsyncMock() runner.adapters = {Platform.TELEGRAM: adapter} runner._voice_mode = {} runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False) runner.session_store = MagicMock() runner._running_agents = {} runner._pending_messages = {} runner._pending_approvals = {} runner._background_tasks = set() runner._session_db = None runner._reasoning_config = None runner._provider_routing = {} runner._fallback_model = None runner._show_reasoning = False runner._is_user_authorized = lambda _source: True runner._set_session_env = lambda _context: None return runner def _make_pending_approval(command="sudo rm -rf /tmp/test", pattern_key="sudo"): return { "command": command, "pattern_key": pattern_key, "pattern_keys": [pattern_key], "description": "sudo command", "timestamp": time.time(), } # ------------------------------------------------------------------ # /approve command # ------------------------------------------------------------------ class TestApproveCommand: @pytest.mark.asyncio async def test_approve_executes_pending_command(self): """Basic /approve executes the pending command and sends feedback.""" runner = _make_runner() source = _make_source() session_key = runner._session_key_for_source(source) runner._pending_approvals[session_key] = _make_pending_approval() event = _make_event("/approve") with ( patch("tools.terminal_tool.terminal_tool", return_value="done") as mock_term, patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value="agent continued"), ): result = await runner._handle_approve_command(event) # Yield to let the background continuation task run. # This works because mocks return immediately (no real await points). await asyncio.sleep(0) # Returns None because feedback is sent directly via adapter assert result is None mock_term.assert_called_once_with(command="sudo rm -rf /tmp/test", force=True) assert session_key not in runner._pending_approvals # Immediate feedback sent via adapter adapter = runner.adapters[Platform.TELEGRAM] sent_text = adapter.send.call_args_list[0][0][1] assert "Command approved and executed" in sent_text @pytest.mark.asyncio async def test_approve_session_remembers_pattern(self): """/approve session approves the pattern for the session.""" runner = _make_runner() source = _make_source() session_key = runner._session_key_for_source(source) runner._pending_approvals[session_key] = _make_pending_approval() event = _make_event("/approve session") with ( patch("tools.terminal_tool.terminal_tool", return_value="done"), patch("tools.approval.approve_session") as mock_session, patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value=None), ): result = await runner._handle_approve_command(event) # Yield to let the background continuation task run. # This works because mocks return immediately (no real await points). await asyncio.sleep(0) assert result is None mock_session.assert_called_once_with(session_key, "sudo") # Verify scope message in adapter feedback adapter = runner.adapters[Platform.TELEGRAM] sent_text = adapter.send.call_args_list[0][0][1] assert "pattern approved for this session" in sent_text @pytest.mark.asyncio async def test_approve_always_approves_permanently(self): """/approve always approves the pattern permanently.""" runner = _make_runner() source = _make_source() session_key = runner._session_key_for_source(source) runner._pending_approvals[session_key] = _make_pending_approval() event = _make_event("/approve always") with ( patch("tools.terminal_tool.terminal_tool", return_value="done"), patch("tools.approval.approve_permanent") as mock_perm, patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value=None), ): result = await runner._handle_approve_command(event) # Yield to let the background continuation task run. # This works because mocks return immediately (no real await points). await asyncio.sleep(0) assert result is None mock_perm.assert_called_once_with("sudo") # Verify scope message in adapter feedback adapter = runner.adapters[Platform.TELEGRAM] sent_text = adapter.send.call_args_list[0][0][1] assert "pattern approved permanently" in sent_text @pytest.mark.asyncio async def test_approve_no_pending(self): """/approve with no pending approval returns helpful message.""" runner = _make_runner() event = _make_event("/approve") result = await runner._handle_approve_command(event) assert "No pending command" in result @pytest.mark.asyncio async def test_approve_expired(self): """/approve on a timed-out approval rejects it.""" runner = _make_runner() source = _make_source() session_key = runner._session_key_for_source(source) approval = _make_pending_approval() approval["timestamp"] = time.time() - 600 # 10 minutes ago runner._pending_approvals[session_key] = approval event = _make_event("/approve") result = await runner._handle_approve_command(event) assert "expired" in result assert session_key not in runner._pending_approvals @pytest.mark.asyncio async def test_approve_reinvokes_agent_with_result(self): """After executing, /approve re-invokes the agent with command output.""" runner = _make_runner() source = _make_source() session_key = runner._session_key_for_source(source) runner._pending_approvals[session_key] = _make_pending_approval() event = _make_event("/approve") mock_handle = AsyncMock(return_value="I continued the task.") with ( patch("tools.terminal_tool.terminal_tool", return_value="file deleted"), patch.object(runner, "_handle_message", mock_handle), ): await runner._handle_approve_command(event) # Yield to let the background continuation task run. # This works because mocks return immediately (no real await points). await asyncio.sleep(0) # Agent was re-invoked via _handle_message with a synthetic event mock_handle.assert_called_once() synthetic_event = mock_handle.call_args[0][0] assert "approved" in synthetic_event.text.lower() assert "file deleted" in synthetic_event.text assert "sudo rm -rf /tmp/test" in synthetic_event.text # The continuation response was sent to the user adapter = runner.adapters[Platform.TELEGRAM] # First call: immediate feedback, second call: agent continuation assert adapter.send.call_count == 2 continuation_response = adapter.send.call_args_list[1][0][1] assert continuation_response == "I continued the task." # ------------------------------------------------------------------ # /deny command # ------------------------------------------------------------------ class TestDenyCommand: @pytest.mark.asyncio async def test_deny_clears_pending(self): """/deny clears the pending approval.""" runner = _make_runner() source = _make_source() session_key = runner._session_key_for_source(source) runner._pending_approvals[session_key] = _make_pending_approval() event = _make_event("/deny") result = await runner._handle_deny_command(event) assert "❌ Command denied" in result assert session_key not in runner._pending_approvals @pytest.mark.asyncio async def test_deny_no_pending(self): """/deny with no pending approval returns helpful message.""" runner = _make_runner() event = _make_event("/deny") result = await runner._handle_deny_command(event) assert "No pending command" in result # ------------------------------------------------------------------ # Bare "yes" must NOT trigger approval # ------------------------------------------------------------------ class TestBareTextNoLongerApproves: @pytest.mark.asyncio async def test_yes_does_not_execute_pending_command(self): """Saying 'yes' in normal conversation must not execute a pending command. This is the core bug from issue #1888: bare text matching against 'yes'/'no' could intercept unrelated user messages. """ runner = _make_runner() source = _make_source() session_key = runner._session_key_for_source(source) runner._pending_approvals[session_key] = _make_pending_approval() # Simulate the user saying "yes" as a normal message. # The old code would have executed the pending command. # Now it should fall through to normal processing (agent handles it). event = _make_event("yes") # The approval should still be pending — "yes" is not /approve # We can't easily run _handle_message end-to-end, but we CAN verify # the old text-matching block no longer exists by confirming the # approval is untouched after the command dispatch section. # The key assertion is that _pending_approvals is NOT consumed. assert session_key in runner._pending_approvals # ------------------------------------------------------------------ # Approval hint appended to response # ------------------------------------------------------------------ class TestApprovalHint: def test_approval_hint_appended_to_response(self): """When a pending approval is collected, structured instructions should be appended to the agent response.""" # This tests the approval collection logic at the end of _handle_message. # We verify the hint format directly. cmd = "sudo rm -rf /tmp/dangerous" cmd_preview = cmd hint = ( f"\n\n⚠️ **Dangerous command requires approval:**\n" f"```\n{cmd_preview}\n```\n" f"Reply `/approve` to execute, `/approve session` to approve this pattern " f"for the session, or `/deny` to cancel." ) assert "/approve" in hint assert "/deny" in hint assert cmd in hint