fix(telegram): detect wedged getUpdates consumer via pending_update_count

The merged CLOSE-WAIT heartbeat (#52744) only probes get_me(), which uses the
general request path and stays healthy while PTB's getUpdates consumer is
silently wedged (updater.running=True but the long-poll task is stuck, observed
on WSL2). DMs then queue in the Bot API and never reach handlers (#42909).

Augment the existing _polling_heartbeat_loop to also probe
get_webhook_info().pending_update_count. After two consecutive probes that see a
non-draining queue while the updater claims to be running, escalate into the
existing _handle_polling_network_error recovery ladder — no new restart
machinery. No-ops in webhook mode, when the updater is not running, or when a
reconnect is already in flight.

Credit to @gazzumatteo, whose PR #42959 identified the pending_update_count
signal as the missing liveness probe. This reuses the existing heartbeat +
recovery path rather than adding a parallel watchdog.

Fixes #42909.
This commit is contained in:
teknium1 2026-06-28 02:33:53 -07:00 committed by Teknium
parent 822b71cbf8
commit d5ba374c03
2 changed files with 194 additions and 0 deletions

View file

@ -417,6 +417,12 @@ class TelegramAdapter(BasePlatformAdapter):
self._polling_network_error_count: int = 0
self._polling_error_callback_ref = None
self._polling_heartbeat_task: Optional[asyncio.Task] = None
# Consecutive heartbeat probes that saw queued updates the running
# poller is not consuming. get_me() can't see this — the send path is
# healthy while the getUpdates consumer is wedged — so the heartbeat
# also probes get_webhook_info().pending_update_count and escalates to
# recovery after two consecutive stuck probes (#42909).
self._polling_pending_stuck_count: int = 0
# After sustained reconnect storms the PTB httpx pool can return
# SendResult(success=True) for sends that never actually transmit.
# _handle_polling_network_error sets this; _verify_polling_after_reconnect
@ -1689,6 +1695,16 @@ class TelegramAdapter(BasePlatformAdapter):
if not callable(getattr(bot, "get_me", None)):
return
await asyncio.wait_for(bot.get_me(), PROBE_TIMEOUT)
# get_me() succeeded — the general/send request path is healthy.
# That does NOT prove the getUpdates consumer is alive: PTB can
# report updater.running=True while the long-poll task is wedged,
# so DMs queue in the Bot API and never reach handlers (#42909).
# get_me() is blind to this; get_webhook_info() exposes it via
# pending_update_count. Escalate only after two consecutive
# probes see a non-zero queue while we believe we're polling, so
# a single in-flight update (consumed before the next probe)
# never trips recovery.
await self._probe_pending_updates(bot, PROBE_TIMEOUT)
except asyncio.CancelledError:
return
except (asyncio.TimeoutError, OSError) as probe_err:
@ -1707,6 +1723,67 @@ class TelegramAdapter(BasePlatformAdapter):
# CLOSE-WAIT symptoms — let PTB's own handlers surface them.
pass
async def _probe_pending_updates(self, bot, probe_timeout: float) -> None:
"""Detect a wedged getUpdates consumer via pending_update_count.
PTB can report ``updater.running == True`` while its long-poll task is
silently stuck (e.g. a socket that epoll keeps reporting readable on
WSL2). ``get_me()`` stays healthy because it uses the general request
path, so the CLOSE-WAIT heartbeat never fires yet DMs queue in the
Bot API and never reach handlers (#42909).
``get_webhook_info().pending_update_count`` is the one signal that
exposes this: a growing/stuck queue while we believe we're polling means
the consumer is dead. We only escalate after two consecutive stuck
probes so a single update that's simply in-flight between probes does
not trip a needless recovery. Recovery reuses
``_handle_polling_network_error`` the same ladder PTB's own
``error_callback`` feeds so no new restart machinery is introduced.
"""
# Only meaningful in polling mode with a running updater; in webhook
# mode Telegram pushes updates and holds no server-side queue.
if self._webhook_mode:
return
updater = getattr(self._app, "updater", None) if self._app else None
if updater is None or not getattr(updater, "running", False):
self._polling_pending_stuck_count = 0
return
get_webhook_info = getattr(bot, "get_webhook_info", None)
if not callable(get_webhook_info):
return
# A reconnect already in flight owns recovery — don't double-trigger.
if self._polling_error_task and not self._polling_error_task.done():
return
try:
info = await asyncio.wait_for(get_webhook_info(), probe_timeout) # type: ignore[arg-type]
except (asyncio.TimeoutError, OSError):
# A failed probe is a connectivity symptom the get_me() path or the
# outer handler will catch; don't treat it as a stuck-queue signal.
return
pending = int(getattr(info, "pending_update_count", 0) or 0)
if pending <= 0:
self._polling_pending_stuck_count = 0
return
self._polling_pending_stuck_count += 1
logger.warning(
"[%s] Telegram polling heartbeat: %d update(s) queued but not "
"consumed (stuck probe %d/2)",
self.name, pending, self._polling_pending_stuck_count,
)
if self._polling_pending_stuck_count >= 2:
self._polling_pending_stuck_count = 0
logger.warning(
"[%s] getUpdates consumer appears wedged (queue not draining); "
"triggering polling restart",
self.name,
)
loop = asyncio.get_running_loop()
self._polling_error_task = loop.create_task(
self._handle_polling_network_error(
RuntimeError("getUpdates consumer wedged: pending updates not draining")
)
)
async def _verify_polling_after_reconnect(self) -> None:
"""Heartbeat probe scheduled after a successful reconnect.

View file

@ -0,0 +1,117 @@
"""TelegramAdapter wedged-getUpdates detection via pending_update_count.
PTB can report ``updater.running == True`` while its long-poll consumer is
silently stuck (observed on WSL2), so DMs queue in the Bot API and never reach
handlers (#42909). ``get_me()`` stays healthy (general request path), so the
CLOSE-WAIT heartbeat is blind to it. ``_probe_pending_updates`` watches
``get_webhook_info().pending_update_count`` and escalates to the existing
network-error recovery ladder after two consecutive stuck probes.
"""
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import PlatformConfig
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
mod = MagicMock()
mod.error.NetworkError = type("NetworkError", (OSError,), {})
mod.error.TimedOut = type("TimedOut", (OSError,), {})
mod.error.BadRequest = type("BadRequest", (Exception,), {})
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)
sys.modules.setdefault("telegram.error", mod.error)
_ensure_telegram_mock()
from plugins.platforms.telegram.adapter import TelegramAdapter # noqa: E402
def _make_adapter(*, pending: int) -> TelegramAdapter:
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
adapter._webhook_mode = False
adapter._app = MagicMock()
adapter._app.updater.running = True
bot = MagicMock()
bot.get_webhook_info = AsyncMock(
return_value=MagicMock(pending_update_count=pending)
)
adapter._app.bot = bot
adapter._bot = bot
return adapter
@pytest.mark.asyncio
async def test_single_stuck_probe_does_not_escalate():
"""One probe with a queued update only increments the counter."""
adapter = _make_adapter(pending=3)
with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec:
await adapter._probe_pending_updates(adapter._app.bot, 5)
assert adapter._polling_pending_stuck_count == 1
rec.assert_not_called()
@pytest.mark.asyncio
async def test_two_consecutive_stuck_probes_trigger_recovery():
"""Second consecutive stuck probe routes into the recovery ladder."""
adapter = _make_adapter(pending=2)
recovery = AsyncMock()
with patch.object(adapter, "_handle_polling_network_error", new=recovery):
await adapter._probe_pending_updates(adapter._app.bot, 5)
assert adapter._polling_pending_stuck_count == 1
await adapter._probe_pending_updates(adapter._app.bot, 5)
# Let the scheduled recovery task run.
task = adapter._polling_error_task
assert task is not None
await task
recovery.assert_awaited_once()
# Counter resets after escalation so a fresh wedge starts from zero.
assert adapter._polling_pending_stuck_count == 0
@pytest.mark.asyncio
async def test_zero_pending_resets_counter():
"""A drained queue clears any prior stuck count without escalating."""
adapter = _make_adapter(pending=0)
adapter._polling_pending_stuck_count = 1
with patch.object(adapter, "_handle_polling_network_error", new=AsyncMock()) as rec:
await adapter._probe_pending_updates(adapter._app.bot, 5)
assert adapter._polling_pending_stuck_count == 0
rec.assert_not_called()
@pytest.mark.asyncio
async def test_webhook_mode_is_noop():
"""Webhook mode holds no server-side queue — probe never runs."""
adapter = _make_adapter(pending=9)
adapter._webhook_mode = True
await adapter._probe_pending_updates(adapter._app.bot, 5)
adapter._app.bot.get_webhook_info.assert_not_called()
assert adapter._polling_pending_stuck_count == 0
@pytest.mark.asyncio
async def test_no_probe_when_updater_not_running():
"""If the updater isn't running, recovery is already someone else's job."""
adapter = _make_adapter(pending=9)
adapter._app.updater.running = False
adapter._polling_pending_stuck_count = 1
await adapter._probe_pending_updates(adapter._app.bot, 5)
adapter._app.bot.get_webhook_info.assert_not_called()
assert adapter._polling_pending_stuck_count == 0
@pytest.mark.asyncio
async def test_reconnect_in_flight_skips_probe():
"""An active recovery task owns the connection — don't double-trigger."""
adapter = _make_adapter(pending=9)
inflight = MagicMock()
inflight.done.return_value = False
adapter._polling_error_task = inflight
await adapter._probe_pending_updates(adapter._app.bot, 5)
adapter._app.bot.get_webhook_info.assert_not_called()