mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-30 01:41:43 +00:00
Narrow plaintext shortcut that rewrites a tiny set of admin phrases
("restart gateway", "restart the gateway", "restart hermes") into the
/restart slash command, but only in DMs. Scope is intentionally tight:
- DM text messages only — group chats keep natural-language semantics
- Exact restart-style phrases only
- Skips anything already starting with "/"
Without this, the LLM can receive "restart gateway" as a user turn and
try to satisfy it via the terminal tool (systemctl restart ...). That
kills the gateway while the originating agent is still running, which
leaves systemd in "draining" state waiting on a process it's about to
kill. Routing the phrase to the slash-command dispatcher bypasses the
agent loop and uses the existing restart machinery (request_restart).
Called once, at the adapter level in BasePlatformAdapter.handle_message,
so every platform gets it for free and pending-message reinjection is
covered by the same call site.
Adds 2 Telegram-parametrized e2e tests: DM routes to request_restart,
group chats fall through to the normal agent path.
214 lines
9.2 KiB
Python
214 lines
9.2 KiB
Python
"""E2E tests for gateway slash commands (Telegram, Discord).
|
|
|
|
Each test drives a message through the full async pipeline:
|
|
adapter.handle_message(event)
|
|
→ BasePlatformAdapter._process_message_background()
|
|
→ GatewayRunner._handle_message() (command dispatch)
|
|
→ adapter.send() (captured for assertions)
|
|
|
|
No LLM involved — only gateway-level commands are tested.
|
|
Tests are parametrized over platforms via the ``platform`` fixture in conftest.
|
|
"""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from gateway.config import Platform
|
|
from gateway.platforms.base import SendResult
|
|
from tests.e2e.conftest import make_event, send_and_capture
|
|
|
|
|
|
class TestSlashCommands:
|
|
"""Gateway slash commands dispatched through the full adapter pipeline."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_help_returns_command_list(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/help", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
assert "/new" in response_text
|
|
assert "/status" in response_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_shows_session_info(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/status", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
assert "session" in response_text.lower() or "Session" in response_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_new_resets_session(self, adapter, runner, platform):
|
|
send = await send_and_capture(adapter, "/new", platform)
|
|
|
|
send.assert_called_once()
|
|
runner.session_store.reset_session.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_when_no_agent_running(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/stop", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
response_lower = response_text.lower()
|
|
assert "no" in response_lower or "stop" in response_lower or "not running" in response_lower
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_commands_shows_listing(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/commands", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
# Should list at least some commands
|
|
assert "/" in response_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sequential_commands_share_session(self, adapter, platform):
|
|
"""Two commands from the same chat_id should both succeed."""
|
|
send_help = await send_and_capture(adapter, "/help", platform)
|
|
send_help.assert_called_once()
|
|
|
|
send_status = await send_and_capture(adapter, "/status", platform)
|
|
send_status.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_verbose_responds(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/verbose", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
# Either shows the mode cycle or tells user to enable it in config
|
|
assert "verbose" in response_text.lower() or "tool_progress" in response_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plaintext_restart_gateway_routes_to_safe_restart_command(self, adapter, runner, platform, monkeypatch):
|
|
if platform != Platform.TELEGRAM:
|
|
pytest.skip("Plaintext restart shortcut is intentionally DM/Telegram-focused")
|
|
|
|
monkeypatch.setenv("INVOCATION_ID", "e2e-systemd")
|
|
runner.request_restart = MagicMock(return_value=True)
|
|
|
|
send = await send_and_capture(adapter, "restart gateway", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
assert "restart" in response_text.lower() or "draining" in response_text.lower()
|
|
runner.request_restart.assert_called_once_with(detached=False, via_service=True)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plaintext_restart_gateway_in_group_stays_plain_text(self, adapter, runner, platform, monkeypatch):
|
|
if platform != Platform.TELEGRAM:
|
|
pytest.skip("Shortcut scope is only verified for Telegram here")
|
|
|
|
monkeypatch.setenv("INVOCATION_ID", "e2e-systemd")
|
|
runner.request_restart = MagicMock(return_value=True)
|
|
runner._handle_message_with_agent = AsyncMock(return_value="agent-handled")
|
|
|
|
send = await send_and_capture(adapter, "restart gateway", platform, chat_id="group-chat-1", user_id="u1", chat_type="group")
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
assert response_text == "agent-handled"
|
|
runner.request_restart.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_personality_lists_options(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/personality", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
assert "personalit" in response_text.lower() # matches "personality" or "personalities"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_yolo_toggles_mode(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/yolo", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
assert "yolo" in response_text.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_compress_command(self, adapter, platform):
|
|
send = await send_and_capture(adapter, "/compress", platform)
|
|
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
assert "compress" in response_text.lower() or "context" in response_text.lower()
|
|
|
|
|
|
class TestSessionLifecycle:
|
|
"""Verify session state changes across command sequences."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_new_then_status_reflects_reset(self, adapter, runner, session_entry, platform):
|
|
"""After /new, /status should report the fresh session."""
|
|
await send_and_capture(adapter, "/new", platform)
|
|
runner.session_store.reset_session.assert_called_once()
|
|
|
|
send = await send_and_capture(adapter, "/status", platform)
|
|
send.assert_called_once()
|
|
response_text = send.call_args[1].get("content") or send.call_args[0][1]
|
|
# Session ID from the entry should appear in the status output
|
|
assert session_entry.session_id[:8] in response_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_new_is_idempotent(self, adapter, runner, platform):
|
|
"""/new called twice should not crash."""
|
|
await send_and_capture(adapter, "/new", platform)
|
|
await send_and_capture(adapter, "/new", platform)
|
|
assert runner.session_store.reset_session.call_count == 2
|
|
|
|
|
|
class TestAuthorization:
|
|
"""Verify the pipeline handles unauthorized users."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unauthorized_user_gets_pairing_response(self, adapter, runner, platform):
|
|
"""Unauthorized DM should trigger pairing code, not a command response."""
|
|
runner._is_user_authorized = lambda _source: False
|
|
|
|
event = make_event(platform, "/help")
|
|
adapter.send.reset_mock()
|
|
await adapter.handle_message(event)
|
|
await asyncio.sleep(0.3)
|
|
|
|
# The adapter.send is called directly by the authorization path
|
|
# (not via _send_with_retry), so check it was called with a pairing message
|
|
adapter.send.assert_called()
|
|
response_text = adapter.send.call_args[0][1] if len(adapter.send.call_args[0]) > 1 else ""
|
|
assert "recognize" in response_text.lower() or "pair" in response_text.lower() or "ABC123" in response_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unauthorized_user_does_not_get_help(self, adapter, runner, platform):
|
|
"""Unauthorized user should NOT see the help command output."""
|
|
runner._is_user_authorized = lambda _source: False
|
|
|
|
event = make_event(platform, "/help")
|
|
adapter.send.reset_mock()
|
|
await adapter.handle_message(event)
|
|
await asyncio.sleep(0.3)
|
|
|
|
# If send was called, it should NOT contain the help text
|
|
if adapter.send.called:
|
|
response_text = adapter.send.call_args[0][1] if len(adapter.send.call_args[0]) > 1 else ""
|
|
assert "/new" not in response_text
|
|
|
|
|
|
class TestSendFailureResilience:
|
|
"""Verify the pipeline handles send failures gracefully."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_failure_does_not_crash_pipeline(self, adapter, platform):
|
|
"""If send() returns failure, the pipeline should not raise."""
|
|
adapter.send = AsyncMock(return_value=SendResult(success=False, error="network timeout"))
|
|
adapter.set_message_handler(adapter._message_handler) # re-wire with same handler
|
|
|
|
event = make_event(platform, "/help")
|
|
# Should not raise — pipeline handles send failures internally
|
|
await adapter.handle_message(event)
|
|
await asyncio.sleep(0.3)
|
|
|
|
adapter.send.assert_called()
|