diff --git a/gateway/run.py b/gateway/run.py index 222e28c3eb..0cdfb71466 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1391,6 +1391,65 @@ class GatewayRunner: except Exception as e: logger.debug("Failed interrupting agent during shutdown: %s", e) + async def _notify_active_sessions_of_shutdown(self) -> None: + """Send a notification to every chat with an active agent. + + Called at the very start of stop() — adapters are still connected so + messages can be delivered. Best-effort: individual send failures are + logged and swallowed so they never block the shutdown sequence. + """ + active = self._snapshot_running_agents() + if not active: + return + + action = "restarting" if self._restart_requested else "shutting down" + hint = ( + "Your current task will be interrupted. " + "Use /retry after restart to continue." + if self._restart_requested + else "Your current task will be interrupted." + ) + msg = f"⚠️ Gateway {action} — {hint}" + + notified: set = set() + for session_key in active: + # Parse platform + chat_id from the session key. + # Format: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...] + parts = session_key.split(":") + if len(parts) < 5: + continue + platform_str = parts[2] + chat_id = parts[4] + + # Deduplicate: one notification per chat, even if multiple + # sessions (different users/threads) share the same chat. + dedup_key = (platform_str, chat_id) + if dedup_key in notified: + continue + + try: + platform = Platform(platform_str) + adapter = self.adapters.get(platform) + if not adapter: + continue + + # Include thread_id if present so the message lands in the + # correct forum topic / thread. + thread_id = parts[5] if len(parts) > 5 else None + metadata = {"thread_id": thread_id} if thread_id else None + + await adapter.send(chat_id, msg, metadata=metadata) + notified.add(dedup_key) + logger.info( + "Sent shutdown notification to %s:%s", + platform_str, chat_id, + ) + except Exception as e: + logger.debug( + "Failed to send shutdown notification to %s:%s: %s", + platform_str, chat_id, e, + ) + def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None: for agent in active_agents.values(): try: @@ -2018,6 +2077,10 @@ class GatewayRunner: self._running = False self._draining = True + # Notify all chats with active agents BEFORE draining. + # Adapters are still connected here, so messages can be sent. + await self._notify_active_sessions_of_shutdown() + timeout = self._restart_drain_timeout active_agents, timed_out = await self._drain_active_agents(timeout) if timed_out: @@ -2088,12 +2151,23 @@ class GatewayRunner: # Write a clean-shutdown marker so the next startup knows this # wasn't a crash. suspend_recently_active() only needs to run - # after unexpected exits — graceful shutdowns already drain - # active agents, so there's no stuck-session risk. - try: - (_hermes_home / ".clean_shutdown").touch() - except Exception: - pass + # after unexpected exits. However, if the drain timed out and + # agents were force-interrupted, their sessions may be in an + # incomplete state (trailing tool response, no final assistant + # message). Skip the marker in that case so the next startup + # suspends those sessions — giving users a clean slate instead + # of resuming a half-finished tool loop. + if not timed_out: + try: + (_hermes_home / ".clean_shutdown").touch() + except Exception: + pass + else: + logger.info( + "Skipping .clean_shutdown marker — drain timed out with " + "interrupted agents; next startup will suspend recently " + "active sessions." + ) if self._restart_requested and self._restart_via_service: self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 721e68143c..c73344be4e 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -4036,7 +4036,40 @@ def cmd_update(args): capture_output=True, text=True, timeout=15, ) if restart.returncode == 0: - restarted_services.append(svc_name) + # Verify the service actually survived the + # restart. systemctl restart returns 0 even + # if the new process crashes immediately. + import time as _time + _time.sleep(3) + verify = subprocess.run( + scope_cmd + ["is-active", svc_name], + capture_output=True, text=True, timeout=5, + ) + if verify.stdout.strip() == "active": + restarted_services.append(svc_name) + else: + # Retry once — transient startup failures + # (stale module cache, import race) often + # resolve on the second attempt. + print(f" ⚠ {svc_name} died after restart, retrying...") + retry = subprocess.run( + scope_cmd + ["restart", svc_name], + capture_output=True, text=True, timeout=15, + ) + _time.sleep(3) + verify2 = subprocess.run( + scope_cmd + ["is-active", svc_name], + capture_output=True, text=True, timeout=5, + ) + if verify2.stdout.strip() == "active": + restarted_services.append(svc_name) + print(f" ✓ {svc_name} recovered on retry") + else: + print( + f" ✗ {svc_name} failed to stay running after restart.\n" + f" Check logs: journalctl --user -u {svc_name} --since '2 min ago'\n" + f" Restart manually: systemctl {'--user ' if scope == 'user' else ''}restart {svc_name}" + ) else: print(f" ⚠ Failed to restart {svc_name}: {restart.stderr.strip()}") except (FileNotFoundError, subprocess.TimeoutExpired): diff --git a/tests/gateway/restart_test_helpers.py b/tests/gateway/restart_test_helpers.py index 8b48974673..75665325b6 100644 --- a/tests/gateway/restart_test_helpers.py +++ b/tests/gateway/restart_test_helpers.py @@ -93,6 +93,12 @@ def make_restart_runner( runner._running_agent_count = GatewayRunner._running_agent_count.__get__( runner, GatewayRunner ) + runner._snapshot_running_agents = GatewayRunner._snapshot_running_agents.__get__( + runner, GatewayRunner + ) + runner._notify_active_sessions_of_shutdown = ( + GatewayRunner._notify_active_sessions_of_shutdown.__get__(runner, GatewayRunner) + ) runner._launch_detached_restart_command = GatewayRunner._launch_detached_restart_command.__get__( runner, GatewayRunner ) diff --git a/tests/gateway/test_restart_drain.py b/tests/gateway/test_restart_drain.py index cfc2c364c6..732470c122 100644 --- a/tests/gateway/test_restart_drain.py +++ b/tests/gateway/test_restart_drain.py @@ -161,3 +161,84 @@ async def test_launch_detached_restart_command_uses_setsid(monkeypatch): assert kwargs["start_new_session"] is True assert kwargs["stdout"] is subprocess.DEVNULL assert kwargs["stderr"] is subprocess.DEVNULL + + +# ── Shutdown notification tests ────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_shutdown_notification_sent_to_active_sessions(): + """Active sessions receive a notification when the gateway starts shutting down.""" + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="999", chat_type="dm") + session_key = f"agent:main:telegram:dm:999" + runner._running_agents[session_key] = MagicMock() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 1 + assert "shutting down" in adapter.sent[0] + assert "interrupted" in adapter.sent[0] + + +@pytest.mark.asyncio +async def test_shutdown_notification_says_restarting_when_restart_requested(): + """When _restart_requested is True, the message says 'restarting' and mentions /retry.""" + runner, adapter = make_restart_runner() + runner._restart_requested = True + session_key = "agent:main:telegram:dm:999" + runner._running_agents[session_key] = MagicMock() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 1 + assert "restarting" in adapter.sent[0] + assert "/retry" in adapter.sent[0] + + +@pytest.mark.asyncio +async def test_shutdown_notification_deduplicates_per_chat(): + """Multiple sessions in the same chat only get one notification.""" + runner, adapter = make_restart_runner() + # Two sessions (different users) in the same chat + runner._running_agents["agent:main:telegram:group:chat1:u1"] = MagicMock() + runner._running_agents["agent:main:telegram:group:chat1:u2"] = MagicMock() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 1 + + +@pytest.mark.asyncio +async def test_shutdown_notification_skipped_when_no_active_agents(): + """No notification is sent when there are no active agents.""" + runner, adapter = make_restart_runner() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 0 + + +@pytest.mark.asyncio +async def test_shutdown_notification_ignores_pending_sentinels(): + """Pending sentinels (not-yet-started agents) don't trigger notifications.""" + from gateway.run import _AGENT_PENDING_SENTINEL + + runner, adapter = make_restart_runner() + runner._running_agents["agent:main:telegram:dm:999"] = _AGENT_PENDING_SENTINEL + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 0 + + +@pytest.mark.asyncio +async def test_shutdown_notification_send_failure_does_not_block(): + """If sending a notification fails, the method still completes.""" + runner, adapter = make_restart_runner() + adapter.send = AsyncMock(side_effect=Exception("network error")) + session_key = "agent:main:telegram:dm:999" + runner._running_agents[session_key] = MagicMock() + + # Should not raise + await runner._notify_active_sessions_of_shutdown()