diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py
index 43fc17bc410..9e456bc67bd 100644
--- a/plugins/platforms/telegram/adapter.py
+++ b/plugins/platforms/telegram/adapter.py
@@ -417,6 +417,12 @@ class TelegramAdapter(BasePlatformAdapter):
         self._polling_network_error_count: int = 0
         self._polling_error_callback_ref = None
         self._polling_heartbeat_task: Optional[asyncio.Task] = None
+        # Consecutive heartbeat probes that saw queued updates the running
+        # poller is not consuming. get_me() can't see this — the send path is
+        # healthy while the getUpdates consumer is wedged — so the heartbeat
+        # also probes get_webhook_info().pending_update_count and escalates to
+        # recovery after two consecutive stuck probes (#42909).
+        self._polling_pending_stuck_count: int = 0
         # After sustained reconnect storms the PTB httpx pool can return
         # SendResult(success=True) for sends that never actually transmit.
         # _handle_polling_network_error sets this; _verify_polling_after_reconnect
@@ -1689,6 +1695,16 @@ class TelegramAdapter(BasePlatformAdapter):
                 if not callable(getattr(bot, "get_me", None)):
                     return
                 await asyncio.wait_for(bot.get_me(), PROBE_TIMEOUT)
+                # get_me() succeeded — the general/send request path is healthy.
+                # That does NOT prove the getUpdates consumer is alive: PTB can
+                # report updater.running=True while the long-poll task is wedged,
+                # so DMs queue in the Bot API and never reach handlers (#42909).
+                # get_me() is blind to this; get_webhook_info() exposes it via
+                # pending_update_count. Escalate only after two consecutive
+                # probes see a non-zero queue while we believe we're polling, so
+                # a single in-flight update (consumed before the next probe)
+                # never trips recovery.
+                await self._probe_pending_updates(bot, PROBE_TIMEOUT)
             except asyncio.CancelledError:
                 return
             except (asyncio.TimeoutError, OSError) as probe_err:
@@ -1707,6 +1723,70 @@ class TelegramAdapter(BasePlatformAdapter):
                 # CLOSE-WAIT symptoms — let PTB's own handlers surface them.
                 pass
 
+    async def _probe_pending_updates(self, bot, probe_timeout: float) -> None:
+        """Detect a wedged getUpdates consumer via pending_update_count.
+
+        PTB can report ``updater.running == True`` while its long-poll task is
+        silently stuck (e.g. a socket that epoll keeps reporting readable on
+        WSL2). ``get_me()`` stays healthy because it uses the general request
+        path, so the CLOSE-WAIT heartbeat never fires — yet DMs queue in the
+        Bot API and never reach handlers (#42909).
+
+        ``get_webhook_info().pending_update_count`` is the one signal that
+        exposes this: a growing/stuck queue while we believe we're polling means
+        the consumer is dead. We only escalate after two consecutive stuck
+        probes so a single update that's simply in-flight between probes does
+        not trip a needless recovery. Recovery reuses
+        ``_handle_polling_network_error`` — the same ladder PTB's own
+        ``error_callback`` feeds — so no new restart machinery is introduced.
+        """
+        # Only meaningful in polling mode with a running updater; in webhook
+        # mode Telegram pushes updates and holds no server-side queue.
+        if self._webhook_mode:
+            return
+        updater = getattr(self._app, "updater", None) if self._app else None
+        if updater is None or not getattr(updater, "running", False):
+            self._polling_pending_stuck_count = 0
+            return
+        get_webhook_info = getattr(bot, "get_webhook_info", None)
+        if not callable(get_webhook_info):
+            return
+        # A reconnect already in flight owns recovery — don't double-trigger.
+        # Also drop any stale count: a probe taken before the restart is not
+        # "consecutive" with one taken after it.
+        if self._polling_error_task and not self._polling_error_task.done():
+            self._polling_pending_stuck_count = 0
+            return
+        try:
+            info = await asyncio.wait_for(get_webhook_info(), probe_timeout)  # type: ignore[arg-type]
+        except (asyncio.TimeoutError, OSError):
+            # A failed probe is a connectivity symptom the get_me() path or the
+            # outer handler will catch; don't treat it as a stuck-queue signal.
+            return
+        pending = int(getattr(info, "pending_update_count", 0) or 0)
+        if pending <= 0:
+            self._polling_pending_stuck_count = 0
+            return
+        self._polling_pending_stuck_count += 1
+        logger.warning(
+            "[%s] Telegram polling heartbeat: %d update(s) queued but not "
+            "consumed (stuck probe %d/2)",
+            self.name, pending, self._polling_pending_stuck_count,
+        )
+        if self._polling_pending_stuck_count >= 2:
+            self._polling_pending_stuck_count = 0
+            logger.warning(
+                "[%s] getUpdates consumer appears wedged (queue not draining); "
+                "triggering polling restart",
+                self.name,
+            )
+            loop = asyncio.get_running_loop()
+            self._polling_error_task = loop.create_task(
+                self._handle_polling_network_error(
+                    RuntimeError("getUpdates consumer wedged: pending updates not draining")
+                )
+            )
+
     async def _verify_polling_after_reconnect(self) -> None:
         """Heartbeat probe scheduled after a successful reconnect.
 
diff --git a/tests/gateway/test_telegram_pending_update_probe.py b/tests/gateway/test_telegram_pending_update_probe.py
new file mode 100644
index 00000000000..f02778b1dfb
--- /dev/null
+++ b/tests/gateway/test_telegram_pending_update_probe.py
@@ -0,0 +1,120 @@
+"""TelegramAdapter wedged-getUpdates detection via pending_update_count.
+
+PTB can report ``updater.running == True`` while its long-poll consumer is
+silently stuck (observed on WSL2), so DMs queue in the Bot API and never reach
+handlers (#42909). ``get_me()`` stays healthy (general request path), so the
+CLOSE-WAIT heartbeat is blind to it. ``_probe_pending_updates`` watches
+``get_webhook_info().pending_update_count`` and escalates to the existing
+network-error recovery ladder after two consecutive stuck probes.
+"""
+import sys
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from gateway.config import PlatformConfig
+
+
+def _ensure_telegram_mock():
+    if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
+        return
+    mod = MagicMock()
+    mod.error.NetworkError = type("NetworkError", (OSError,), {})
+    mod.error.TimedOut = type("TimedOut", (OSError,), {})
+    mod.error.BadRequest = type("BadRequest", (Exception,), {})
+    for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
+        sys.modules.setdefault(name, mod)
+    sys.modules.setdefault("telegram.error", mod.error)
+
+
+_ensure_telegram_mock()
+
+from plugins.platforms.telegram.adapter import TelegramAdapter  # noqa: E402
+
+
+def _make_adapter(*, pending: int) -> TelegramAdapter:
+    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
+    adapter._webhook_mode = False
+    adapter._app = MagicMock()
+    adapter._app.updater.running = True
+    bot = MagicMock()
+    bot.get_webhook_info = AsyncMock(
+        return_value=MagicMock(pending_update_count=pending)
+    )
+    adapter._app.bot = bot
+    adapter._bot = bot
+    return adapter
+
+
+@pytest.mark.asyncio
+async def test_single_stuck_probe_does_not_escalate():
+    """One probe with a queued update only increments the counter."""
+    adapter = _make_adapter(pending=3)
+    with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec:
+        await adapter._probe_pending_updates(adapter._app.bot, 5)
+    assert adapter._polling_pending_stuck_count == 1
+    rec.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_two_consecutive_stuck_probes_trigger_recovery():
+    """Second consecutive stuck probe routes into the recovery ladder."""
+    adapter = _make_adapter(pending=2)
+    recovery = AsyncMock()
+    with patch.object(adapter, "_handle_polling_network_error", new=recovery):
+        await adapter._probe_pending_updates(adapter._app.bot, 5)
+        assert adapter._polling_pending_stuck_count == 1
+        await adapter._probe_pending_updates(adapter._app.bot, 5)
+        # Let the scheduled recovery task run.
+        task = adapter._polling_error_task
+        assert task is not None
+        await task
+    recovery.assert_awaited_once()
+    # Counter resets after escalation so a fresh wedge starts from zero.
+    assert adapter._polling_pending_stuck_count == 0
+
+
+@pytest.mark.asyncio
+async def test_zero_pending_resets_counter():
+    """A drained queue clears any prior stuck count without escalating."""
+    adapter = _make_adapter(pending=0)
+    adapter._polling_pending_stuck_count = 1
+    with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec:
+        await adapter._probe_pending_updates(adapter._app.bot, 5)
+    assert adapter._polling_pending_stuck_count == 0
+    rec.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_webhook_mode_is_noop():
+    """Webhook mode holds no server-side queue — probe never runs."""
+    adapter = _make_adapter(pending=9)
+    adapter._webhook_mode = True
+    await adapter._probe_pending_updates(adapter._app.bot, 5)
+    adapter._app.bot.get_webhook_info.assert_not_called()
+    assert adapter._polling_pending_stuck_count == 0
+
+
+@pytest.mark.asyncio
+async def test_no_probe_when_updater_not_running():
+    """If the updater isn't running, recovery is already someone else's job."""
+    adapter = _make_adapter(pending=9)
+    adapter._app.updater.running = False
+    adapter._polling_pending_stuck_count = 1
+    await adapter._probe_pending_updates(adapter._app.bot, 5)
+    adapter._app.bot.get_webhook_info.assert_not_called()
+    assert adapter._polling_pending_stuck_count == 0
+
+
+@pytest.mark.asyncio
+async def test_reconnect_in_flight_skips_probe():
+    """An active recovery task owns the connection — don't double-trigger."""
+    adapter = _make_adapter(pending=9)
+    inflight = MagicMock()
+    inflight.done.return_value = False
+    adapter._polling_error_task = inflight
+    adapter._polling_pending_stuck_count = 1
+    await adapter._probe_pending_updates(adapter._app.bot, 5)
+    adapter._app.bot.get_webhook_info.assert_not_called()
+    # A stale count must not survive into the post-recovery probes.
+    assert adapter._polling_pending_stuck_count == 0