diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index cbee25393e..188038a1ad 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -512,6 +512,17 @@ class TelegramAdapter(BasePlatformAdapter): self.name, attempt, ) self._polling_network_error_count = 0 + # start_polling() returning is necessary but not sufficient: + # PTB's Updater can be left in a state where `running` is True + # but the underlying long-poll task is wedged on a stale httpx + # connection and never makes progress. No error_callback fires + # in that state, so the reconnect ladder won't advance on its + # own. Schedule a deferred probe to detect the wedge and + # re-enter the ladder if needed. + if not self.has_fatal_error: + probe = asyncio.ensure_future(self._verify_polling_after_reconnect()) + self._background_tasks.add(probe) + probe.add_done_callback(self._background_tasks.discard) except Exception as retry_err: logger.warning("[%s] Telegram polling reconnect failed: %s", self.name, retry_err) # start_polling failed — polling is dead and no further error @@ -523,6 +534,50 @@ class TelegramAdapter(BasePlatformAdapter): self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) + async def _verify_polling_after_reconnect(self) -> None: + """Heartbeat probe scheduled after a successful reconnect. + + PTB's Updater can survive a botched stop()+start_polling() cycle + with `running=True` but a wedged consumer task. No error callback + fires, so the reconnect ladder doesn't advance on its own. This + probe detects the wedge by: + + 1. Sleeping HEARTBEAT_PROBE_DELAY so a healthy long-poll has time + to complete at least one cycle. + 2. Verifying `Updater.running` is still True. + 3. Probing the bot endpoint with a tight asyncio timeout. A + wedged httpx pool fails this probe; a healthy one returns + well under the timeout. + + On any failure, re-enter the reconnect ladder so the existing + MAX_NETWORK_RETRIES path can ultimately escalate to fatal-error. + """ + HEARTBEAT_PROBE_DELAY = 60 + PROBE_TIMEOUT = 10 + + await asyncio.sleep(HEARTBEAT_PROBE_DELAY) + + if self.has_fatal_error: + return + if not (self._app and self._app.updater and self._app.updater.running): + logger.warning( + "[%s] Updater not running %ds after reconnect — treating as wedged", + self.name, HEARTBEAT_PROBE_DELAY, + ) + await self._handle_polling_network_error( + RuntimeError("Updater not running after reconnect heartbeat") + ) + return + + try: + await asyncio.wait_for(self._app.bot.get_me(), PROBE_TIMEOUT) + except Exception as probe_err: + logger.warning( + "[%s] Polling heartbeat probe failed %ds after reconnect: %s", + self.name, HEARTBEAT_PROBE_DELAY, probe_err, + ) + await self._handle_polling_network_error(probe_err) + async def _handle_polling_conflict(self, error: Exception) -> None: if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict": return diff --git a/tests/gateway/test_telegram_network_reconnect.py b/tests/gateway/test_telegram_network_reconnect.py index 532639b2db..81b7bed12e 100644 --- a/tests/gateway/test_telegram_network_reconnect.py +++ b/tests/gateway/test_telegram_network_reconnect.py @@ -132,6 +132,7 @@ async def test_reconnect_success_resets_error_count(): mock_app = MagicMock() mock_app.updater = mock_updater + mock_app.bot.get_me = AsyncMock(return_value=MagicMock()) # heartbeat probe path adapter._app = mock_app with patch("asyncio.sleep", new_callable=AsyncMock): @@ -139,6 +140,15 @@ async def test_reconnect_success_resets_error_count(): assert adapter._polling_network_error_count == 0 + # Clean up the heartbeat-probe task scheduled after a successful reconnect. + pending = [t for t in adapter._background_tasks if not t.done()] + for t in pending: + t.cancel() + try: + await t + except (asyncio.CancelledError, Exception): + pass + @pytest.mark.asyncio async def test_reconnect_triggers_fatal_after_max_retries(): @@ -284,3 +294,182 @@ async def test_drain_helper_noop_without_app(): adapter._app = None # Should not raise await adapter._drain_polling_connections() + + +# ── Heartbeat probe ────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_heartbeat_probe_no_op_when_polling_healthy(): + """ + Probe scheduled after a successful reconnect: Updater.running=True and + bot.get_me() returns quickly → recovery confirmed, no further action. + """ + adapter = _make_adapter() + + mock_updater = MagicMock() + mock_updater.running = True + + mock_app = MagicMock() + mock_app.updater = mock_updater + mock_app.bot.get_me = AsyncMock(return_value=MagicMock()) + adapter._app = mock_app + + adapter._handle_polling_network_error = AsyncMock() + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._verify_polling_after_reconnect() + + mock_app.bot.get_me.assert_awaited_once() + adapter._handle_polling_network_error.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_heartbeat_probe_reenters_ladder_when_updater_not_running(): + """ + If Updater.running has flipped to False by the heartbeat delay, treat + as wedged: re-enter the reconnect ladder. + """ + adapter = _make_adapter() + + mock_updater = MagicMock() + mock_updater.running = False + + mock_app = MagicMock() + mock_app.updater = mock_updater + mock_app.bot.get_me = AsyncMock() + adapter._app = mock_app + + adapter._handle_polling_network_error = AsyncMock() + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._verify_polling_after_reconnect() + + mock_app.bot.get_me.assert_not_called() + adapter._handle_polling_network_error.assert_awaited_once() + err = adapter._handle_polling_network_error.await_args.args[0] + assert isinstance(err, RuntimeError) + assert "not running" in str(err).lower() + + +@pytest.mark.asyncio +async def test_heartbeat_probe_reenters_ladder_when_get_me_times_out(): + """ + If bot.get_me() hangs longer than PROBE_TIMEOUT, treat as wedged. + Simulates the connection-pool wedge that motivated this fix. + """ + adapter = _make_adapter() + + mock_updater = MagicMock() + mock_updater.running = True + + async def hang_forever(*args, **kwargs): + await asyncio.sleep(3600) + + mock_app = MagicMock() + mock_app.updater = mock_updater + mock_app.bot.get_me = AsyncMock(side_effect=hang_forever) + adapter._app = mock_app + + adapter._handle_polling_network_error = AsyncMock() + + async def fast_wait_for(coro, timeout): + if asyncio.iscoroutine(coro): + coro.close() + raise asyncio.TimeoutError() + + with patch("asyncio.sleep", new_callable=AsyncMock): + with patch("gateway.platforms.telegram.asyncio.wait_for", new=fast_wait_for): + await adapter._verify_polling_after_reconnect() + + adapter._handle_polling_network_error.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_heartbeat_probe_reenters_ladder_on_get_me_network_error(): + """ + Any exception raised by bot.get_me() (NetworkError, ConnectionError, etc.) + should re-enter the reconnect ladder with the original exception. + """ + adapter = _make_adapter() + + mock_updater = MagicMock() + mock_updater.running = True + + mock_app = MagicMock() + mock_app.updater = mock_updater + mock_app.bot.get_me = AsyncMock(side_effect=ConnectionError("pool wedged")) + adapter._app = mock_app + + adapter._handle_polling_network_error = AsyncMock() + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._verify_polling_after_reconnect() + + adapter._handle_polling_network_error.assert_awaited_once() + assert isinstance( + adapter._handle_polling_network_error.await_args.args[0], ConnectionError + ) + + +@pytest.mark.asyncio +async def test_heartbeat_probe_skips_when_already_fatal(): + """ + If the adapter is already in fatal-error state by the time the probe + delay elapses, the probe should bail without further action. + """ + adapter = _make_adapter() + adapter._set_fatal_error("telegram_polling_conflict", "already fatal", retryable=False) + + mock_app = MagicMock() + mock_app.bot.get_me = AsyncMock() + adapter._app = mock_app + + adapter._handle_polling_network_error = AsyncMock() + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._verify_polling_after_reconnect() + + mock_app.bot.get_me.assert_not_called() + adapter._handle_polling_network_error.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_reconnect_schedules_heartbeat_probe_on_success(): + """ + After a successful start_polling() in the reconnect path, a probe task + must be added to _background_tasks. Without it, a wedged Updater would + sit silent indefinitely with no further error_callback to advance the + reconnect ladder. + """ + adapter = _make_adapter() + adapter._polling_network_error_count = 1 + + mock_updater = MagicMock() + mock_updater.running = True + mock_updater.stop = AsyncMock() + mock_updater.start_polling = AsyncMock() # succeeds + + mock_app = MagicMock() + mock_app.updater = mock_updater + mock_app.bot.get_me = AsyncMock(return_value=MagicMock()) + adapter._app = mock_app + + initial_count = len(adapter._background_tasks) + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._handle_polling_network_error(Exception("Bad Gateway")) + + assert len(adapter._background_tasks) > initial_count, ( + "Expected a heartbeat probe task to be scheduled after a successful " + "reconnect's start_polling()" + ) + + # Clean up. + pending = [t for t in adapter._background_tasks if not t.done()] + for t in pending: + t.cancel() + try: + await t + except (asyncio.CancelledError, Exception): + pass