diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py
index 11b6f603295..b379909026b 100644
--- a/plugins/platforms/telegram/adapter.py
+++ b/plugins/platforms/telegram/adapter.py
@@ -1696,6 +1696,69 @@ class TelegramAdapter(BasePlatformAdapter):
                 )
                 await self._handle_polling_network_error(probe_err)
 
+    def _disarm_ptb_retry_loop(self) -> None:
+        """Synchronously stop PTB's internal polling retry loop.
+
+        PTB wraps ``getUpdates`` in ``network_retry_loop`` with
+        ``max_retries=-1`` (retry forever). When a ``TelegramError`` (including
+        a 409 ``Conflict``) fires, that loop calls our ``error_callback``
+        *synchronously*, then sleeps and re-checks ``while is_running()`` before
+        polling again. Our ``error_callback`` only schedules an async recovery
+        task (``loop.create_task(...)``) and returns immediately, so PTB's loop
+        keeps polling while our handler concurrently runs
+        ``stop -> sleep -> start_polling``. The two polling sessions overlap and
+        Telegram returns a fresh 409 — a self-inflicted conflict loop on a
+        ~31s cadence.
+
+        The loop is wired with ``is_running=lambda: updater.running`` and a
+        private ``stop_event`` (``do_action`` races that event and returns the
+        moment it is set). Setting that event *synchronously inside the
+        callback* — before it returns — makes PTB's loop exit on its own next
+        tick instead of racing our recovery. Our async handler then performs
+        the real ``await updater.stop()`` (guarded on ``running``) followed by
+        drain + ``start_polling()``; ``stop()`` is what clears the event, so
+        the restart is not poisoned.
+
+        Best-effort and defensive: PTB names the attribute differently across
+        versions (``_Updater__polling_task_stop_event`` via name-mangling), so
+        we probe for both spellings. If neither is found we do nothing and
+        fall back to the prior behaviour (async ``updater.stop()`` racing PTB) —
+        i.e. we never make things worse than before.
+
+        We deliberately do NOT fall back to flipping ``updater._running``:
+        ``stop()`` raises ``RuntimeError`` when ``running`` is already False and
+        our recovery handler guards its ``stop()`` call on ``running``, so
+        clearing the flag here would skip the real teardown and leave PTB's
+        stop_event uncleared — poisoning the subsequent ``start_polling()``.
+        The stop_event lever leaves ``_running`` True, so the handler's
+        ``await updater.stop()`` still runs, drains the polling task, and clears
+        the event for a clean restart.
+        """
+        updater = getattr(self._app, "updater", None) if self._app else None
+        if updater is None:
+            return
+        # Preferred (and only) lever: PTB's polling stop_event. Name-mangled on
+        # Updater, so probe both the mangled and unmangled spellings.
+        for attr in (
+            "_Updater__polling_task_stop_event",
+            "_polling_task_stop_event",
+        ):
+            stop_event = getattr(updater, attr, None)
+            if isinstance(stop_event, asyncio.Event):
+                if not stop_event.is_set():
+                    stop_event.set()
+                    logger.debug(
+                        "[%s] Disarmed PTB polling retry loop via %s",
+                        self.name, attr,
+                    )
+                return
+        logger.debug(
+            "[%s] Could not disarm PTB polling retry loop "
+            "(stop_event not found on this PTB version); "
+            "falling back to async stop()",
+            self.name,
+        )
+
     async def _handle_polling_conflict(self, error: Exception) -> None:
         if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
             return
@@ -2388,6 +2451,14 @@ class TelegramAdapter(BasePlatformAdapter):
                     if self._polling_error_task and not self._polling_error_task.done():
                         return
                     if self._looks_like_polling_conflict(error):
+                        # Synchronously stop PTB's internal network_retry_loop
+                        # BEFORE scheduling our async recovery task. PTB calls
+                        # this callback synchronously inside its loop and then
+                        # keeps polling on its own; if we only schedule a task
+                        # here, PTB's retry and our stop->restart overlap and
+                        # produce a fresh 409. Disarming the loop now makes it
+                        # exit on its next tick so recovery owns polling alone.
+                        self._disarm_ptb_retry_loop()
                         self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
                     elif self._looks_like_network_error(error):
                         logger.warning("[%s] Telegram network error, scheduling reconnect: %s", self.name, error)
diff --git a/tests/gateway/test_telegram_conflict.py b/tests/gateway/test_telegram_conflict.py
index def45a822d7..6236d3655f1 100644
--- a/tests/gateway/test_telegram_conflict.py
+++ b/tests/gateway/test_telegram_conflict.py
@@ -489,3 +489,113 @@ async def test_reconnect_preserves_pending_updates(monkeypatch):
     assert ok is True
     assert captured["drop_pending_updates"] is False
     await _cancel_heartbeat(adapter)
+
+
+@pytest.mark.asyncio
+async def test_disarm_sets_ptb_stop_event():
+    """_disarm_ptb_retry_loop sets PTB's name-mangled polling stop_event.
+
+    This is the root-cause fix for the 409 conflict loop (#30122): the
+    error_callback must synchronously signal PTB's internal network_retry_loop
+    to stop BEFORE our async recovery task restarts polling, otherwise the two
+    polling sessions overlap and produce a fresh 409.
+    """
+    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
+
+    stop_event = asyncio.Event()
+    # PTB stores it name-mangled as _Updater__polling_task_stop_event.
+    updater = SimpleNamespace(running=True)
+    setattr(updater, "_Updater__polling_task_stop_event", stop_event)
+    adapter._app = SimpleNamespace(updater=updater)
+
+    assert not stop_event.is_set()
+    adapter._disarm_ptb_retry_loop()
+    assert stop_event.is_set(), "disarm must set PTB's polling stop_event"
+    # Must not flip _running — the recovery handler's stop() guards on running
+    # and stop() raises if running is already False.
+    assert updater.running is True
+
+
+@pytest.mark.asyncio
+async def test_disarm_noop_when_stop_event_absent():
+    """When PTB exposes no stop_event, disarm is a safe no-op (no regression).
+
+    It must NOT flip _running (which would make the handler skip stop() and
+    leave the loop wedged) — it just falls back to the prior async stop() race.
+    """
+    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
+    updater = SimpleNamespace(running=True, _running=True)
+    adapter._app = SimpleNamespace(updater=updater)
+
+    adapter._disarm_ptb_retry_loop()  # no stop_event attribute present
+
+    assert updater.running is True
+    assert updater._running is True, "disarm must not flip _running as a fallback"
+
+
+@pytest.mark.asyncio
+async def test_conflict_callback_disarms_before_scheduling(monkeypatch):
+    """The polling error_callback disarms PTB synchronously, then schedules
+    recovery — proving the fix is wired into the live callback, not just the
+    helper (#30122)."""
+    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
+    fatal_handler = AsyncMock()
+    adapter.set_fatal_error_handler(fatal_handler)
+
+    monkeypatch.setattr(
+        "gateway.status.acquire_scoped_lock",
+        lambda scope, identity, metadata=None: (True, None),
+    )
+    monkeypatch.setattr(
+        "gateway.status.release_scoped_lock",
+        lambda scope, identity: None,
+    )
+    monkeypatch.setattr("asyncio.sleep", AsyncMock())
+
+    captured = {}
+
+    async def fake_start_polling(**kwargs):
+        captured["error_callback"] = kwargs["error_callback"]
+
+    stop_event = asyncio.Event()
+    updater = SimpleNamespace(
+        start_polling=AsyncMock(side_effect=fake_start_polling),
+        stop=AsyncMock(),
+        running=True,
+    )
+    setattr(updater, "_Updater__polling_task_stop_event", stop_event)
+    bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock())
+    app = SimpleNamespace(
+        bot=bot,
+        updater=updater,
+        add_handler=MagicMock(),
+        initialize=AsyncMock(),
+        start=AsyncMock(),
+    )
+    builder = MagicMock()
+    builder.token.return_value = builder
+    builder.request.return_value = builder
+    builder.get_updates_request.return_value = builder
+    builder.build.return_value = app
+    monkeypatch.setattr(
+        "plugins.platforms.telegram.adapter.Application",
+        SimpleNamespace(builder=MagicMock(return_value=builder)),
+    )
+
+    ok = await adapter.connect()
+    assert ok is True
+
+    conflict = type("Conflict", (Exception,), {})
+    # Fire a 409 through the live callback. The disarm must happen
+    # synchronously (before any await), so the stop_event is set immediately
+    # on return — before the scheduled recovery task gets a chance to run.
+    assert not stop_event.is_set()
+    captured["error_callback"](conflict("Conflict: terminated by other getUpdates"))
+    assert stop_event.is_set(), "callback must disarm PTB synchronously"
+    assert adapter._polling_error_task is not None, "recovery task must be scheduled"
+
+    # asyncio.sleep is patched with an AsyncMock above, so awaiting it never
+    # yields. Cancel the recovery task so it does not outlive the test.
+    adapter._polling_error_task.cancel()
+    await _cancel_heartbeat(adapter)
+