mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: notify active sessions on gateway shutdown + update health check
Three fixes for gateway lifecycle stability: 1. Notify active sessions before shutdown (#new) When the gateway receives SIGTERM or /restart, it now sends a notification to every chat with an active agent BEFORE starting the drain. Users see: - Shutdown: 'Gateway shutting down — your task will be interrupted.' - Restart: 'Gateway restarting — use /retry after restart to continue.' Deduplicates per-chat so group sessions with multiple users get one notification. Best-effort: send failures are logged and swallowed. 2. Skip .clean_shutdown marker when drain timed out Previously, a graceful SIGTERM always wrote .clean_shutdown, even if agents were force-interrupted when the drain timed out. This meant the next startup skipped session suspension, leaving interrupted sessions in a broken state (trailing tool response, no final message). Now the marker is only written if the drain completed without timeout, so interrupted sessions get properly suspended on next startup. 3. Post-restart health check for hermes update (#6631) cmd_update() now verifies the gateway actually survived after systemctl restart (sleep 3s + is-active check). If the service crashed immediately, it retries once. If still dead, prints actionable diagnostics (journalctl command, manual restart hint). Also closes #8104 — already fixed on main (the /restart handler correctly detects systemd via INVOCATION_ID and uses via_service=True). Test plan: - 6 new tests for shutdown notifications (dedup, restart vs shutdown messaging, sentinel filtering, send failure resilience) - Existing restart drain + update tests pass (47 total)
This commit is contained in:
parent
95d11dfd8e
commit
fa8c448f7d
4 changed files with 201 additions and 7 deletions
|
|
@ -1391,6 +1391,65 @@ class GatewayRunner:
|
|||
except Exception as e:
|
||||
logger.debug("Failed interrupting agent during shutdown: %s", e)
|
||||
|
||||
async def _notify_active_sessions_of_shutdown(self) -> None:
|
||||
"""Send a notification to every chat with an active agent.
|
||||
|
||||
Called at the very start of stop() — adapters are still connected so
|
||||
messages can be delivered. Best-effort: individual send failures are
|
||||
logged and swallowed so they never block the shutdown sequence.
|
||||
"""
|
||||
active = self._snapshot_running_agents()
|
||||
if not active:
|
||||
return
|
||||
|
||||
action = "restarting" if self._restart_requested else "shutting down"
|
||||
hint = (
|
||||
"Your current task will be interrupted. "
|
||||
"Use /retry after restart to continue."
|
||||
if self._restart_requested
|
||||
else "Your current task will be interrupted."
|
||||
)
|
||||
msg = f"⚠️ Gateway {action} — {hint}"
|
||||
|
||||
notified: set = set()
|
||||
for session_key in active:
|
||||
# Parse platform + chat_id from the session key.
|
||||
# Format: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...]
|
||||
parts = session_key.split(":")
|
||||
if len(parts) < 5:
|
||||
continue
|
||||
platform_str = parts[2]
|
||||
chat_id = parts[4]
|
||||
|
||||
# Deduplicate: one notification per chat, even if multiple
|
||||
# sessions (different users/threads) share the same chat.
|
||||
dedup_key = (platform_str, chat_id)
|
||||
if dedup_key in notified:
|
||||
continue
|
||||
|
||||
try:
|
||||
platform = Platform(platform_str)
|
||||
adapter = self.adapters.get(platform)
|
||||
if not adapter:
|
||||
continue
|
||||
|
||||
# Include thread_id if present so the message lands in the
|
||||
# correct forum topic / thread.
|
||||
thread_id = parts[5] if len(parts) > 5 else None
|
||||
metadata = {"thread_id": thread_id} if thread_id else None
|
||||
|
||||
await adapter.send(chat_id, msg, metadata=metadata)
|
||||
notified.add(dedup_key)
|
||||
logger.info(
|
||||
"Sent shutdown notification to %s:%s",
|
||||
platform_str, chat_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Failed to send shutdown notification to %s:%s: %s",
|
||||
platform_str, chat_id, e,
|
||||
)
|
||||
|
||||
def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None:
|
||||
for agent in active_agents.values():
|
||||
try:
|
||||
|
|
@ -2018,6 +2077,10 @@ class GatewayRunner:
|
|||
self._running = False
|
||||
self._draining = True
|
||||
|
||||
# Notify all chats with active agents BEFORE draining.
|
||||
# Adapters are still connected here, so messages can be sent.
|
||||
await self._notify_active_sessions_of_shutdown()
|
||||
|
||||
timeout = self._restart_drain_timeout
|
||||
active_agents, timed_out = await self._drain_active_agents(timeout)
|
||||
if timed_out:
|
||||
|
|
@ -2088,12 +2151,23 @@ class GatewayRunner:
|
|||
|
||||
# Write a clean-shutdown marker so the next startup knows this
|
||||
# wasn't a crash. suspend_recently_active() only needs to run
|
||||
# after unexpected exits — graceful shutdowns already drain
|
||||
# active agents, so there's no stuck-session risk.
|
||||
try:
|
||||
(_hermes_home / ".clean_shutdown").touch()
|
||||
except Exception:
|
||||
pass
|
||||
# after unexpected exits. However, if the drain timed out and
|
||||
# agents were force-interrupted, their sessions may be in an
|
||||
# incomplete state (trailing tool response, no final assistant
|
||||
# message). Skip the marker in that case so the next startup
|
||||
# suspends those sessions — giving users a clean slate instead
|
||||
# of resuming a half-finished tool loop.
|
||||
if not timed_out:
|
||||
try:
|
||||
(_hermes_home / ".clean_shutdown").touch()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
logger.info(
|
||||
"Skipping .clean_shutdown marker — drain timed out with "
|
||||
"interrupted agents; next startup will suspend recently "
|
||||
"active sessions."
|
||||
)
|
||||
|
||||
if self._restart_requested and self._restart_via_service:
|
||||
self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE
|
||||
|
|
|
|||
|
|
@ -4036,7 +4036,40 @@ def cmd_update(args):
|
|||
capture_output=True, text=True, timeout=15,
|
||||
)
|
||||
if restart.returncode == 0:
|
||||
restarted_services.append(svc_name)
|
||||
# Verify the service actually survived the
|
||||
# restart. systemctl restart returns 0 even
|
||||
# if the new process crashes immediately.
|
||||
import time as _time
|
||||
_time.sleep(3)
|
||||
verify = subprocess.run(
|
||||
scope_cmd + ["is-active", svc_name],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
if verify.stdout.strip() == "active":
|
||||
restarted_services.append(svc_name)
|
||||
else:
|
||||
# Retry once — transient startup failures
|
||||
# (stale module cache, import race) often
|
||||
# resolve on the second attempt.
|
||||
print(f" ⚠ {svc_name} died after restart, retrying...")
|
||||
retry = subprocess.run(
|
||||
scope_cmd + ["restart", svc_name],
|
||||
capture_output=True, text=True, timeout=15,
|
||||
)
|
||||
_time.sleep(3)
|
||||
verify2 = subprocess.run(
|
||||
scope_cmd + ["is-active", svc_name],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
if verify2.stdout.strip() == "active":
|
||||
restarted_services.append(svc_name)
|
||||
print(f" ✓ {svc_name} recovered on retry")
|
||||
else:
|
||||
print(
|
||||
f" ✗ {svc_name} failed to stay running after restart.\n"
|
||||
f" Check logs: journalctl --user -u {svc_name} --since '2 min ago'\n"
|
||||
f" Restart manually: systemctl {'--user ' if scope == 'user' else ''}restart {svc_name}"
|
||||
)
|
||||
else:
|
||||
print(f" ⚠ Failed to restart {svc_name}: {restart.stderr.strip()}")
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
|
|
|
|||
|
|
@ -93,6 +93,12 @@ def make_restart_runner(
|
|||
runner._running_agent_count = GatewayRunner._running_agent_count.__get__(
|
||||
runner, GatewayRunner
|
||||
)
|
||||
runner._snapshot_running_agents = GatewayRunner._snapshot_running_agents.__get__(
|
||||
runner, GatewayRunner
|
||||
)
|
||||
runner._notify_active_sessions_of_shutdown = (
|
||||
GatewayRunner._notify_active_sessions_of_shutdown.__get__(runner, GatewayRunner)
|
||||
)
|
||||
runner._launch_detached_restart_command = GatewayRunner._launch_detached_restart_command.__get__(
|
||||
runner, GatewayRunner
|
||||
)
|
||||
|
|
|
|||
|
|
@ -161,3 +161,84 @@ async def test_launch_detached_restart_command_uses_setsid(monkeypatch):
|
|||
assert kwargs["start_new_session"] is True
|
||||
assert kwargs["stdout"] is subprocess.DEVNULL
|
||||
assert kwargs["stderr"] is subprocess.DEVNULL
|
||||
|
||||
|
||||
# ── Shutdown notification tests ──────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown_notification_sent_to_active_sessions():
|
||||
"""Active sessions receive a notification when the gateway starts shutting down."""
|
||||
runner, adapter = make_restart_runner()
|
||||
source = make_restart_source(chat_id="999", chat_type="dm")
|
||||
session_key = f"agent:main:telegram:dm:999"
|
||||
runner._running_agents[session_key] = MagicMock()
|
||||
|
||||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
||||
assert len(adapter.sent) == 1
|
||||
assert "shutting down" in adapter.sent[0]
|
||||
assert "interrupted" in adapter.sent[0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown_notification_says_restarting_when_restart_requested():
|
||||
"""When _restart_requested is True, the message says 'restarting' and mentions /retry."""
|
||||
runner, adapter = make_restart_runner()
|
||||
runner._restart_requested = True
|
||||
session_key = "agent:main:telegram:dm:999"
|
||||
runner._running_agents[session_key] = MagicMock()
|
||||
|
||||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
||||
assert len(adapter.sent) == 1
|
||||
assert "restarting" in adapter.sent[0]
|
||||
assert "/retry" in adapter.sent[0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown_notification_deduplicates_per_chat():
|
||||
"""Multiple sessions in the same chat only get one notification."""
|
||||
runner, adapter = make_restart_runner()
|
||||
# Two sessions (different users) in the same chat
|
||||
runner._running_agents["agent:main:telegram:group:chat1:u1"] = MagicMock()
|
||||
runner._running_agents["agent:main:telegram:group:chat1:u2"] = MagicMock()
|
||||
|
||||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
||||
assert len(adapter.sent) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown_notification_skipped_when_no_active_agents():
|
||||
"""No notification is sent when there are no active agents."""
|
||||
runner, adapter = make_restart_runner()
|
||||
|
||||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
||||
assert len(adapter.sent) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown_notification_ignores_pending_sentinels():
|
||||
"""Pending sentinels (not-yet-started agents) don't trigger notifications."""
|
||||
from gateway.run import _AGENT_PENDING_SENTINEL
|
||||
|
||||
runner, adapter = make_restart_runner()
|
||||
runner._running_agents["agent:main:telegram:dm:999"] = _AGENT_PENDING_SENTINEL
|
||||
|
||||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
||||
assert len(adapter.sent) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown_notification_send_failure_does_not_block():
|
||||
"""If sending a notification fails, the method still completes."""
|
||||
runner, adapter = make_restart_runner()
|
||||
adapter.send = AsyncMock(side_effect=Exception("network error"))
|
||||
session_key = "agent:main:telegram:dm:999"
|
||||
runner._running_agents[session_key] = MagicMock()
|
||||
|
||||
# Should not raise
|
||||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue