fix(telegram): kill 409 polling conflict loop by disarming PTB retry synchronously (#53941)

Telegram polling entered a self-inflicted ~31s loop of 409 Conflict ->
retry -> resume -> Conflict. The error_callback PTB invokes synchronously
inside its internal network_retry_loop only scheduled our async recovery
task (loop.create_task) and returned, so PTB kept polling getUpdates on its
own while our handler concurrently ran stop -> sleep -> start_polling. The
two polling sessions overlapped and Telegram returned a fresh 409.

Fix: in the conflict branch of the error_callback, synchronously set PTB's
private polling stop_event before scheduling recovery. PTB's loop exits on
its next tick (it races that event in do_action), so our handler owns
polling alone. The handler's await updater.stop() drains the task and PTB
clears the event, so the subsequent start_polling() builds a fresh event
and is not poisoned.

Keeps the existing reconnect ladder intact (option B) — fixes only the
race. Defensive: probes mangled + unmangled stop_event spellings and no-ops
(prior behaviour) if neither exists; never flips _running, which would make
the handler skip stop() and leave the loop wedged.
This commit is contained in:
Teknium 2026-06-27 20:46:08 -07:00 committed by GitHub
parent d43e0cf304
commit f03823014b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 181 additions and 0 deletions

View file

@ -1696,6 +1696,69 @@ class TelegramAdapter(BasePlatformAdapter):
)
await self._handle_polling_network_error(probe_err)
def _disarm_ptb_retry_loop(self) -> None:
"""Synchronously stop PTB's internal polling retry loop.
PTB wraps ``getUpdates`` in ``network_retry_loop`` with
``max_retries=-1`` (retry forever). When a ``TelegramError`` (including
a 409 ``Conflict``) fires, that loop calls our ``error_callback``
*synchronously*, then sleeps and re-checks ``while is_running()`` before
polling again. Our ``error_callback`` only schedules an async recovery
task (``loop.create_task(...)``) and returns immediately, so PTB's loop
keeps polling while our handler concurrently runs
``stop -> sleep -> start_polling``. The two polling sessions overlap and
Telegram returns a fresh 409 a self-inflicted conflict loop on a
~31s cadence.
The loop is wired with ``is_running=lambda: updater.running`` and a
private ``stop_event`` (``do_action`` races that event and returns the
moment it is set). Setting that event *synchronously inside the
callback* before it returns makes PTB's loop exit on its own next
tick instead of racing our recovery. Our async handler then performs
the real ``await updater.stop()`` (idempotent) followed by
drain + ``start_polling()``, which builds a fresh ``stop_event`` so the
restart is not poisoned.
Best-effort and defensive: PTB names the attribute differently across
versions (``_Updater__polling_task_stop_event`` via name-mangling), so
we probe for both spellings. If neither is found we do nothing and
fall back to the prior behaviour (async ``updater.stop()`` racing PTB)
i.e. we never make things worse than before.
We deliberately do NOT fall back to flipping ``updater._running``:
``stop()`` raises ``RuntimeError`` when ``running`` is already False and
our recovery handler guards its ``stop()`` call on ``running``, so
clearing the flag here would skip the real teardown and leave PTB's
stop_event uncleared poisoning the subsequent ``start_polling()``.
The stop_event lever leaves ``_running`` True, so the handler's
``await updater.stop()`` still runs, drains the polling task, and clears
the event for a clean restart.
"""
updater = getattr(self._app, "updater", None) if self._app else None
if updater is None:
return
# Preferred (and only) lever: PTB's polling stop_event. Name-mangled on
# Updater, so probe both the mangled and unmangled spellings.
for attr in (
"_Updater__polling_task_stop_event",
"_polling_task_stop_event",
):
stop_event = getattr(updater, attr, None)
if isinstance(stop_event, asyncio.Event):
if not stop_event.is_set():
stop_event.set()
logger.debug(
"[%s] Disarmed PTB polling retry loop via %s",
self.name, attr,
)
return
logger.debug(
"[%s] Could not disarm PTB polling retry loop "
"(stop_event not found on this PTB version); "
"falling back to async stop()",
self.name,
)
async def _handle_polling_conflict(self, error: Exception) -> None:
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
return
@ -2388,6 +2451,14 @@ class TelegramAdapter(BasePlatformAdapter):
if self._polling_error_task and not self._polling_error_task.done():
return
if self._looks_like_polling_conflict(error):
# Synchronously stop PTB's internal network_retry_loop
# BEFORE scheduling our async recovery task. PTB calls
# this callback synchronously inside its loop and then
# keeps polling on its own; if we only schedule a task
# here, PTB's retry and our stop->restart overlap and
# produce a fresh 409. Disarming the loop now makes it
# exit on its next tick so recovery owns polling alone.
self._disarm_ptb_retry_loop()
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
elif self._looks_like_network_error(error):
logger.warning("[%s] Telegram network error, scheduling reconnect: %s", self.name, error)

View file

@ -489,3 +489,113 @@ async def test_reconnect_preserves_pending_updates(monkeypatch):
assert ok is True
assert captured["drop_pending_updates"] is False
await _cancel_heartbeat(adapter)
@pytest.mark.asyncio
async def test_disarm_sets_ptb_stop_event():
"""_disarm_ptb_retry_loop sets PTB's name-mangled polling stop_event.
This is the root-cause fix for the 409 conflict loop (#30122): the
error_callback must synchronously signal PTB's internal network_retry_loop
to stop BEFORE our async recovery task restarts polling, otherwise the two
polling sessions overlap and produce a fresh 409.
"""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
stop_event = asyncio.Event()
# PTB stores it name-mangled as _Updater__polling_task_stop_event.
updater = SimpleNamespace(running=True)
setattr(updater, "_Updater__polling_task_stop_event", stop_event)
adapter._app = SimpleNamespace(updater=updater)
assert not stop_event.is_set()
adapter._disarm_ptb_retry_loop()
assert stop_event.is_set(), "disarm must set PTB's polling stop_event"
# Must not flip _running — the recovery handler's stop() guards on running
# and stop() raises if running is already False.
assert updater.running is True
@pytest.mark.asyncio
async def test_disarm_noop_when_stop_event_absent():
"""When PTB exposes no stop_event, disarm is a safe no-op (no regression).
It must NOT flip _running (which would make the handler skip stop() and
leave the loop wedged) it just falls back to the prior async stop() race.
"""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
updater = SimpleNamespace(running=True, _running=True)
adapter._app = SimpleNamespace(updater=updater)
adapter._disarm_ptb_retry_loop() # no stop_event attribute present
assert updater.running is True
assert updater._running is True, "disarm must not flip _running as a fallback"
@pytest.mark.asyncio
async def test_conflict_callback_disarms_before_scheduling(monkeypatch):
"""The polling error_callback disarms PTB synchronously, then schedules
recovery proving the fix is wired into the live callback, not just the
helper (#30122)."""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
fatal_handler = AsyncMock()
adapter.set_fatal_error_handler(fatal_handler)
monkeypatch.setattr(
"gateway.status.acquire_scoped_lock",
lambda scope, identity, metadata=None: (True, None),
)
monkeypatch.setattr(
"gateway.status.release_scoped_lock",
lambda scope, identity: None,
)
monkeypatch.setattr("asyncio.sleep", AsyncMock())
captured = {}
async def fake_start_polling(**kwargs):
captured["error_callback"] = kwargs["error_callback"]
stop_event = asyncio.Event()
updater = SimpleNamespace(
start_polling=AsyncMock(side_effect=fake_start_polling),
stop=AsyncMock(),
running=True,
)
setattr(updater, "_Updater__polling_task_stop_event", stop_event)
bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock())
app = SimpleNamespace(
bot=bot,
updater=updater,
add_handler=MagicMock(),
initialize=AsyncMock(),
start=AsyncMock(),
)
builder = MagicMock()
builder.token.return_value = builder
builder.request.return_value = builder
builder.get_updates_request.return_value = builder
builder.build.return_value = app
monkeypatch.setattr(
"plugins.platforms.telegram.adapter.Application",
SimpleNamespace(builder=MagicMock(return_value=builder)),
)
ok = await adapter.connect()
assert ok is True
conflict = type("Conflict", (Exception,), {})
# Fire a 409 through the live callback. The disarm must happen
# synchronously (before any await), so the stop_event is set immediately
# on return — before the scheduled recovery task gets a chance to run.
assert not stop_event.is_set()
captured["error_callback"](conflict("Conflict: terminated by other getUpdates"))
assert stop_event.is_set(), "callback must disarm PTB synchronously"
assert adapter._polling_error_task is not None, "recovery task must be scheduled"
# Drain the scheduled recovery task so it doesn't outlive the test.
for _ in range(10):
await asyncio.sleep(0)
await _cancel_heartbeat(adapter)