"""Tests for gateway /status behavior and token persistence.""" from datetime import datetime import time from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig from gateway.platforms.base import MessageEvent from gateway.session import SessionEntry, SessionSource, build_session_key def _make_source() -> SessionSource: return SessionSource( platform=Platform.TELEGRAM, user_id="u1", chat_id="c1", user_name="tester", chat_type="dm", ) def _make_event(text: str) -> MessageEvent: return MessageEvent( text=text, source=_make_source(), message_id="m1", ) def _make_runner(session_entry: SessionEntry): from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) runner.config = GatewayConfig( platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")} ) adapter = MagicMock() adapter.send = AsyncMock() runner.adapters = {Platform.TELEGRAM: adapter} runner._voice_mode = {} runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False) runner.session_store = MagicMock() runner.session_store.get_or_create_session.return_value = session_entry runner.session_store.load_transcript.return_value = [] runner.session_store.has_any_sessions.return_value = True runner.session_store.append_to_transcript = MagicMock() runner.session_store.rewrite_transcript = MagicMock() runner.session_store.update_session = MagicMock() runner._running_agents = {} runner._session_run_generation = {} runner._pending_messages = {} runner._pending_approvals = {} runner._session_db = MagicMock() runner._session_db.get_session_title.return_value = None runner._reasoning_config = None runner._provider_routing = {} runner._fallback_model = None runner._show_reasoning = False runner._is_user_authorized = lambda _source: True runner._set_session_env = lambda _context: None runner._should_send_voice_reply = lambda *_args, **_kwargs: False runner._send_voice_reply = AsyncMock() runner._capture_gateway_honcho_if_configured = lambda *args, **kwargs: None runner._emit_gateway_run_progress = AsyncMock() return runner @pytest.mark.asyncio async def test_status_command_reports_running_agent_without_interrupt(monkeypatch): session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", total_tokens=321, ) runner = _make_runner(session_entry) running_agent = MagicMock() runner._running_agents[build_session_key(_make_source())] = running_agent result = await runner._handle_message(_make_event("/status")) assert "**Session ID:** `sess-1`" in result assert "**Tokens:** 321" in result assert "**Agent Running:** Yes ⚡" in result assert "**Title:**" not in result running_agent.interrupt.assert_not_called() assert runner._pending_messages == {} @pytest.mark.asyncio async def test_status_command_includes_session_title_when_present(): session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", total_tokens=321, ) runner = _make_runner(session_entry) runner._session_db.get_session_title.return_value = "My titled session" result = await runner._handle_message(_make_event("/status")) assert "**Session ID:** `sess-1`" in result assert "**Title:** My titled session" in result @pytest.mark.asyncio async def test_agents_command_reports_active_agents_and_processes(monkeypatch): session_key = build_session_key(_make_source()) session_entry = SessionEntry( session_key=session_key, session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", total_tokens=0, ) runner = _make_runner(session_entry) running_agent = SimpleNamespace( session_id="sess-running", model="openrouter/test-model", interrupt=MagicMock(), get_activity_summary=lambda: {"seconds_since_activity": 0}, ) runner._running_agents[session_key] = running_agent runner._running_agents_ts = {session_key: time.time() - 8} runner._background_tasks = set() class _FakeRegistry: def list_sessions(self): return [ { "session_id": "proc-1", "status": "running", "uptime_seconds": 17, "command": "sleep 30", } ] monkeypatch.setattr("tools.process_registry.process_registry", _FakeRegistry()) result = await runner._handle_message(_make_event("/agents")) assert "**Active agents:** 1" in result assert "**Running background processes:** 1" in result assert "proc-1" in result running_agent.interrupt.assert_not_called() @pytest.mark.asyncio async def test_tasks_alias_routes_to_agents_command(monkeypatch): session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", total_tokens=0, ) runner = _make_runner(session_entry) runner._background_tasks = set() class _FakeRegistry: def list_sessions(self): return [] monkeypatch.setattr("tools.process_registry.process_registry", _FakeRegistry()) result = await runner._handle_message(_make_event("/tasks")) assert "Active Agents & Tasks" in result @pytest.mark.asyncio async def test_handle_message_persists_agent_token_counts(monkeypatch): import gateway.run as gateway_run session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", ) runner = _make_runner(session_entry) runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}] runner._run_agent = AsyncMock( return_value={ "final_response": "ok", "messages": [], "tools": [], "history_offset": 0, "last_prompt_tokens": 80, "input_tokens": 120, "output_tokens": 45, "model": "openai/test-model", } ) monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) monkeypatch.setattr( "agent.model_metadata.get_model_context_length", lambda *_args, **_kwargs: 100000, ) result = await runner._handle_message(_make_event("hello")) assert result == "ok" runner.session_store.update_session.assert_called_once_with( session_entry.session_key, last_prompt_tokens=80, ) @pytest.mark.asyncio async def test_handle_message_discards_stale_result_after_session_invalidation(monkeypatch): import gateway.run as gateway_run session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", ) runner = _make_runner(session_entry) runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}] session_key = session_entry.session_key runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks = {session_key: object()} async def _stale_result(**kwargs): runner._invalidate_session_run_generation(kwargs["session_key"], reason="test_stale_result") return { "final_response": "late reply", "messages": [], "tools": [], "history_offset": 0, "last_prompt_tokens": 80, "input_tokens": 120, "output_tokens": 45, "model": "openai/test-model", } runner._run_agent = AsyncMock(side_effect=_stale_result) monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) monkeypatch.setattr( "agent.model_metadata.get_model_context_length", lambda *_args, **_kwargs: 100000, ) result = await runner._handle_message(_make_event("hello")) assert result is None runner.session_store.append_to_transcript.assert_not_called() runner.session_store.update_session.assert_not_called() assert session_key not in runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks @pytest.mark.asyncio async def test_handle_message_stale_result_keeps_newer_generation_callback(monkeypatch): import gateway.run as gateway_run class _Adapter: def __init__(self): self._post_delivery_callbacks = {} async def send(self, *args, **kwargs): return None def pop_post_delivery_callback(self, session_key, *, generation=None): entry = self._post_delivery_callbacks.get(session_key) if entry is None: return None if isinstance(entry, tuple): entry_generation, callback = entry if generation is not None and entry_generation != generation: return None self._post_delivery_callbacks.pop(session_key, None) return callback if generation is not None: return None return self._post_delivery_callbacks.pop(session_key, None) session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", ) runner = _make_runner(session_entry) runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}] session_key = session_entry.session_key adapter = _Adapter() runner.adapters[Platform.TELEGRAM] = adapter async def _stale_result(**kwargs): # Simulate a newer run claiming the callback slot before the stale run unwinds. runner._session_run_generation[session_key] = 2 adapter._post_delivery_callbacks[session_key] = (2, lambda: None) return { "final_response": "late reply", "messages": [], "tools": [], "history_offset": 0, "last_prompt_tokens": 80, "input_tokens": 120, "output_tokens": 45, "model": "openai/test-model", } runner._run_agent = AsyncMock(side_effect=_stale_result) monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) monkeypatch.setattr( "agent.model_metadata.get_model_context_length", lambda *_args, **_kwargs: 100000, ) result = await runner._handle_message(_make_event("hello")) assert result is None assert session_key in adapter._post_delivery_callbacks assert adapter._post_delivery_callbacks[session_key][0] == 2 @pytest.mark.asyncio async def test_status_command_bypasses_active_session_guard(): """When an agent is running, /status must be dispatched immediately via base.handle_message — not queued or treated as an interrupt (#5046).""" import asyncio from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType from gateway.session import build_session_key from gateway.config import Platform, PlatformConfig, GatewayConfig source = _make_source() session_key = build_session_key(source) handler_called_with = [] async def fake_handler(event): handler_called_with.append(event) return "📊 **Hermes Gateway Status**\n**Agent Running:** Yes ⚡" # Concrete subclass to avoid abstract method errors class _ConcreteAdapter(BasePlatformAdapter): platform = Platform.TELEGRAM async def connect(self): pass async def disconnect(self): pass async def send(self, chat_id, content, **kwargs): pass async def get_chat_info(self, chat_id): return {} platform_config = PlatformConfig(enabled=True, token="***") adapter = _ConcreteAdapter(platform_config, Platform.TELEGRAM) adapter.set_message_handler(fake_handler) sent = [] async def fake_send_with_retry(chat_id, content, reply_to=None, metadata=None): sent.append(content) adapter._send_with_retry = fake_send_with_retry # Simulate an active session interrupt_event = asyncio.Event() adapter._active_sessions[session_key] = interrupt_event event = MessageEvent( text="/status", source=source, message_id="m1", message_type=MessageType.COMMAND, ) await adapter.handle_message(event) assert handler_called_with, "/status handler was never called (event was queued or dropped)" assert sent, "/status response was never sent" assert "Agent Running" in sent[0] assert not interrupt_event.is_set(), "/status incorrectly triggered an agent interrupt" assert session_key not in adapter._pending_messages, "/status was incorrectly queued" @pytest.mark.asyncio async def test_profile_command_reports_custom_root_profile(monkeypatch, tmp_path): """Gateway /profile detects custom-root profiles (not under ~/.hermes).""" from pathlib import Path session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", ) runner = _make_runner(session_entry) profile_home = tmp_path / "profiles" / "coder" monkeypatch.setenv("HERMES_HOME", str(profile_home)) monkeypatch.setattr(Path, "home", lambda: tmp_path / "unrelated-home") result = await runner._handle_profile_command(_make_event("/profile")) assert "**Profile:** `coder`" in result assert f"**Home:** `{profile_home}`" in result