From c3464ecf453d02410c65a14812f409d186eceead Mon Sep 17 00:00:00 2001 From: Austin Pickett Date: Thu, 11 Jun 2026 15:39:01 -0400 Subject: [PATCH] fix(discord): recover from runtime gateway task exits (#44383) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(discord): recover from runtime gateway task exits Salvaged from #39416 (AMEOBIUS) — cherry-picked only the task-exit recovery; the original PR was 1081 commits behind with 28 unrelated commits. A post-ready discord.py WebSocket crash left the gateway split-brained: producers stayed active while Discord stopped responding. After this fix the adapter calls _set_fatal_error(retryable=True) + _notify_fatal_error() so the existing GatewayRunner reconnect watcher replaces the dead adapter. Also adds _wait_for_ready_or_bot_exit() so startup failures (SOCKS/proxy errors, invalid tokens) surface fast instead of burning the full ready timeout. Because connect() no longer waits via asyncio.wait_for on that path, test_connect_releases_token_lock_on_timeout is updated to trigger the timeout through the new helper (same lock-release contract). 3 tests pass (2 new runtime-failure tests + the updated timeout test); test_discord_connect.py and test_discord_slash_commands.py green. Co-Authored-By: ameobius * fix(test): patch _wait_for_ready_or_bot_exit in timeout cancel test connect() no longer uses asyncio.wait_for for the ready handshake, so test_connect_timeout_cancels_bot_task was hanging for 30s in CI. Co-authored-by: Cursor --------- Co-authored-by: ameobius Co-authored-by: Cursor --- plugins/platforms/discord/adapter.py | 110 +++++++++++++++++- tests/gateway/test_discord_connect.py | 14 ++- tests/plugins/test_discord_runtime_failure.py | 59 ++++++++++ 3 files changed, 174 insertions(+), 9 deletions(-) create mode 100644 tests/plugins/test_discord_runtime_failure.py diff --git a/plugins/platforms/discord/adapter.py b/plugins/platforms/discord/adapter.py index c205c5b942b..196564dd14f 100644 --- a/plugins/platforms/discord/adapter.py +++ b/plugins/platforms/discord/adapter.py @@ -20,6 +20,7 @@ import tempfile import threading import time from collections import defaultdict +from contextlib import suppress from typing import Callable, Dict, List, Optional, Any, Tuple logger = logging.getLogger(__name__) @@ -68,6 +69,43 @@ from gateway.platforms.base import ( from tools.url_safety import is_safe_url +async def _wait_for_ready_or_bot_exit( + ready_event: asyncio.Event, + bot_task: asyncio.Task, + timeout: float, +) -> None: + """Wait until Discord is ready, or surface early bot startup failure. + + ``discord.py`` startup errors (including SOCKS/proxy failures from + aiohttp-socks/python-socks) happen inside ``Bot.start()``. If ``connect()`` + only waits on ``ready_event``, a dead background task still burns the full + ready timeout before the gateway supervisor can reconnect. Racing the ready + event against the bot task keeps failures fast and preserves the original + exception for logging/classification. + """ + ready_task = asyncio.create_task(ready_event.wait()) + try: + done, _pending = await asyncio.wait( + {ready_task, bot_task}, + timeout=timeout, + return_when=asyncio.FIRST_COMPLETED, + ) + if not done: + raise asyncio.TimeoutError + if bot_task in done: + exc = bot_task.exception() + if exc is not None: + raise exc + if not ready_task.done(): + raise RuntimeError("Discord bot task exited before ready") + await ready_task + finally: + if not ready_task.done(): + ready_task.cancel() + with suppress(asyncio.CancelledError): + await ready_task + + def _find_discord_windows_bundled_opus(discord_module: Any = None) -> Optional[str]: """Return discord.py's bundled Windows opus DLL path when present.""" if sys.platform != "win32": @@ -622,6 +660,10 @@ class DiscordAdapter(BasePlatformAdapter): self._typing_tasks: Dict[str, asyncio.Task] = {} self._bot_task: Optional[asyncio.Task] = None self._post_connect_task: Optional[asyncio.Task] = None + # True while disconnect() is intentionally closing discord.py. The + # bot task's done callback uses this to distinguish an operator/service + # shutdown from a runtime websocket crash. + self._disconnecting = False # Dedup cache: prevents duplicate bot responses when Discord # RESUME replays events after reconnects. self._dedup = MessageDeduplicator() @@ -634,6 +676,65 @@ class DiscordAdapter(BasePlatformAdapter): # scanning channel.history() on cache miss (cold start / restart). self._last_self_message_id: Dict[str, str] = {} + def _handle_bot_task_done(self, task: asyncio.Task) -> None: + """Surface post-startup discord.py task exits to the gateway supervisor. + + discord.py reconnects normal gateway interruptions internally. When its + top-level ``Bot.start()`` task actually exits after the adapter has been + marked running, the Discord websocket is dead while the Hermes gateway + process can remain alive. Treat that split-brain state as a retryable + fatal adapter error so ``GatewayRunner._handle_adapter_fatal_error`` can + remove this adapter and queue Discord for the existing reconnect watcher. + """ + if getattr(self, "_disconnecting", False): + # Intentional service/operator shutdown. Drain the task result so + # asyncio doesn't emit "exception was never retrieved" warnings. + with suppress(asyncio.CancelledError, Exception): + task.exception() + return + + # Ignore stale callbacks from an older client if a reconnect already + # installed a newer Bot.start() task on this adapter instance. + if self._bot_task is not None and task is not self._bot_task: + with suppress(asyncio.CancelledError, Exception): + task.exception() + return + + if not self._running: + # Startup failures are handled by _wait_for_ready_or_bot_exit() in + # connect(); this callback is only for post-startup split-brain. + with suppress(asyncio.CancelledError, Exception): + task.exception() + return + + try: + exc = task.exception() + except asyncio.CancelledError: + return + except Exception as err: # pragma: no cover - defensive + exc = err + + if exc is None: + message = "Discord gateway task exited without an exception" + else: + message = f"Discord gateway task exited: {exc}" + + logger.error("[%s] %s", self.name, message, exc_info=exc if exc else False) + self._set_fatal_error("discord_gateway_task_exited", message, retryable=True) + + async def _notify() -> None: + try: + await self._notify_fatal_error() + except Exception as notify_exc: # pragma: no cover - defensive logging + logger.warning( + "[%s] Failed to notify gateway supervisor about Discord task exit: %s", + self.name, + notify_exc, + exc_info=True, + ) + + asyncio.create_task(_notify()) + async def connect(self) -> bool: """Connect to Discord and start receiving events.""" if not DISCORD_AVAILABLE: @@ -900,10 +1001,13 @@ class DiscordAdapter(BasePlatformAdapter): self._register_slash_commands() # Start the bot in background + self._disconnecting = False self._bot_task = asyncio.create_task(self._client.start(self.config.token)) + self._bot_task.add_done_callback(self._handle_bot_task_done) - # Wait for ready - await asyncio.wait_for(self._ready_event.wait(), timeout=30) + # Wait for ready, but fail fast if discord.py's background startup + # task dies first (for example on SOCKS/proxy connect errors). + await _wait_for_ready_or_bot_exit(self._ready_event, self._bot_task, timeout=30) self._running = True return True @@ -938,6 +1042,7 @@ class DiscordAdapter(BasePlatformAdapter): async def disconnect(self) -> None: """Disconnect from Discord.""" + self._disconnecting = True # Cancel the bot task before closing the client. If connect() timed out # and returned False, the background client.start() task may still be # running; calling client.close() alone is not enough to stop it because @@ -945,7 +1050,6 @@ class DiscordAdapter(BasePlatformAdapter): # WebSocket handshake is in flight. Explicitly cancelling the task here # ensures the zombie client cannot receive or dispatch any further events. await self._cancel_bot_task() - # Clean up all active voice connections before closing the client for guild_id in list(self._voice_clients.keys()): try: diff --git a/tests/gateway/test_discord_connect.py b/tests/gateway/test_discord_connect.py index b7bee41ca52..2f12138feb0 100644 --- a/tests/gateway/test_discord_connect.py +++ b/tests/gateway/test_discord_connect.py @@ -266,11 +266,12 @@ async def test_connect_releases_token_lock_on_timeout(monkeypatch): ), ) - async def fake_wait_for(awaitable, timeout): - awaitable.close() + async def fake_wait_for_ready(ready_event, bot_task, timeout): raise asyncio.TimeoutError() - monkeypatch.setattr(discord_platform.asyncio, "wait_for", fake_wait_for) + monkeypatch.setattr( + discord_platform, "_wait_for_ready_or_bot_exit", fake_wait_for_ready + ) ok = await adapter.connect() @@ -314,11 +315,12 @@ async def test_connect_timeout_cancels_bot_task(monkeypatch): ), ) - async def fake_wait_for(awaitable, timeout): - awaitable.close() + async def fake_wait_for_ready(ready_event, bot_task, timeout): raise asyncio.TimeoutError() - monkeypatch.setattr(discord_platform.asyncio, "wait_for", fake_wait_for) + monkeypatch.setattr( + discord_platform, "_wait_for_ready_or_bot_exit", fake_wait_for_ready + ) ok = await adapter.connect() diff --git a/tests/plugins/test_discord_runtime_failure.py b/tests/plugins/test_discord_runtime_failure.py new file mode 100644 index 00000000000..dc23a130d96 --- /dev/null +++ b/tests/plugins/test_discord_runtime_failure.py @@ -0,0 +1,59 @@ +import asyncio +from unittest.mock import AsyncMock + +import pytest + +from gateway.config import PlatformConfig +from plugins.platforms.discord.adapter import DiscordAdapter + + +@pytest.mark.asyncio +async def test_discord_bot_task_runtime_exit_notifies_gateway_for_reconnect(monkeypatch): + """A post-ready discord.py websocket task crash must not leave the gateway split-brained. + + Regression: producers stayed systemd-active while Discord stopped responding after + a runtime ClientOSError/ConnectionResetError. The adapter must mark Discord as a + retryable fatal platform error and notify the gateway supervisor so the existing + reconnect watcher can replace the dead adapter. + """ + adapter = DiscordAdapter(PlatformConfig(enabled=True, token="token")) + adapter._running = True + adapter._ready_event.set() + adapter._notify_fatal_error = AsyncMock() + + async def crash(): + raise ConnectionResetError("Cannot write to closing transport") + + task = asyncio.create_task(crash()) + await asyncio.sleep(0) + + adapter._handle_bot_task_done(task) + await asyncio.sleep(0) + + assert adapter.has_fatal_error is True + assert adapter.fatal_error_retryable is True + assert adapter.fatal_error_code == "discord_gateway_task_exited" + assert adapter.fatal_error_message is not None + assert "Cannot write to closing transport" in adapter.fatal_error_message + adapter._notify_fatal_error.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_discord_bot_task_done_ignored_during_intentional_disconnect(): + adapter = DiscordAdapter(PlatformConfig(enabled=True, token="token")) + adapter._running = True + adapter._ready_event.set() + adapter._disconnecting = True + adapter._notify_fatal_error = AsyncMock() + + async def stop_cleanly(): + return None + + task = asyncio.create_task(stop_cleanly()) + await asyncio.sleep(0) + + adapter._handle_bot_task_done(task) + await asyncio.sleep(0) + + assert adapter.has_fatal_error is False + adapter._notify_fatal_error.assert_not_awaited()