mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-12 08:51:53 +00:00
fix(discord): recover from runtime gateway task exits (#44383)
* fix(discord): recover from runtime gateway task exits Salvaged from #39416 (AMEOBIUS) — cherry-picked only the task-exit recovery; the original PR was 1081 commits behind with 28 unrelated commits. A post-ready discord.py WebSocket crash left the gateway split-brained: producers stayed active while Discord stopped responding. After this fix the adapter calls _set_fatal_error(retryable=True) + _notify_fatal_error() so the existing GatewayRunner reconnect watcher replaces the dead adapter. Also adds _wait_for_ready_or_bot_exit() so startup failures (SOCKS/proxy errors, invalid tokens) surface fast instead of burning the full ready timeout. Because connect() no longer waits via asyncio.wait_for on that path, test_connect_releases_token_lock_on_timeout is updated to trigger the timeout through the new helper (same lock-release contract). 3 tests pass (2 new runtime-failure tests + the updated timeout test); test_discord_connect.py and test_discord_slash_commands.py green. Co-Authored-By: ameobius <ameobius@local.host> * fix(test): patch _wait_for_ready_or_bot_exit in timeout cancel test connect() no longer uses asyncio.wait_for for the ready handshake, so test_connect_timeout_cancels_bot_task was hanging for 30s in CI. Co-authored-by: Cursor <cursoragent@cursor.com> --------- Co-authored-by: ameobius <ameobius@local.host> Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
parent
e080365a7a
commit
c3464ecf45
3 changed files with 174 additions and 9 deletions
|
|
@ -20,6 +20,7 @@ import tempfile
|
|||
import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from contextlib import suppress
|
||||
from typing import Callable, Dict, List, Optional, Any, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -68,6 +69,43 @@ from gateway.platforms.base import (
|
|||
from tools.url_safety import is_safe_url
|
||||
|
||||
|
||||
async def _wait_for_ready_or_bot_exit(
|
||||
ready_event: asyncio.Event,
|
||||
bot_task: asyncio.Task,
|
||||
timeout: float,
|
||||
) -> None:
|
||||
"""Wait until Discord is ready, or surface early bot startup failure.
|
||||
|
||||
``discord.py`` startup errors (including SOCKS/proxy failures from
|
||||
aiohttp-socks/python-socks) happen inside ``Bot.start()``. If ``connect()``
|
||||
only waits on ``ready_event``, a dead background task still burns the full
|
||||
ready timeout before the gateway supervisor can reconnect. Racing the ready
|
||||
event against the bot task keeps failures fast and preserves the original
|
||||
exception for logging/classification.
|
||||
"""
|
||||
ready_task = asyncio.create_task(ready_event.wait())
|
||||
try:
|
||||
done, _pending = await asyncio.wait(
|
||||
{ready_task, bot_task},
|
||||
timeout=timeout,
|
||||
return_when=asyncio.FIRST_COMPLETED,
|
||||
)
|
||||
if not done:
|
||||
raise asyncio.TimeoutError
|
||||
if bot_task in done:
|
||||
exc = bot_task.exception()
|
||||
if exc is not None:
|
||||
raise exc
|
||||
if not ready_task.done():
|
||||
raise RuntimeError("Discord bot task exited before ready")
|
||||
await ready_task
|
||||
finally:
|
||||
if not ready_task.done():
|
||||
ready_task.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
await ready_task
|
||||
|
||||
|
||||
def _find_discord_windows_bundled_opus(discord_module: Any = None) -> Optional[str]:
|
||||
"""Return discord.py's bundled Windows opus DLL path when present."""
|
||||
if sys.platform != "win32":
|
||||
|
|
@ -622,6 +660,10 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
self._typing_tasks: Dict[str, asyncio.Task] = {}
|
||||
self._bot_task: Optional[asyncio.Task] = None
|
||||
self._post_connect_task: Optional[asyncio.Task] = None
|
||||
# True while disconnect() is intentionally closing discord.py. The
|
||||
# bot task's done callback uses this to distinguish an operator/service
|
||||
# shutdown from a runtime websocket crash.
|
||||
self._disconnecting = False
|
||||
# Dedup cache: prevents duplicate bot responses when Discord
|
||||
# RESUME replays events after reconnects.
|
||||
self._dedup = MessageDeduplicator()
|
||||
|
|
@ -634,6 +676,65 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
# scanning channel.history() on cache miss (cold start / restart).
|
||||
self._last_self_message_id: Dict[str, str] = {}
|
||||
|
||||
def _handle_bot_task_done(self, task: asyncio.Task) -> None:
|
||||
"""Surface post-startup discord.py task exits to the gateway supervisor.
|
||||
|
||||
discord.py reconnects normal gateway interruptions internally. When its
|
||||
top-level ``Bot.start()`` task actually exits after the adapter has been
|
||||
marked running, the Discord websocket is dead while the Hermes gateway
|
||||
process can remain alive. Treat that split-brain state as a retryable
|
||||
fatal adapter error so ``GatewayRunner._handle_adapter_fatal_error`` can
|
||||
remove this adapter and queue Discord for the existing reconnect watcher.
|
||||
"""
|
||||
if getattr(self, "_disconnecting", False):
|
||||
# Intentional service/operator shutdown. Drain the task result so
|
||||
# asyncio doesn't emit "exception was never retrieved" warnings.
|
||||
with suppress(asyncio.CancelledError, Exception):
|
||||
task.exception()
|
||||
return
|
||||
|
||||
# Ignore stale callbacks from an older client if a reconnect already
|
||||
# installed a newer Bot.start() task on this adapter instance.
|
||||
if self._bot_task is not None and task is not self._bot_task:
|
||||
with suppress(asyncio.CancelledError, Exception):
|
||||
task.exception()
|
||||
return
|
||||
|
||||
if not self._running:
|
||||
# Startup failures are handled by _wait_for_ready_or_bot_exit() in
|
||||
# connect(); this callback is only for post-startup split-brain.
|
||||
with suppress(asyncio.CancelledError, Exception):
|
||||
task.exception()
|
||||
return
|
||||
|
||||
try:
|
||||
exc = task.exception()
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
except Exception as err: # pragma: no cover - defensive
|
||||
exc = err
|
||||
|
||||
if exc is None:
|
||||
message = "Discord gateway task exited without an exception"
|
||||
else:
|
||||
message = f"Discord gateway task exited: {exc}"
|
||||
|
||||
logger.error("[%s] %s", self.name, message, exc_info=exc if exc else False)
|
||||
self._set_fatal_error("discord_gateway_task_exited", message, retryable=True)
|
||||
|
||||
async def _notify() -> None:
|
||||
try:
|
||||
await self._notify_fatal_error()
|
||||
except Exception as notify_exc: # pragma: no cover - defensive logging
|
||||
logger.warning(
|
||||
"[%s] Failed to notify gateway supervisor about Discord task exit: %s",
|
||||
self.name,
|
||||
notify_exc,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
asyncio.create_task(_notify())
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Discord and start receiving events."""
|
||||
if not DISCORD_AVAILABLE:
|
||||
|
|
@ -900,10 +1001,13 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
self._register_slash_commands()
|
||||
|
||||
# Start the bot in background
|
||||
self._disconnecting = False
|
||||
self._bot_task = asyncio.create_task(self._client.start(self.config.token))
|
||||
self._bot_task.add_done_callback(self._handle_bot_task_done)
|
||||
|
||||
# Wait for ready
|
||||
await asyncio.wait_for(self._ready_event.wait(), timeout=30)
|
||||
# Wait for ready, but fail fast if discord.py's background startup
|
||||
# task dies first (for example on SOCKS/proxy connect errors).
|
||||
await _wait_for_ready_or_bot_exit(self._ready_event, self._bot_task, timeout=30)
|
||||
|
||||
self._running = True
|
||||
return True
|
||||
|
|
@ -938,6 +1042,7 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
|
||||
async def disconnect(self) -> None:
|
||||
"""Disconnect from Discord."""
|
||||
self._disconnecting = True
|
||||
# Cancel the bot task before closing the client. If connect() timed out
|
||||
# and returned False, the background client.start() task may still be
|
||||
# running; calling client.close() alone is not enough to stop it because
|
||||
|
|
@ -945,7 +1050,6 @@ class DiscordAdapter(BasePlatformAdapter):
|
|||
# WebSocket handshake is in flight. Explicitly cancelling the task here
|
||||
# ensures the zombie client cannot receive or dispatch any further events.
|
||||
await self._cancel_bot_task()
|
||||
|
||||
# Clean up all active voice connections before closing the client
|
||||
for guild_id in list(self._voice_clients.keys()):
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -266,11 +266,12 @@ async def test_connect_releases_token_lock_on_timeout(monkeypatch):
|
|||
),
|
||||
)
|
||||
|
||||
async def fake_wait_for(awaitable, timeout):
|
||||
awaitable.close()
|
||||
async def fake_wait_for_ready(ready_event, bot_task, timeout):
|
||||
raise asyncio.TimeoutError()
|
||||
|
||||
monkeypatch.setattr(discord_platform.asyncio, "wait_for", fake_wait_for)
|
||||
monkeypatch.setattr(
|
||||
discord_platform, "_wait_for_ready_or_bot_exit", fake_wait_for_ready
|
||||
)
|
||||
|
||||
ok = await adapter.connect()
|
||||
|
||||
|
|
@ -314,11 +315,12 @@ async def test_connect_timeout_cancels_bot_task(monkeypatch):
|
|||
),
|
||||
)
|
||||
|
||||
async def fake_wait_for(awaitable, timeout):
|
||||
awaitable.close()
|
||||
async def fake_wait_for_ready(ready_event, bot_task, timeout):
|
||||
raise asyncio.TimeoutError()
|
||||
|
||||
monkeypatch.setattr(discord_platform.asyncio, "wait_for", fake_wait_for)
|
||||
monkeypatch.setattr(
|
||||
discord_platform, "_wait_for_ready_or_bot_exit", fake_wait_for_ready
|
||||
)
|
||||
|
||||
ok = await adapter.connect()
|
||||
|
||||
|
|
|
|||
59
tests/plugins/test_discord_runtime_failure.py
Normal file
59
tests/plugins/test_discord_runtime_failure.py
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
import asyncio
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from plugins.platforms.discord.adapter import DiscordAdapter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_bot_task_runtime_exit_notifies_gateway_for_reconnect(monkeypatch):
|
||||
"""A post-ready discord.py websocket task crash must not leave the gateway split-brained.
|
||||
|
||||
Regression: producers stayed systemd-active while Discord stopped responding after
|
||||
a runtime ClientOSError/ConnectionResetError. The adapter must mark Discord as a
|
||||
retryable fatal platform error and notify the gateway supervisor so the existing
|
||||
reconnect watcher can replace the dead adapter.
|
||||
"""
|
||||
adapter = DiscordAdapter(PlatformConfig(enabled=True, token="token"))
|
||||
adapter._running = True
|
||||
adapter._ready_event.set()
|
||||
adapter._notify_fatal_error = AsyncMock()
|
||||
|
||||
async def crash():
|
||||
raise ConnectionResetError("Cannot write to closing transport")
|
||||
|
||||
task = asyncio.create_task(crash())
|
||||
await asyncio.sleep(0)
|
||||
|
||||
adapter._handle_bot_task_done(task)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert adapter.has_fatal_error is True
|
||||
assert adapter.fatal_error_retryable is True
|
||||
assert adapter.fatal_error_code == "discord_gateway_task_exited"
|
||||
assert adapter.fatal_error_message is not None
|
||||
assert "Cannot write to closing transport" in adapter.fatal_error_message
|
||||
adapter._notify_fatal_error.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discord_bot_task_done_ignored_during_intentional_disconnect():
|
||||
adapter = DiscordAdapter(PlatformConfig(enabled=True, token="token"))
|
||||
adapter._running = True
|
||||
adapter._ready_event.set()
|
||||
adapter._disconnecting = True
|
||||
adapter._notify_fatal_error = AsyncMock()
|
||||
|
||||
async def stop_cleanly():
|
||||
return None
|
||||
|
||||
task = asyncio.create_task(stop_cleanly())
|
||||
await asyncio.sleep(0)
|
||||
|
||||
adapter._handle_bot_task_done(task)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert adapter.has_fatal_error is False
|
||||
adapter._notify_fatal_error.assert_not_awaited()
|
||||
Loading…
Add table
Add a link
Reference in a new issue