From 8501caf51f6d0ff5782a0a00c85a319a449d781b Mon Sep 17 00:00:00 2001 From: agt-user <267614622+agt-user@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:51:45 -0700 Subject: [PATCH] fix(telegram): persistent heartbeat loop to detect CLOSE-WAIT polling sockets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a Telegram long-poll TCP socket enters CLOSE-WAIT (remote sent FIN but httpx hasn't noticed), epoll still reports it readable so no exception is raised. PTB's error_callback never fires, the reconnect ladder never engages, and the gateway silently stops receiving messages while the process stays alive — until a manual systemctl restart. The existing recovery only covers two cases: error_callback-driven reconnects (which require an exception PTB never gets) and a one-shot _verify_polling_after_reconnect probe (which runs only right after an explicit reconnect). A socket that wedges during steady-state operation is never detected. Add _polling_heartbeat_loop: a background asyncio.Task started in connect() (polling mode only) that probes get_me() every 90s on the general request pool (not the getUpdates pool, so healthy long-polls are never interrupted). On asyncio.TimeoutError/OSError it hands off to the existing _handle_polling_network_error ladder; other errors are swallowed. disconnect() cancels and awaits the task. Worst-case detection window ~105s. Complementary to #51541 (general-pool keepalive limits / fd leak) — that recycles idle pooled connections; this detects a wedged active read. Fixes #48495 Co-authored-by: agt-user <267614622+agt-user@users.noreply.github.com> --- plugins/platforms/telegram/adapter.py | 72 ++++++ .../test_telegram_network_reconnect.py | 208 ++++++++++++++++++ 2 files changed, 280 insertions(+) diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py index 7cd6b272796..18649af6002 100644 --- a/plugins/platforms/telegram/adapter.py +++ b/plugins/platforms/telegram/adapter.py @@ -537,6 +537,7 @@ class TelegramAdapter(BasePlatformAdapter): self._polling_conflict_count: int = 0 self._polling_network_error_count: int = 0 self._polling_error_callback_ref = None + self._polling_heartbeat_task: Optional[asyncio.Task] = None # After sustained reconnect storms the PTB httpx pool can return # SendResult(success=True) for sends that never actually transmit. # _handle_polling_network_error sets this; _verify_polling_after_reconnect @@ -1714,6 +1715,57 @@ class TelegramAdapter(BasePlatformAdapter): self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) + async def _polling_heartbeat_loop(self) -> None: + """Detect dead Telegram TCP sockets (CLOSE-WAIT) by periodic probing. + + PTB's long-poll task blocks on epoll waiting for Telegram to push an + update. When the underlying TCP connection enters CLOSE-WAIT (the remote + sent a FIN but the httpx pool has not yet noticed), epoll still reports + the socket as readable and no exception is raised — so PTB's + ``error_callback`` never fires and the gateway silently stops receiving + messages. + + This loop probes ``get_me()`` every ``HEARTBEAT_INTERVAL`` seconds on the + *general* request path (not the getUpdates pool), so a healthy long-poll + waiting for the 30-second Telegram window is never interrupted. On any + connect-level failure the loop hands off to + ``_handle_polling_network_error`` — the same path triggered by PTB's own + ``error_callback`` — which drains the dead pool and restarts polling. + + Unlike ``_verify_polling_after_reconnect`` (a one-shot probe scheduled + only after an explicit reconnect), this loop runs for the full lifetime + of the polling connection, so it catches a socket that wedges during + steady-state operation without any prior error event. + """ + HEARTBEAT_INTERVAL = 90 # seconds between probes + PROBE_TIMEOUT = 15 # seconds before declaring the path dead + + while True: + try: + await asyncio.sleep(HEARTBEAT_INTERVAL) + if self.has_fatal_error: + return + if not (self._app and self._app.bot): + continue + await asyncio.wait_for(self._app.bot.get_me(), PROBE_TIMEOUT) + except asyncio.CancelledError: + return + except (asyncio.TimeoutError, OSError) as probe_err: + logger.warning( + "[%s] Polling heartbeat probe failed (%s); triggering reconnect", + self.name, probe_err, + ) + if self._polling_error_task and not self._polling_error_task.done(): + continue # reconnect already in progress + loop = asyncio.get_running_loop() + self._polling_error_task = loop.create_task( + self._handle_polling_network_error(probe_err) + ) + except Exception: + # Non-connectivity errors (e.g. TelegramError 401) are not + # CLOSE-WAIT symptoms — let PTB's own handlers surface them. + pass + async def _verify_polling_after_reconnect(self) -> None: """Heartbeat probe scheduled after a successful reconnect. @@ -2505,6 +2557,16 @@ class TelegramAdapter(BasePlatformAdapter): mode = "webhook" if self._webhook_mode else "polling" logger.info("[%s] Connected to Telegram (%s mode)", self.name, mode) + # Start the persistent heartbeat loop in polling mode. Webhook mode + # receives updates via incoming pushes — there is no long-poll + # socket to wedge in CLOSE-WAIT, so the loop is not needed there. + if not self._webhook_mode: + if self._polling_heartbeat_task and not self._polling_heartbeat_task.done(): + self._polling_heartbeat_task.cancel() + self._polling_heartbeat_task = asyncio.ensure_future( + self._polling_heartbeat_loop() + ) + # Surface the gateway as "Online" in the bot's short description # (opt-in via extra.status_indicator). Non-fatal. try: @@ -2563,6 +2625,16 @@ class TelegramAdapter(BasePlatformAdapter): async def disconnect(self) -> None: """Stop polling/webhook, cancel pending album flushes, and disconnect.""" + # Cancel the heartbeat before tearing down the app so the probe task + # cannot fire get_me() into a half-shutdown bot client. + if self._polling_heartbeat_task and not self._polling_heartbeat_task.done(): + self._polling_heartbeat_task.cancel() + try: + await self._polling_heartbeat_task + except asyncio.CancelledError: + pass + self._polling_heartbeat_task = None + # Mark the bot "Offline" in its short description while the bot's HTTP # client is still alive (before app shutdown closes it). Opt-in via # extra.status_indicator. Non-fatal. This is the clean-shutdown path; diff --git a/tests/gateway/test_telegram_network_reconnect.py b/tests/gateway/test_telegram_network_reconnect.py index bd9e9e3b7b0..8c0dc6a563f 100644 --- a/tests/gateway/test_telegram_network_reconnect.py +++ b/tests/gateway/test_telegram_network_reconnect.py @@ -473,3 +473,211 @@ async def test_reconnect_schedules_heartbeat_probe_on_success(): await t except (asyncio.CancelledError, Exception): pass + + +# ── Persistent heartbeat loop (_polling_heartbeat_loop) ────────────────────── +# +# These tests cover the continuous CLOSE-WAIT detection loop that fixes the bug +# (#48495) where a dead Telegram TCP socket caused the gateway to stop receiving +# messages silently. The _verify_polling_after_reconnect tests above cover the +# one-shot post-reconnect probe; these cover the background loop that runs for +# the gateway's full lifetime in polling mode. +# +# Loop structure: while True: sleep(INTERVAL) → fatal/app checks → get_me(). +# So with cancel raised on the Nth patched sleep, get_me() fires (N-1) times. + + +@pytest.mark.asyncio +async def test_heartbeat_loop_exits_cleanly_on_cancel(): + """The heartbeat loop must exit without raising when cancelled (normal shutdown).""" + adapter = _make_adapter() + + mock_app = MagicMock() + mock_app.bot.get_me = AsyncMock(return_value=MagicMock()) + adapter._app = mock_app + + sleep_count = 0 + + async def fast_sleep(seconds): + nonlocal sleep_count + sleep_count += 1 + # sleep #1 → get_me, sleep #2 → get_me, sleep #3 → cancel. + if sleep_count >= 3: + raise asyncio.CancelledError() + + with patch("asyncio.sleep", side_effect=fast_sleep): + # Should not raise — CancelledError is swallowed internally. + await adapter._polling_heartbeat_loop() + + assert mock_app.bot.get_me.await_count == 2 + + +@pytest.mark.asyncio +async def test_heartbeat_loop_triggers_reconnect_on_timeout(): + """A TimeoutError from get_me() must schedule a reconnect via _handle_polling_network_error.""" + adapter = _make_adapter() + adapter._handle_polling_network_error = AsyncMock() + + mock_app = MagicMock() + adapter._app = mock_app + + sleep_call = 0 + + async def fast_sleep(seconds): + nonlocal sleep_call + sleep_call += 1 + if sleep_call >= 3: + raise asyncio.CancelledError() + + async def fast_wait_for(coro, timeout): + if asyncio.iscoroutine(coro): + coro.close() + raise asyncio.TimeoutError() + + with patch("asyncio.sleep", side_effect=fast_sleep): + with patch("plugins.platforms.telegram.adapter.asyncio.wait_for", side_effect=fast_wait_for): + await adapter._polling_heartbeat_loop() + + # A reconnect task must have been created. + assert adapter._polling_error_task is not None + + +@pytest.mark.asyncio +async def test_heartbeat_loop_triggers_reconnect_on_os_error(): + """An OSError (e.g. connection reset) from get_me() must trigger a reconnect.""" + adapter = _make_adapter() + adapter._handle_polling_network_error = AsyncMock() + + mock_app = MagicMock() + adapter._app = mock_app + + sleep_call = 0 + + async def fast_sleep(seconds): + nonlocal sleep_call + sleep_call += 1 + if sleep_call >= 3: + raise asyncio.CancelledError() + + async def os_error_wait_for(coro, timeout): + if asyncio.iscoroutine(coro): + coro.close() + raise OSError("Connection reset by peer") + + with patch("asyncio.sleep", side_effect=fast_sleep): + with patch("plugins.platforms.telegram.adapter.asyncio.wait_for", side_effect=os_error_wait_for): + await adapter._polling_heartbeat_loop() + + assert adapter._polling_error_task is not None + + +@pytest.mark.asyncio +async def test_heartbeat_loop_skips_reconnect_if_already_in_progress(): + """If a reconnect task is already running, the heartbeat must not spawn another.""" + adapter = _make_adapter() + + # Simulate an already-running reconnect task. + existing_task = asyncio.get_event_loop().create_task(asyncio.sleep(3600)) + adapter._polling_error_task = existing_task + adapter._handle_polling_network_error = AsyncMock() + + mock_app = MagicMock() + adapter._app = mock_app + + sleep_call = 0 + + async def fast_sleep(seconds): + nonlocal sleep_call + sleep_call += 1 + if sleep_call >= 3: + raise asyncio.CancelledError() + + async def timeout_wait_for(coro, timeout): + if asyncio.iscoroutine(coro): + coro.close() + raise asyncio.TimeoutError() + + with patch("asyncio.sleep", side_effect=fast_sleep): + with patch("plugins.platforms.telegram.adapter.asyncio.wait_for", side_effect=timeout_wait_for): + await adapter._polling_heartbeat_loop() + + # _handle_polling_network_error must NOT have been called — existing task still running. + adapter._handle_polling_network_error.assert_not_awaited() + + existing_task.cancel() + try: + await existing_task + except (asyncio.CancelledError, Exception): + pass + + +@pytest.mark.asyncio +async def test_heartbeat_loop_ignores_non_connectivity_errors(): + """Errors that are not connectivity failures (e.g. TelegramError) must be swallowed.""" + adapter = _make_adapter() + adapter._handle_polling_network_error = AsyncMock() + + mock_app = MagicMock() + adapter._app = mock_app + + sleep_call = 0 + + async def fast_sleep(seconds): + nonlocal sleep_call + sleep_call += 1 + if sleep_call >= 3: + raise asyncio.CancelledError() + + async def telegram_error_wait_for(coro, timeout): + if asyncio.iscoroutine(coro): + coro.close() + raise RuntimeError("TelegramError: Unauthorized") # non-OSError, non-TimeoutError + + with patch("asyncio.sleep", side_effect=fast_sleep): + with patch("plugins.platforms.telegram.adapter.asyncio.wait_for", side_effect=telegram_error_wait_for): + await adapter._polling_heartbeat_loop() + + # No reconnect should have been triggered for a non-connectivity error. + adapter._handle_polling_network_error.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_heartbeat_loop_exits_on_fatal_error(): + """A fatal error short-circuits the loop before probing get_me().""" + adapter = _make_adapter() + adapter._set_fatal_error("telegram_network_error", "boom", retryable=True) + + mock_app = MagicMock() + mock_app.bot.get_me = AsyncMock(return_value=MagicMock()) + adapter._app = mock_app + + async def fast_sleep(seconds): + return None + + with patch("asyncio.sleep", side_effect=fast_sleep): + await adapter._polling_heartbeat_loop() + + # Fatal error returns before the get_me() probe. + mock_app.bot.get_me.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_disconnect_cancels_heartbeat_task(): + """disconnect() must cancel the heartbeat task before shutting down the app.""" + adapter = _make_adapter() + + # Simulate a running heartbeat. + heartbeat_task = asyncio.get_event_loop().create_task(asyncio.sleep(3600)) + adapter._polling_heartbeat_task = heartbeat_task + + mock_app = MagicMock() + mock_app.updater = MagicMock() + mock_app.updater.running = False + mock_app.running = False + mock_app.shutdown = AsyncMock() + adapter._app = mock_app + + await adapter.disconnect() + + assert heartbeat_task.cancelled(), "Heartbeat task must be cancelled by disconnect()" + assert adapter._polling_heartbeat_task is None