mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): /stop and /new bypass Level 1 active-session guard (#5765)
* fix(gateway): /stop and /new bypass Level 1 active-session guard The base adapter's Level 1 guard intercepted ALL messages while an agent was running, including /stop and /new. These commands were queued as pending messages instead of being dispatched to the gateway runner's Level 2 handler. When the agent eventually stopped (via the interrupt mechanism), the command text leaked into the conversation as a user message — the model would receive '/stop' as input and respond to it. Fix: Add /stop, /new, and /reset to the bypass set in base.py alongside /approve, /deny, and /status. Consolidate the three separate bypass blocks into one. Commands in the bypass set are dispatched inline to the gateway runner, where Level 2 handles them correctly (hard-kill for /stop, session reset for /new). Also add a safety net in _run_agent's pending-message processing: if the pending text resolves to a known slash command, discard it instead of passing it to the agent. This catches edge cases where command text leaks through the interrupt_message fallback. Refs: #5244 * test: regression tests for command bypass of active-session guard 17 tests covering: - /stop, /new, /reset bypass the Level 1 guard when agent is running - /approve, /deny, /status bypass (existing behavior, now tested) - Regular text and unknown commands still queued (not bypassed) - File paths like '/path/to/file' not treated as commands - Telegram @botname suffix handled correctly - Safety net command resolution (resolve_command detects known commands)
This commit is contained in:
parent
9e844160f9
commit
eb7c408445
3 changed files with 347 additions and 31 deletions
|
|
@ -1103,16 +1103,20 @@ class BasePlatformAdapter(ABC):
|
|||
|
||||
# Check if there's already an active handler for this session
|
||||
if session_key in self._active_sessions:
|
||||
# /approve and /deny must bypass the active-session guard.
|
||||
# The agent thread is blocked on threading.Event.wait() inside
|
||||
# tools/approval.py — queuing these commands creates a deadlock:
|
||||
# the agent waits for approval, approval waits for agent to finish.
|
||||
# Dispatch directly to the message handler without touching session
|
||||
# lifecycle (no competing background task, no session guard removal).
|
||||
# Certain commands must bypass the active-session guard and be
|
||||
# dispatched directly to the gateway runner. Without this, they
|
||||
# are queued as pending messages and either:
|
||||
# - leak into the conversation as user text (/stop, /new), or
|
||||
# - deadlock (/approve, /deny — agent is blocked on Event.wait)
|
||||
#
|
||||
# Dispatch inline: call the message handler directly and send the
|
||||
# response. Do NOT use _process_message_background — it manages
|
||||
# session lifecycle and its cleanup races with the running task
|
||||
# (see PR #4926).
|
||||
cmd = event.get_command()
|
||||
if cmd in ("approve", "deny"):
|
||||
if cmd in ("approve", "deny", "status", "stop", "new", "reset"):
|
||||
logger.debug(
|
||||
"[%s] Approval command '/%s' bypassing active-session guard for %s",
|
||||
"[%s] Command '/%s' bypassing active-session guard for %s",
|
||||
self.name, cmd, session_key,
|
||||
)
|
||||
try:
|
||||
|
|
@ -1126,29 +1130,7 @@ class BasePlatformAdapter(ABC):
|
|||
metadata=_thread_meta,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("[%s] Approval dispatch failed: %s", self.name, e, exc_info=True)
|
||||
return
|
||||
|
||||
# /status must also bypass the active-session guard so it always
|
||||
# returns a system-generated response instead of being queued as
|
||||
# user text and passed to the agent (#5046).
|
||||
if cmd == "status":
|
||||
logger.debug(
|
||||
"[%s] Status command bypassing active-session guard for %s",
|
||||
self.name, session_key,
|
||||
)
|
||||
try:
|
||||
_thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
|
||||
response = await self._message_handler(event)
|
||||
if response:
|
||||
await self._send_with_retry(
|
||||
chat_id=event.source.chat_id,
|
||||
content=response,
|
||||
reply_to=event.message_id,
|
||||
metadata=_thread_meta,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("[%s] Status dispatch failed: %s", self.name, e, exc_info=True)
|
||||
logger.error("[%s] Command '/%s' dispatch failed: %s", self.name, cmd, e, exc_info=True)
|
||||
return
|
||||
|
||||
# Special case: photo bursts/albums frequently arrive as multiple near-
|
||||
|
|
|
|||
|
|
@ -7131,6 +7131,27 @@ class GatewayRunner:
|
|||
if pending:
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
|
||||
# Safety net: if the pending text is a slash command (e.g. "/stop",
|
||||
# "/new"), discard it — commands should never be passed to the agent
|
||||
# as user input. The primary fix is in base.py (commands bypass the
|
||||
# active-session guard), but this catches edge cases where command
|
||||
# text leaks through the interrupt_message fallback.
|
||||
if pending and pending.strip().startswith("/"):
|
||||
_pending_parts = pending.strip().split(None, 1)
|
||||
_pending_cmd_word = _pending_parts[0][1:].lower() if _pending_parts else ""
|
||||
if _pending_cmd_word:
|
||||
try:
|
||||
from hermes_cli.commands import resolve_command as _rc_pending
|
||||
if _rc_pending(_pending_cmd_word):
|
||||
logger.info(
|
||||
"Discarding command '/%s' from pending queue — "
|
||||
"commands must not be passed as agent input",
|
||||
_pending_cmd_word,
|
||||
)
|
||||
pending = None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if pending:
|
||||
logger.debug("Processing pending message: '%s...'", pending[:40])
|
||||
|
||||
|
|
|
|||
313
tests/gateway/test_command_bypass_active_session.py
Normal file
313
tests/gateway/test_command_bypass_active_session.py
Normal file
|
|
@ -0,0 +1,313 @@
|
|||
"""Regression tests: slash commands must bypass the base adapter's active-session guard.
|
||||
|
||||
When an agent is running, the base adapter's Level 1 guard in
|
||||
handle_message() intercepts all incoming messages and queues them as
|
||||
pending. Certain commands (/stop, /new, /reset, /approve, /deny,
|
||||
/status) must bypass this guard and be dispatched directly to the gateway
|
||||
runner — otherwise they are queued as user text and either:
|
||||
- leak into the conversation as agent input (/stop, /new), or
|
||||
- deadlock (/approve, /deny — agent blocks on Event.wait)
|
||||
|
||||
These tests verify that the bypass works at the adapter level and that
|
||||
the safety net in _run_agent discards leaked command text.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _StubAdapter(BasePlatformAdapter):
|
||||
"""Concrete adapter with abstract methods stubbed out."""
|
||||
|
||||
async def connect(self):
|
||||
pass
|
||||
|
||||
async def disconnect(self):
|
||||
pass
|
||||
|
||||
async def send(self, chat_id, text, **kwargs):
|
||||
pass
|
||||
|
||||
async def get_chat_info(self, chat_id):
|
||||
return {}
|
||||
|
||||
|
||||
def _make_adapter():
|
||||
"""Create a minimal adapter for testing the active-session guard."""
|
||||
config = PlatformConfig(enabled=True, token="test-token")
|
||||
adapter = _StubAdapter(config, Platform.TELEGRAM)
|
||||
adapter.sent_responses = []
|
||||
|
||||
async def _mock_handler(event):
|
||||
cmd = event.get_command()
|
||||
return f"handled:{cmd}" if cmd else f"handled:text:{event.text}"
|
||||
|
||||
adapter._message_handler = _mock_handler
|
||||
|
||||
async def _mock_send_retry(chat_id, content, **kwargs):
|
||||
adapter.sent_responses.append(content)
|
||||
|
||||
adapter._send_with_retry = _mock_send_retry
|
||||
return adapter
|
||||
|
||||
|
||||
def _make_event(text="/stop", chat_id="12345"):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"
|
||||
)
|
||||
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
|
||||
|
||||
|
||||
def _session_key(chat_id="12345"):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"
|
||||
)
|
||||
return build_session_key(source)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: commands bypass Level 1 when session is active
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCommandBypassActiveSession:
|
||||
"""Commands that must bypass the active-session guard."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_bypasses_guard(self):
|
||||
"""/stop must be dispatched directly, not queued."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/stop"))
|
||||
|
||||
assert sk not in adapter._pending_messages, (
|
||||
"/stop was queued as a pending message instead of being dispatched"
|
||||
)
|
||||
assert any("handled:stop" in r for r in adapter.sent_responses), (
|
||||
"/stop response was not sent back to the user"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_new_bypasses_guard(self):
|
||||
"""/new must be dispatched directly, not queued."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/new"))
|
||||
|
||||
assert sk not in adapter._pending_messages
|
||||
assert any("handled:new" in r for r in adapter.sent_responses)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reset_bypasses_guard(self):
|
||||
"""/reset (alias for /new) must be dispatched directly."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/reset"))
|
||||
|
||||
assert sk not in adapter._pending_messages
|
||||
assert any("handled:reset" in r for r in adapter.sent_responses)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_approve_bypasses_guard(self):
|
||||
"""/approve must bypass (deadlock prevention)."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/approve"))
|
||||
|
||||
assert sk not in adapter._pending_messages
|
||||
assert any("handled:approve" in r for r in adapter.sent_responses)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deny_bypasses_guard(self):
|
||||
"""/deny must bypass (deadlock prevention)."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/deny"))
|
||||
|
||||
assert sk not in adapter._pending_messages
|
||||
assert any("handled:deny" in r for r in adapter.sent_responses)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_bypasses_guard(self):
|
||||
"""/status must bypass so it returns a system response."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/status"))
|
||||
|
||||
assert sk not in adapter._pending_messages
|
||||
assert any("handled:status" in r for r in adapter.sent_responses)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: non-bypass messages still get queued
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNonBypassStillQueued:
|
||||
"""Regular messages and unknown commands must be queued, not dispatched."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_regular_text_queued(self):
|
||||
"""Plain text while agent is running must be queued as pending."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("hello world"))
|
||||
|
||||
assert sk in adapter._pending_messages, (
|
||||
"Regular text was not queued — it should be pending"
|
||||
)
|
||||
assert len(adapter.sent_responses) == 0, (
|
||||
"Regular text should not produce a direct response"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_command_queued(self):
|
||||
"""Unknown /commands must be queued, not dispatched."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/foobar"))
|
||||
|
||||
assert sk in adapter._pending_messages
|
||||
assert len(adapter.sent_responses) == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_path_not_treated_as_command(self):
|
||||
"""A message like '/path/to/file' must not bypass the guard."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/path/to/file.py"))
|
||||
|
||||
assert sk in adapter._pending_messages
|
||||
assert len(adapter.sent_responses) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: no active session — commands go through normally
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNoActiveSessionNormalDispatch:
|
||||
"""When no agent is running, messages spawn a background task normally."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_when_no_session_active(self):
|
||||
"""/stop without an active session spawns a background task
|
||||
(the Level 2 handler will return 'No active task')."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
|
||||
# No active session — _active_sessions is empty
|
||||
assert sk not in adapter._active_sessions
|
||||
|
||||
await adapter.handle_message(_make_event("/stop"))
|
||||
|
||||
# Should have gone through the normal path (background task spawned)
|
||||
# and NOT be in _pending_messages (that's the queued-during-active path)
|
||||
assert sk not in adapter._pending_messages
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: safety net in _run_agent discards command text from pending queue
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPendingCommandSafetyNet:
|
||||
"""The safety net in gateway/run.py _run_agent must discard command text
|
||||
that leaks into the pending queue via interrupt_message fallback."""
|
||||
|
||||
def test_stop_command_detected(self):
|
||||
"""resolve_command must recognize /stop so the safety net can
|
||||
discard it."""
|
||||
from hermes_cli.commands import resolve_command
|
||||
|
||||
assert resolve_command("stop") is not None
|
||||
assert resolve_command("stop").name == "stop"
|
||||
|
||||
def test_new_command_detected(self):
|
||||
from hermes_cli.commands import resolve_command
|
||||
|
||||
assert resolve_command("new") is not None
|
||||
assert resolve_command("new").name == "new"
|
||||
|
||||
def test_reset_alias_detected(self):
|
||||
from hermes_cli.commands import resolve_command
|
||||
|
||||
assert resolve_command("reset") is not None
|
||||
assert resolve_command("reset").name == "new" # alias
|
||||
|
||||
def test_unknown_command_not_detected(self):
|
||||
from hermes_cli.commands import resolve_command
|
||||
|
||||
assert resolve_command("foobar") is None
|
||||
|
||||
def test_file_path_not_detected_as_command(self):
|
||||
"""'/path/to/file' should not resolve as a command."""
|
||||
from hermes_cli.commands import resolve_command
|
||||
|
||||
# The safety net splits on whitespace and takes the first word
|
||||
# after stripping '/'. For '/path/to/file', that's 'path/to/file'.
|
||||
assert resolve_command("path/to/file") is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: bypass with @botname suffix (Telegram-style)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBypassWithBotnameSuffix:
|
||||
"""Telegram appends @botname to commands. The bypass must still work."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_with_botname(self):
|
||||
"""/stop@MyHermesBot must bypass the guard."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/stop@MyHermesBot"))
|
||||
|
||||
assert sk not in adapter._pending_messages, (
|
||||
"/stop@MyHermesBot was queued instead of bypassing"
|
||||
)
|
||||
assert any("handled:stop" in r for r in adapter.sent_responses)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_new_with_botname(self):
|
||||
"""/new@MyHermesBot must bypass the guard."""
|
||||
adapter = _make_adapter()
|
||||
sk = _session_key()
|
||||
adapter._active_sessions[sk] = asyncio.Event()
|
||||
|
||||
await adapter.handle_message(_make_event("/new@MyHermesBot"))
|
||||
|
||||
assert sk not in adapter._pending_messages
|
||||
assert any("handled:new" in r for r in adapter.sent_responses)
|
||||
Loading…
Add table
Add a link
Reference in a new issue