fix(telegram): persistent heartbeat loop to detect CLOSE-WAIT polling sockets

When a Telegram long-poll TCP socket enters CLOSE-WAIT (remote sent FIN
but httpx hasn't noticed), epoll still reports it readable so no
exception is raised. PTB's error_callback never fires, the reconnect
ladder never engages, and the gateway silently stops receiving messages
while the process stays alive — until a manual systemctl restart.

The existing recovery only covers two cases: error_callback-driven
reconnects (which require an exception PTB never gets) and a one-shot
_verify_polling_after_reconnect probe (which runs only right after an
explicit reconnect). A socket that wedges during steady-state operation
is never detected.

Add _polling_heartbeat_loop: a background asyncio.Task started in
connect() (polling mode only) that probes get_me() every 90s on the
general request pool (not the getUpdates pool, so healthy long-polls are
never interrupted). On asyncio.TimeoutError/OSError it hands off to the
existing _handle_polling_network_error ladder; other errors are
swallowed. disconnect() cancels and awaits the task. Worst-case
detection window ~105s.

Complementary to #51541 (general-pool keepalive limits / fd leak) — that
recycles idle pooled connections; this detects a wedged active read.

Fixes #48495

Co-authored-by: agt-user <267614622+agt-user@users.noreply.github.com>
This commit is contained in:
agt-user 2026-06-23 23:51:45 -07:00 committed by Teknium
parent 56cf517ccd
commit 8501caf51f
2 changed files with 280 additions and 0 deletions

View file

@ -537,6 +537,7 @@ class TelegramAdapter(BasePlatformAdapter):
self._polling_conflict_count: int = 0
self._polling_network_error_count: int = 0
self._polling_error_callback_ref = None
self._polling_heartbeat_task: Optional[asyncio.Task] = None
# After sustained reconnect storms the PTB httpx pool can return
# SendResult(success=True) for sends that never actually transmit.
# _handle_polling_network_error sets this; _verify_polling_after_reconnect
@ -1714,6 +1715,57 @@ class TelegramAdapter(BasePlatformAdapter):
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
async def _polling_heartbeat_loop(self) -> None:
    """Detect dead Telegram TCP sockets (CLOSE-WAIT) by periodic probing.

    PTB's long-poll task blocks on epoll waiting for Telegram to push an
    update. When the underlying TCP connection enters CLOSE-WAIT (the remote
    sent a FIN but the httpx pool has not yet noticed), epoll still reports
    the socket as readable and no exception is raised, so PTB's
    ``error_callback`` never fires and the gateway silently stops receiving
    messages.

    This loop probes ``get_me()`` every ``HEARTBEAT_INTERVAL`` seconds on the
    *general* request path (not the getUpdates pool), so a healthy long-poll
    waiting for the 30-second Telegram window is never interrupted. On a
    probe timeout or ``OSError`` the loop hands off to
    ``_handle_polling_network_error`` -- the same path triggered by PTB's own
    ``error_callback`` -- which drains the dead pool and restarts polling.

    Unlike ``_verify_polling_after_reconnect`` (a one-shot probe scheduled
    only after an explicit reconnect), this loop runs for the full lifetime
    of the polling connection, so it catches a socket that wedges during
    steady-state operation without any prior error event.

    The coroutine returns (rather than raising) when it is cancelled or when
    ``self.has_fatal_error`` is set; it never propagates a probe failure.
    """
    HEARTBEAT_INTERVAL = 90  # seconds between probes
    PROBE_TIMEOUT = 15  # seconds before declaring the path dead

    while True:
        try:
            await asyncio.sleep(HEARTBEAT_INTERVAL)
            if self.has_fatal_error:
                return
            if not (self._app and self._app.bot):
                # No bot client to probe yet; try again next interval.
                continue
            await asyncio.wait_for(self._app.bot.get_me(), PROBE_TIMEOUT)
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown signal; exit quietly.
            return
        except (asyncio.TimeoutError, OSError) as probe_err:
            logger.warning(
                "[%s] Polling heartbeat probe failed (%s); triggering reconnect",
                self.name, probe_err,
            )
            if self._polling_error_task and not self._polling_error_task.done():
                continue  # reconnect already in progress
            loop = asyncio.get_running_loop()
            self._polling_error_task = loop.create_task(
                self._handle_polling_network_error(probe_err)
            )
        except Exception as probe_err:
            # Non-connectivity errors (e.g. TelegramError 401) are not
            # CLOSE-WAIT symptoms -- let PTB's own handlers surface them.
            # Leave a debug trace so an ignored probe failure is still
            # visible when diagnosing a silent gateway.
            # NOTE(review): python-telegram-bot's NetworkError / TimedOut
            # derive from TelegramError, not OSError, so they also land in
            # this branch -- confirm that ignoring them here is intended.
            logger.debug(
                "[%s] Polling heartbeat probe raised a non-connectivity error (%s); ignoring",
                self.name, probe_err,
            )
async def _verify_polling_after_reconnect(self) -> None:
"""Heartbeat probe scheduled after a successful reconnect.
@ -2505,6 +2557,16 @@ class TelegramAdapter(BasePlatformAdapter):
mode = "webhook" if self._webhook_mode else "polling"
logger.info("[%s] Connected to Telegram (%s mode)", self.name, mode)
# Start the persistent heartbeat loop in polling mode. Webhook mode
# receives updates via incoming pushes — there is no long-poll
# socket to wedge in CLOSE-WAIT, so the loop is not needed there.
if not self._webhook_mode:
if self._polling_heartbeat_task and not self._polling_heartbeat_task.done():
self._polling_heartbeat_task.cancel()
self._polling_heartbeat_task = asyncio.ensure_future(
self._polling_heartbeat_loop()
)
# Surface the gateway as "Online" in the bot's short description
# (opt-in via extra.status_indicator). Non-fatal.
try:
@ -2563,6 +2625,16 @@ class TelegramAdapter(BasePlatformAdapter):
async def disconnect(self) -> None:
"""Stop polling/webhook, cancel pending album flushes, and disconnect."""
# Cancel the heartbeat before tearing down the app so the probe task
# cannot fire get_me() into a half-shutdown bot client.
if self._polling_heartbeat_task and not self._polling_heartbeat_task.done():
self._polling_heartbeat_task.cancel()
try:
await self._polling_heartbeat_task
except asyncio.CancelledError:
pass
self._polling_heartbeat_task = None
# Mark the bot "Offline" in its short description while the bot's HTTP
# client is still alive (before app shutdown closes it). Opt-in via
# extra.status_indicator. Non-fatal. This is the clean-shutdown path;

View file

@ -473,3 +473,211 @@ async def test_reconnect_schedules_heartbeat_probe_on_success():
await t
except (asyncio.CancelledError, Exception):
pass
# ── Persistent heartbeat loop (_polling_heartbeat_loop) ──────────────────────
#
# These tests cover the continuous CLOSE-WAIT detection loop that fixes the bug
# (#48495) where a dead Telegram TCP socket caused the gateway to stop receiving
# messages silently. The _verify_polling_after_reconnect tests above cover the
# one-shot post-reconnect probe; these cover the background loop that runs for
# the gateway's full lifetime in polling mode.
#
# Loop structure: while True: sleep(INTERVAL) → fatal/app checks → get_me().
# So with cancel raised on the Nth patched sleep, get_me() fires (N-1) times.
@pytest.mark.asyncio
async def test_heartbeat_loop_exits_cleanly_on_cancel():
    """Cancelling the heartbeat loop ends it without an exception escaping."""
    adapter = _make_adapter()
    app = MagicMock()
    app.bot.get_me = AsyncMock(return_value=MagicMock())
    adapter._app = app

    naps = 0

    async def fake_sleep(_seconds):
        # Naps 1 and 2 return immediately (each is followed by one probe);
        # nap 3 simulates task cancellation.
        nonlocal naps
        naps += 1
        if naps > 2:
            raise asyncio.CancelledError()

    with patch("asyncio.sleep", side_effect=fake_sleep):
        # The loop handles CancelledError itself, so this await returns
        # normally instead of raising.
        await adapter._polling_heartbeat_loop()

    assert app.bot.get_me.await_count == 2
@pytest.mark.asyncio
async def test_heartbeat_loop_triggers_reconnect_on_timeout():
    """A TimeoutError from get_me() must schedule a reconnect via _handle_polling_network_error."""
    adapter = _make_adapter()
    adapter._handle_polling_network_error = AsyncMock()
    adapter._app = MagicMock()

    sleep_call = 0

    async def fast_sleep(seconds):
        # Two instant sleeps (two probes), then cancel on the third.
        nonlocal sleep_call
        sleep_call += 1
        if sleep_call >= 3:
            raise asyncio.CancelledError()

    async def fast_wait_for(coro, timeout):
        # Close a real coroutine so it is not reported as "never awaited".
        if asyncio.iscoroutine(coro):
            coro.close()
        raise asyncio.TimeoutError()

    with patch("asyncio.sleep", side_effect=fast_sleep), patch(
        "plugins.platforms.telegram.adapter.asyncio.wait_for",
        side_effect=fast_wait_for,
    ):
        await adapter._polling_heartbeat_loop()

    # A reconnect task must have been created.
    reconnect_task = adapter._polling_error_task
    assert reconnect_task is not None

    # Run the task to completion so it does not leak as a pending task, then
    # verify the handler ran exactly once (the second probe failure must be
    # skipped while the first reconnect is still in flight) with the timeout.
    await reconnect_task
    adapter._handle_polling_network_error.assert_awaited_once()
    (handed_off_err,) = adapter._handle_polling_network_error.await_args.args
    assert isinstance(handed_off_err, asyncio.TimeoutError)
@pytest.mark.asyncio
async def test_heartbeat_loop_triggers_reconnect_on_os_error():
    """An OSError (e.g. connection reset) from get_me() must trigger a reconnect."""
    adapter = _make_adapter()
    adapter._handle_polling_network_error = AsyncMock()
    adapter._app = MagicMock()

    sleep_call = 0

    async def fast_sleep(seconds):
        # Two instant sleeps (two probes), then cancel on the third.
        nonlocal sleep_call
        sleep_call += 1
        if sleep_call >= 3:
            raise asyncio.CancelledError()

    async def os_error_wait_for(coro, timeout):
        # Close a real coroutine so it is not reported as "never awaited".
        if asyncio.iscoroutine(coro):
            coro.close()
        raise OSError("Connection reset by peer")

    with patch("asyncio.sleep", side_effect=fast_sleep), patch(
        "plugins.platforms.telegram.adapter.asyncio.wait_for",
        side_effect=os_error_wait_for,
    ):
        await adapter._polling_heartbeat_loop()

    reconnect_task = adapter._polling_error_task
    assert reconnect_task is not None

    # Run the task to completion so it does not leak as a pending task, then
    # verify the handler ran exactly once and received the OSError.
    await reconnect_task
    adapter._handle_polling_network_error.assert_awaited_once()
    (handed_off_err,) = adapter._handle_polling_network_error.await_args.args
    assert isinstance(handed_off_err, OSError)
    assert str(handed_off_err) == "Connection reset by peer"
@pytest.mark.asyncio
async def test_heartbeat_loop_skips_reconnect_if_already_in_progress():
    """If a reconnect task is already running, the heartbeat must not spawn another."""
    adapter = _make_adapter()

    # Simulate an already-running reconnect task.
    existing_task = asyncio.create_task(asyncio.sleep(3600))
    adapter._polling_error_task = existing_task
    adapter._handle_polling_network_error = AsyncMock()
    adapter._app = MagicMock()

    sleep_call = 0

    async def fast_sleep(seconds):
        # Two instant sleeps (two probes), then cancel on the third.
        nonlocal sleep_call
        sleep_call += 1
        if sleep_call >= 3:
            raise asyncio.CancelledError()

    async def timeout_wait_for(coro, timeout):
        # Close a real coroutine so it is not reported as "never awaited".
        if asyncio.iscoroutine(coro):
            coro.close()
        raise asyncio.TimeoutError()

    try:
        with patch("asyncio.sleep", side_effect=fast_sleep), patch(
            "plugins.platforms.telegram.adapter.asyncio.wait_for",
            side_effect=timeout_wait_for,
        ):
            await adapter._polling_heartbeat_loop()

        # Check "not called" rather than "not awaited": nothing in this test
        # yields to the event loop before this point, so a wrongly spawned
        # task would not have run yet and only the call itself is observable.
        adapter._handle_polling_network_error.assert_not_called()
        # The in-flight reconnect task must not have been replaced.
        assert adapter._polling_error_task is existing_task
    finally:
        # Always tear down the stand-in task, even if an assertion failed.
        existing_task.cancel()
        try:
            await existing_task
        except (asyncio.CancelledError, Exception):
            pass
@pytest.mark.asyncio
async def test_heartbeat_loop_ignores_non_connectivity_errors():
    """Errors that are not connectivity failures (e.g. TelegramError) must be swallowed."""
    adapter = _make_adapter()
    adapter._handle_polling_network_error = AsyncMock()
    adapter._app = MagicMock()

    sleep_call = 0

    async def fast_sleep(seconds):
        # Two instant sleeps (two probes), then cancel on the third.
        nonlocal sleep_call
        sleep_call += 1
        if sleep_call >= 3:
            raise asyncio.CancelledError()

    async def telegram_error_wait_for(coro, timeout):
        # Close a real coroutine so it is not reported as "never awaited".
        if asyncio.iscoroutine(coro):
            coro.close()
        raise RuntimeError("TelegramError: Unauthorized")  # non-OSError, non-TimeoutError

    with patch("asyncio.sleep", side_effect=fast_sleep), patch(
        "plugins.platforms.telegram.adapter.asyncio.wait_for",
        side_effect=telegram_error_wait_for,
    ):
        # Must return normally: the error is swallowed, not propagated.
        await adapter._polling_heartbeat_loop()

    # No reconnect should have been triggered for a non-connectivity error.
    # Check "not called" rather than "not awaited": a wrongly scheduled
    # handler coroutine would not have run yet at this point, so only the
    # call itself is observable.
    adapter._handle_polling_network_error.assert_not_called()
@pytest.mark.asyncio
async def test_heartbeat_loop_exits_on_fatal_error():
    """With a fatal error already recorded, the loop returns without probing."""
    adapter = _make_adapter()
    adapter._set_fatal_error("telegram_network_error", "boom", retryable=True)

    app = MagicMock()
    app.bot.get_me = AsyncMock(return_value=MagicMock())
    adapter._app = app

    async def instant_sleep(_seconds):
        # Skip the real heartbeat interval.
        return None

    with patch("asyncio.sleep", side_effect=instant_sleep):
        await adapter._polling_heartbeat_loop()

    # The fatal-error check sits between the sleep and the probe, so get_me()
    # must never have been awaited.
    app.bot.get_me.assert_not_awaited()
@pytest.mark.asyncio
async def test_disconnect_cancels_heartbeat_task():
    """disconnect() cancels a live heartbeat task and clears the adapter's reference."""
    adapter = _make_adapter()

    # Stand-in for a running heartbeat: a long sleep that only ends if cancelled.
    running_loop = asyncio.get_event_loop()
    heartbeat = running_loop.create_task(asyncio.sleep(3600))
    adapter._polling_heartbeat_task = heartbeat

    # Minimal app stub: nothing is running, and shutdown() is awaitable.
    app = MagicMock()
    app.updater = MagicMock()
    app.updater.running = False
    app.running = False
    app.shutdown = AsyncMock()
    adapter._app = app

    await adapter.disconnect()

    assert heartbeat.cancelled(), "Heartbeat task must be cancelled by disconnect()"
    assert adapter._polling_heartbeat_task is None