From 7e2ca7f68da63a29e1695b5e24901cc57e32f2ac Mon Sep 17 00:00:00 2001
From: yungchentang <46495124+yungchentang@users.noreply.github.com>
Date: Sat, 27 Jun 2026 14:31:56 +0200
Subject: [PATCH] fix(telegram): reset send pool after pool timeouts

---
Review notes: line structure was restored from a whitespace-collapsed copy,
so context-line indentation is reconstructed (apply with
`git apply --ignore-whitespace`) and the index hashes predate these changes.
Pool resets are now coalesced across concurrent senders, and a failed pool
re-initialize is logged at WARNING instead of DEBUG.

 plugins/platforms/telegram/adapter.py         | 66 ++++++++++++++++++-
 .../gateway/test_telegram_thread_fallback.py  | 69 +++++++++++++++++++
 2 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py
index 5ca00428a25..43fc17bc410 100644
--- a/plugins/platforms/telegram/adapter.py
+++ b/plugins/platforms/telegram/adapter.py
@@ -424,6 +424,10 @@ class TelegramAdapter(BasePlatformAdapter):
         # While True, send() short-circuits to a failure so callers
         # (cron live-adapter branch) fall through to standalone delivery.
         self._send_path_degraded: bool = False
+        # Serializes resets of PTB's general request pool; the generation
+        # counter lets senders that queued behind a reset skip a redundant one.
+        self._general_request_drain_lock = asyncio.Lock()
+        self._general_request_drain_generation: int = 0
         # DM Topics: map of topic_name -> message_thread_id (populated at startup)
         self._dm_topics: Dict[str, int] = {}
         # Track forum chats where we've already registered bot commands
@@ -1513,6 +1517,63 @@ class TelegramAdapter(BasePlatformAdapter):
                     self.name, exc_info=True,
                 )
 
+    def _get_general_request_drain_lock(self) -> asyncio.Lock:
+        """Return the drain lock, creating it lazily if the attribute is missing."""
+        lock = getattr(self, "_general_request_drain_lock", None)
+        if lock is None:
+            lock = asyncio.Lock()
+            self._general_request_drain_lock = lock
+        return lock
+
+    async def _drain_general_connections_after_pool_timeout(self) -> None:
+        """Reset the Bot API request pool after a confirmed send pool timeout.
+
+        ``send_message`` uses PTB's general request pool (``_request[1]``).
+        When httpx reports that this pool is exhausted, PTB says the request
+        was not sent, so it is safe to reset the wedged pool before retrying.
+
+        Senders that queued behind an in-progress reset reuse its result
+        instead of tearing the freshly rebuilt pool down a second time.
+        """
+        bot = getattr(getattr(self, "_app", None), "bot", None)
+        if bot is None:
+            bot = getattr(self, "_bot", None)
+        if bot is None:
+            return
+        try:
+            # PTB 22.x: _request is (get_updates_request, general_request).
+            general_req = bot._request[1]  # noqa: SLF001
+        except Exception:
+            return
+        # Snapshot before waiting on the lock: a changed value afterwards
+        # means another sender completed a reset while we were queued.
+        generation = getattr(self, "_general_request_drain_generation", 0)
+        async with self._get_general_request_drain_lock():
+            if getattr(self, "_general_request_drain_generation", 0) != generation:
+                return
+            try:
+                await general_req.shutdown()
+            except Exception:
+                logger.debug(
+                    "[%s] General request shutdown failed after pool timeout (non-fatal)",
+                    self.name, exc_info=True,
+                )
+            try:
+                await general_req.initialize()
+            except Exception:
+                # The pool may be left closed, so report this above DEBUG;
+                # the generation is not bumped so a queued sender retries.
+                logger.warning(
+                    "[%s] General request re-initialize failed after pool timeout",
+                    self.name, exc_info=True,
+                )
+            else:
+                self._general_request_drain_generation = generation + 1
+                logger.warning(
+                    "[%s] General request pool drained after Telegram pool timeout",
+                    self.name,
+                )
+
     async def _handle_polling_network_error(self, error: Exception) -> None:
         """Reconnect polling after a transient network interruption.
 
@@ -2912,13 +2973,16 @@ class TelegramAdapter(BasePlatformAdapter):
                         # (httpx pool exhausted) is explicitly "not sent to
                         # Telegram" -- retrying through the loop is safe and
                         # prevents silent drops when the pool frees up.
+                        is_pool_timeout = self._looks_like_pool_timeout(send_err)
                         if (
                             _TimedOut
                             and isinstance(send_err, _TimedOut)
                             and not self._looks_like_connect_timeout(send_err)
-                            and not self._looks_like_pool_timeout(send_err)
+                            and not is_pool_timeout
                         ):
                             raise
+                        if is_pool_timeout:
+                            await self._drain_general_connections_after_pool_timeout()
                         if _send_attempt < 2:
                             wait = 2 ** _send_attempt
                             logger.warning("[%s] Network error on send (attempt %d/3), retrying in %ds: %s",
diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py
index 719b1367846..3f5b7da420c 100644
--- a/tests/gateway/test_telegram_thread_fallback.py
+++ b/tests/gateway/test_telegram_thread_fallback.py
@@ -11,6 +11,7 @@ avoid retrying with a partial topic route that can render outside the lane.
 import sys
 import types
 from types import SimpleNamespace
+from unittest.mock import AsyncMock
 
 import pytest
 
@@ -1338,6 +1339,74 @@ async def test_send_retries_pool_timeout():
     assert attempt[0] == 3
 
 
+@pytest.mark.asyncio
+async def test_send_drains_general_request_pool_before_retrying_pool_timeout():
+    """Pool timeout should reset the send-message request pool before retrying."""
+    adapter = _make_adapter()
+    general_request = SimpleNamespace(
+        shutdown=AsyncMock(),
+        initialize=AsyncMock(),
+    )
+    polling_request = SimpleNamespace(
+        shutdown=AsyncMock(),
+        initialize=AsyncMock(),
+    )
+    adapter._app = SimpleNamespace(
+        bot=SimpleNamespace(_request=(polling_request, general_request))
+    )
+
+    attempt = [0]
+
+    async def mock_send_message(**kwargs):
+        attempt[0] += 1
+        if attempt[0] == 1:
+            raise FakeTimedOut(
+                "Pool timeout: All connections in the connection pool are "
+                "occupied. Request was *not* sent to Telegram."
+            )
+        return SimpleNamespace(message_id=203)
+
+    adapter._bot = SimpleNamespace(send_message=mock_send_message)
+
+    result = await adapter.send(chat_id="123", content="test message")
+
+    assert result.success is True
+    assert result.message_id == "203"
+    assert attempt[0] == 2
+    general_request.shutdown.assert_awaited_once()
+    general_request.initialize.assert_awaited_once()
+    polling_request.shutdown.assert_not_awaited()
+    polling_request.initialize.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+async def test_concurrent_pool_timeout_drains_reset_pool_once():
+    """Senders queued behind a pool reset must not reset the pool again."""
+    import asyncio
+
+    adapter = _make_adapter()
+
+    async def slow_shutdown():
+        # Yield to the loop so the second drain queues on the lock.
+        await asyncio.sleep(0)
+
+    general_request = SimpleNamespace(
+        shutdown=AsyncMock(side_effect=slow_shutdown),
+        initialize=AsyncMock(),
+    )
+    adapter._app = SimpleNamespace(
+        bot=SimpleNamespace(_request=(SimpleNamespace(), general_request))
+    )
+
+    await asyncio.gather(
+        adapter._drain_general_connections_after_pool_timeout(),
+        adapter._drain_general_connections_after_pool_timeout(),
+    )
+
+    general_request.shutdown.assert_awaited_once()
+    general_request.initialize.assert_awaited_once()
+
+
 @pytest.mark.asyncio
 async def test_send_marks_pool_timeout_retryable_after_exhaustion():
     """Pool timeout that never clears stays retryable for outer retry handling."""