fix(telegram): reset send pool after pool timeouts

This commit is contained in:
yungchentang 2026-06-27 14:31:56 +02:00 committed by Teknium
parent f3d8f20a59
commit 7e2ca7f68d
2 changed files with 90 additions and 1 deletions

View file

@ -424,6 +424,7 @@ class TelegramAdapter(BasePlatformAdapter):
# While True, send() short-circuits to a failure so callers
# (cron live-adapter branch) fall through to standalone delivery.
self._send_path_degraded: bool = False
self._general_request_drain_lock = asyncio.Lock()
# DM Topics: map of topic_name -> message_thread_id (populated at startup)
self._dm_topics: Dict[str, int] = {}
# Track forum chats where we've already registered bot commands
@ -1513,6 +1514,50 @@ class TelegramAdapter(BasePlatformAdapter):
self.name, exc_info=True,
)
def _get_general_request_drain_lock(self) -> asyncio.Lock:
    """Return the lock guarding general-request pool resets.

    Reuses ``self._general_request_drain_lock`` when it is set; when the
    attribute is missing or ``None`` a new ``asyncio.Lock`` is created,
    stored on the instance, and returned.
    """
    existing = getattr(self, "_general_request_drain_lock", None)
    if existing is not None:
        return existing

    # Attribute absent (or None): create the lock on first use.
    created = asyncio.Lock()
    self._general_request_drain_lock = created
    return created
async def _drain_general_connections_after_pool_timeout(self) -> None:
    """Reset the Bot API request pool after a confirmed send pool timeout.

    ``send_message`` uses PTB's general request pool (``_request[1]``).
    When httpx reports that this pool is exhausted, PTB says the request
    was not sent, so it is safe to reset the wedged pool before retrying.

    The bot is taken from ``self._app.bot`` and falls back to ``self._bot``.
    The reset is best-effort and never raises: a missing bot or an
    unexpected ``_request`` layout makes this a no-op, and failures of
    ``shutdown()`` / ``initialize()`` are logged instead of propagated.
    The shutdown/initialize pair runs under the drain lock so concurrent
    callers do not interleave the two steps.
    """
    bot = getattr(getattr(self, "_app", None), "bot", None)
    if bot is None:
        bot = getattr(self, "_bot", None)
    if bot is None:
        return

    try:
        # PTB 22.x: _request is (get_updates_request, general_request).
        general_req = bot._request[1]  # noqa: SLF001
    except Exception:
        # Private attribute missing or shaped differently: nothing to reset.
        return

    async with self._get_general_request_drain_lock():
        try:
            await general_req.shutdown()
        except Exception:
            logger.debug(
                "[%s] General request shutdown failed after pool timeout (non-fatal)",
                self.name, exc_info=True,
            )

        try:
            await general_req.initialize()
        except Exception:
            # shutdown() has already been attempted, so a failed
            # initialize() may leave the send pool closed. Log it at
            # WARNING so the condition is visible, not hidden at DEBUG.
            logger.warning(
                "[%s] General request re-initialize failed after pool timeout; "
                "send pool may remain closed",
                self.name, exc_info=True,
            )
        else:
            logger.warning(
                "[%s] General request pool drained after Telegram pool timeout",
                self.name,
            )
async def _handle_polling_network_error(self, error: Exception) -> None:
"""Reconnect polling after a transient network interruption.
@ -2912,13 +2957,16 @@ class TelegramAdapter(BasePlatformAdapter):
# (httpx pool exhausted) is explicitly "not sent to
# Telegram" -- retrying through the loop is safe and
# prevents silent drops when the pool frees up.
is_pool_timeout = self._looks_like_pool_timeout(send_err)
if (
_TimedOut
and isinstance(send_err, _TimedOut)
and not self._looks_like_connect_timeout(send_err)
and not self._looks_like_pool_timeout(send_err)
and not is_pool_timeout
):
raise
if is_pool_timeout:
await self._drain_general_connections_after_pool_timeout()
if _send_attempt < 2:
wait = 2 ** _send_attempt
logger.warning("[%s] Network error on send (attempt %d/3), retrying in %ds: %s",

View file

@ -11,6 +11,7 @@ avoid retrying with a partial topic route that can render outside the lane.
import sys
import types
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
@ -1338,6 +1339,46 @@ async def test_send_retries_pool_timeout():
assert attempt[0] == 3
@pytest.mark.asyncio
async def test_send_drains_general_request_pool_before_retrying_pool_timeout():
    """A pool timeout cycles only the general request pool, then the send is retried."""

    def _fake_request():
        # Stand-in exposing the two awaitables the drain step calls.
        return SimpleNamespace(shutdown=AsyncMock(), initialize=AsyncMock())

    adapter = _make_adapter()
    general_request = _fake_request()
    polling_request = _fake_request()
    # Tuple order is (polling, general); index 1 is the send-side pool.
    adapter._app = SimpleNamespace(
        bot=SimpleNamespace(_request=(polling_request, general_request))
    )

    send_calls = 0

    async def mock_send_message(**kwargs):
        nonlocal send_calls
        send_calls += 1
        if send_calls != 1:
            return SimpleNamespace(message_id=203)
        # First attempt only: simulate httpx pool exhaustion.
        raise FakeTimedOut(
            "Pool timeout: All connections in the connection pool are "
            "occupied. Request was *not* sent to Telegram."
        )

    adapter._bot = SimpleNamespace(send_message=mock_send_message)

    result = await adapter.send(chat_id="123", content="test message")

    assert result.success is True
    assert result.message_id == "203"
    assert send_calls == 2

    # The send-side pool is shut down and re-initialized exactly once...
    general_request.shutdown.assert_awaited_once()
    general_request.initialize.assert_awaited_once()
    # ...while the polling pool is never touched.
    polling_request.shutdown.assert_not_awaited()
    polling_request.initialize.assert_not_awaited()
@pytest.mark.asyncio
async def test_send_marks_pool_timeout_retryable_after_exhaustion():
"""Pool timeout that never clears stays retryable for outer retry handling."""