"""Tests that on_session_finalize and on_session_reset plugin hooks fire in the gateway.""" from datetime import datetime from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig from gateway.platforms.base import MessageEvent from gateway.session import SessionEntry, SessionSource, build_session_key def _make_source() -> SessionSource: return SessionSource( platform=Platform.TELEGRAM, user_id="u1", chat_id="c1", user_name="tester", chat_type="dm", ) def _make_event(text: str) -> MessageEvent: return MessageEvent(text=text, source=_make_source(), message_id="m1") def _make_runner(): from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) runner.config = GatewayConfig( platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")} ) adapter = MagicMock() adapter.send = AsyncMock() runner.adapters = {Platform.TELEGRAM: adapter} runner._voice_mode = {} runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False) runner._session_model_overrides = {} runner._pending_model_notes = {} runner._background_tasks = set() session_key = build_session_key(_make_source()) session_entry = SessionEntry( session_key=session_key, session_id="sess-old", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", ) new_session_entry = SessionEntry( session_key=session_key, session_id="sess-new", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", ) runner.session_store = MagicMock() runner.session_store.get_or_create_session.return_value = new_session_entry runner.session_store.reset_session.return_value = new_session_entry runner.session_store._entries = {session_key: session_entry} runner.session_store._generate_session_key.return_value = session_key runner._running_agents = {} runner._pending_messages = {} runner._pending_approvals = {} runner._session_db = None runner._agent_cache_lock = None runner._is_user_authorized = lambda _source: True runner._format_session_info = lambda: "" return runner @pytest.mark.asyncio @patch("hermes_cli.plugins.invoke_hook") async def test_reset_fires_finalize_hook(mock_invoke_hook): """/new must fire on_session_finalize with the OLD session id.""" runner = _make_runner() await runner._handle_reset_command(_make_event("/new")) mock_invoke_hook.assert_any_call( "on_session_finalize", session_id="sess-old", platform="telegram" ) @pytest.mark.asyncio @patch("hermes_cli.plugins.invoke_hook") async def test_reset_fires_reset_hook(mock_invoke_hook): """/new must fire on_session_reset with the NEW session id.""" runner = _make_runner() await runner._handle_reset_command(_make_event("/new")) mock_invoke_hook.assert_any_call( "on_session_reset", session_id="sess-new", platform="telegram" ) @pytest.mark.asyncio @patch("hermes_cli.plugins.invoke_hook") async def test_finalize_before_reset(mock_invoke_hook): """on_session_finalize must fire before on_session_reset.""" runner = _make_runner() await runner._handle_reset_command(_make_event("/new")) calls = [c for c in mock_invoke_hook.call_args_list if c[0][0] in ("on_session_finalize", "on_session_reset")] hook_names = [c[0][0] for c in calls] assert hook_names == ["on_session_finalize", "on_session_reset"] @pytest.mark.asyncio @patch("hermes_cli.plugins.invoke_hook") async def test_shutdown_fires_finalize_for_active_agents(mock_invoke_hook): """Gateway stop() must fire on_session_finalize for each active agent.""" from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) runner._running = True runner._background_tasks = set() runner._pending_messages = {} runner._pending_approvals = {} runner._shutdown_event = MagicMock() runner.adapters = {} runner._exit_reason = "test" runner._exit_code = None runner._draining = False runner._restart_requested = False runner._restart_task_started = False runner._restart_detached = False runner._restart_via_service = False runner._restart_drain_timeout = 0.0 runner._stop_task = None runner._running_agents_ts = {} runner._update_runtime_status = MagicMock() agent1 = MagicMock() agent1.session_id = "sess-a" agent2 = MagicMock() agent2.session_id = "sess-b" runner._running_agents = {"key-a": agent1, "key-b": agent2} with patch("gateway.status.remove_pid_file"), \ patch("gateway.status.write_runtime_status"): await runner.stop() finalize_calls = [ c for c in mock_invoke_hook.call_args_list if c[0][0] == "on_session_finalize" ] session_ids = {c[1]["session_id"] for c in finalize_calls} assert session_ids == {"sess-a", "sess-b"} @pytest.mark.asyncio @patch("hermes_cli.plugins.invoke_hook", side_effect=Exception("boom")) async def test_hook_error_does_not_break_reset(mock_invoke_hook): """Plugin hook errors must not prevent /new from completing.""" runner = _make_runner() result = await runner._handle_reset_command(_make_event("/new")) # Should still return a success message despite hook errors assert "Session reset" in result or "New session" in result @pytest.mark.asyncio @patch("hermes_cli.plugins.invoke_hook") async def test_idle_expiry_fires_finalize_hook(mock_invoke_hook): """Regression test for #14981. When ``_session_expiry_watcher`` sweeps a session that has aged past its reset policy (idle timeout, scheduled reset), it must fire ``on_session_finalize`` so plugin providers get the same final-pass extraction opportunity they'd get from /new or CLI shutdown. Before the fix, the expiry path flushed memories and evicted the agent but silently skipped the hook. """ from datetime import datetime, timedelta from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) runner._running = True runner._running_agents = {} runner._agent_cache = {} runner._agent_cache_lock = None runner._last_session_store_prune_ts = 0.0 session_key = "agent:main:telegram:dm:42" expired_entry = SessionEntry( session_key=session_key, session_id="sess-expired", created_at=datetime.now() - timedelta(hours=2), updated_at=datetime.now() - timedelta(hours=2), platform=Platform.TELEGRAM, chat_type="dm", ) expired_entry.memory_flushed = False runner.session_store = MagicMock() runner.session_store._ensure_loaded = MagicMock() runner.session_store._entries = {session_key: expired_entry} runner.session_store._is_session_expired = MagicMock(return_value=True) runner.session_store._lock = MagicMock() runner.session_store._lock.__enter__ = MagicMock(return_value=None) runner.session_store._lock.__exit__ = MagicMock(return_value=None) runner.session_store._save = MagicMock() runner._async_flush_memories = AsyncMock() runner._evict_cached_agent = MagicMock() runner._cleanup_agent_resources = MagicMock() runner._sweep_idle_cached_agents = MagicMock(return_value=0) # The watcher starts with `await asyncio.sleep(60)` and loops while # `self._running`. Patch sleep so the 60s initial delay is instant, then # flip `_running` false inside the flush call so the loop exits cleanly # after one pass. _orig_sleep = __import__("asyncio").sleep async def _fast_sleep(_): await _orig_sleep(0) async def _flush_and_stop(session_id, key): runner._running = False # terminate the loop after this iteration runner._async_flush_memories = AsyncMock(side_effect=_flush_and_stop) with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): await runner._session_expiry_watcher(interval=0) # Look for the finalize call targeting the expired session. finalize_calls = [ c for c in mock_invoke_hook.call_args_list if c[0] and c[0][0] == "on_session_finalize" ] session_ids = {c[1].get("session_id") for c in finalize_calls} assert "sess-expired" in session_ids, ( f"on_session_finalize was not fired during idle expiry; " f"got session_ids={session_ids} (regression of #14981)" )