mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-07 02:51:50 +00:00
fix(telegram): probe polling liveness after reconnect to detect wedged Updater
After a transient Telegram 502, _handle_polling_network_error's stop()+start_polling() cycle can leave PTB's Updater with `running=True` but a wedged consumer task that never makes progress. No error_callback fires in that state, so the reconnect ladder never advances past attempt 1, the MAX_NETWORK_RETRIES fatal-error path is never reached, and the gateway sits silent indefinitely. Schedule a heartbeat probe (60s after a successful reconnect) that verifies Updater.running is still True and bot.get_me() responds within a tight asyncio.wait_for timeout. Either failure feeds back into the reconnect ladder so the existing escalation path fires. No PTB-internal coupling, no Application rebuild — minimal additive defense inside the existing reconnect abstraction. Tests cover healthy / Updater non-running / probe timeout / probe network error / already-fatal cases, plus an integration check that the probe is actually scheduled after a successful start_polling(). Closes the silent-wedge case observed in the wild after a transient Telegram 502; existing reconnect tests updated to mock bot.get_me() now that the success path schedules a heartbeat probe.
This commit is contained in:
parent
9bf260472b
commit
2470434d60
2 changed files with 244 additions and 0 deletions
|
|
@ -512,6 +512,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self.name, attempt,
|
||||
)
|
||||
self._polling_network_error_count = 0
|
||||
# start_polling() returning is necessary but not sufficient:
|
||||
# PTB's Updater can be left in a state where `running` is True
|
||||
# but the underlying long-poll task is wedged on a stale httpx
|
||||
# connection and never makes progress. No error_callback fires
|
||||
# in that state, so the reconnect ladder won't advance on its
|
||||
# own. Schedule a deferred probe to detect the wedge and
|
||||
# re-enter the ladder if needed.
|
||||
if not self.has_fatal_error:
|
||||
probe = asyncio.ensure_future(self._verify_polling_after_reconnect())
|
||||
self._background_tasks.add(probe)
|
||||
probe.add_done_callback(self._background_tasks.discard)
|
||||
except Exception as retry_err:
|
||||
logger.warning("[%s] Telegram polling reconnect failed: %s", self.name, retry_err)
|
||||
# start_polling failed — polling is dead and no further error
|
||||
|
|
@ -523,6 +534,50 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self._background_tasks.add(task)
|
||||
task.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
async def _verify_polling_after_reconnect(self) -> None:
    """Deferred liveness probe run after a reconnect reports success.

    A stop()+start_polling() cycle can leave PTB's Updater claiming
    `running=True` while its consumer task is wedged on a stale httpx
    connection. No error callback fires in that state, so the reconnect
    ladder never advances by itself. This probe closes that gap:

    1. Wait HEARTBEAT_PROBE_DELAY seconds, long enough for a healthy
       long-poll to complete at least one cycle.
    2. Check that `Updater.running` is still True.
    3. Hit the bot endpoint under a tight asyncio timeout — a wedged
       httpx pool misses the deadline, a healthy one answers well
       inside it.

    Any failure is fed back into the reconnect ladder so the existing
    MAX_NETWORK_RETRIES escalation to fatal-error can still fire.
    """
    HEARTBEAT_PROBE_DELAY = 60  # seconds to wait before probing
    PROBE_TIMEOUT = 10          # seconds allowed for the get_me() probe

    await asyncio.sleep(HEARTBEAT_PROBE_DELAY)

    if self.has_fatal_error:
        # Escalation already happened elsewhere; nothing left to verify.
        return

    app = self._app
    if not (app and app.updater and app.updater.running):
        logger.warning(
            "[%s] Updater not running %ds after reconnect — treating as wedged",
            self.name, HEARTBEAT_PROBE_DELAY,
        )
        wedge = RuntimeError("Updater not running after reconnect heartbeat")
        await self._handle_polling_network_error(wedge)
        return

    try:
        # Probe via the module-level asyncio attribute so tests can patch
        # wait_for; a wedged connection pool will blow the deadline here.
        await asyncio.wait_for(app.bot.get_me(), PROBE_TIMEOUT)
    except Exception as exc:
        logger.warning(
            "[%s] Polling heartbeat probe failed %ds after reconnect: %s",
            self.name, HEARTBEAT_PROBE_DELAY, exc,
        )
        await self._handle_polling_network_error(exc)
||||
async def _handle_polling_conflict(self, error: Exception) -> None:
|
||||
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
|
||||
return
|
||||
|
|
|
|||
|
|
@ -132,6 +132,7 @@ async def test_reconnect_success_resets_error_count():
|
|||
|
||||
mock_app = MagicMock()
|
||||
mock_app.updater = mock_updater
|
||||
mock_app.bot.get_me = AsyncMock(return_value=MagicMock()) # heartbeat probe path
|
||||
adapter._app = mock_app
|
||||
|
||||
with patch("asyncio.sleep", new_callable=AsyncMock):
|
||||
|
|
@ -139,6 +140,15 @@ async def test_reconnect_success_resets_error_count():
|
|||
|
||||
assert adapter._polling_network_error_count == 0
|
||||
|
||||
# Clean up the heartbeat-probe task scheduled after a successful reconnect.
|
||||
pending = [t for t in adapter._background_tasks if not t.done()]
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
try:
|
||||
await t
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_triggers_fatal_after_max_retries():
|
||||
|
|
@ -284,3 +294,182 @@ async def test_drain_helper_noop_without_app():
|
|||
adapter._app = None
|
||||
# Should not raise
|
||||
await adapter._drain_polling_connections()
|
||||
|
||||
|
||||
# ── Heartbeat probe ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_heartbeat_probe_no_op_when_polling_healthy():
    """Healthy path: Updater.running is True and bot.get_me() answers
    promptly, so the probe confirms recovery and takes no further action."""
    adapter = _make_adapter()

    updater = MagicMock()
    updater.running = True

    app = MagicMock()
    app.updater = updater
    app.bot.get_me = AsyncMock(return_value=MagicMock())
    adapter._app = app

    adapter._handle_polling_network_error = AsyncMock()

    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._verify_polling_after_reconnect()

    app.bot.get_me.assert_awaited_once()
    adapter._handle_polling_network_error.assert_not_awaited()
||||
|
||||
@pytest.mark.asyncio
async def test_heartbeat_probe_reenters_ladder_when_updater_not_running():
    """Updater.running has flipped to False by probe time: the probe must
    treat the Updater as wedged and re-enter the reconnect ladder without
    ever touching get_me()."""
    adapter = _make_adapter()

    updater = MagicMock()
    updater.running = False

    app = MagicMock()
    app.updater = updater
    app.bot.get_me = AsyncMock()
    adapter._app = app

    adapter._handle_polling_network_error = AsyncMock()

    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._verify_polling_after_reconnect()

    app.bot.get_me.assert_not_called()
    adapter._handle_polling_network_error.assert_awaited_once()
    (reported,) = adapter._handle_polling_network_error.await_args.args
    assert isinstance(reported, RuntimeError)
    assert "not running" in str(reported).lower()
|
||||
@pytest.mark.asyncio
async def test_heartbeat_probe_reenters_ladder_when_get_me_times_out():
    """bot.get_me() hanging past PROBE_TIMEOUT is the connection-pool
    wedge that motivated the probe — it must re-enter the ladder."""
    adapter = _make_adapter()

    updater = MagicMock()
    updater.running = True

    async def hang_forever(*args, **kwargs):
        await asyncio.sleep(3600)

    app = MagicMock()
    app.updater = updater
    app.bot.get_me = AsyncMock(side_effect=hang_forever)
    adapter._app = app

    adapter._handle_polling_network_error = AsyncMock()

    async def fast_wait_for(coro, timeout):
        # Drop the probe coroutine unawaited and fail fast, simulating a
        # wedged pool without waiting out the real timeout.
        if asyncio.iscoroutine(coro):
            coro.close()
        raise asyncio.TimeoutError()

    with patch("asyncio.sleep", new_callable=AsyncMock), \
         patch("gateway.platforms.telegram.asyncio.wait_for", new=fast_wait_for):
        await adapter._verify_polling_after_reconnect()

    adapter._handle_polling_network_error.assert_awaited_once()
|
||||
@pytest.mark.asyncio
async def test_heartbeat_probe_reenters_ladder_on_get_me_network_error():
    """Whatever bot.get_me() raises (NetworkError, ConnectionError, ...)
    must be fed back into the reconnect ladder as the original object."""
    adapter = _make_adapter()

    updater = MagicMock()
    updater.running = True

    app = MagicMock()
    app.updater = updater
    app.bot.get_me = AsyncMock(side_effect=ConnectionError("pool wedged"))
    adapter._app = app

    adapter._handle_polling_network_error = AsyncMock()

    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._verify_polling_after_reconnect()

    adapter._handle_polling_network_error.assert_awaited_once()
    reported = adapter._handle_polling_network_error.await_args.args[0]
    assert isinstance(reported, ConnectionError)
|
||||
@pytest.mark.asyncio
async def test_heartbeat_probe_skips_when_already_fatal():
    """Adapter already in fatal-error state when the probe delay elapses:
    the probe bails out without probing or re-entering the ladder."""
    adapter = _make_adapter()
    adapter._set_fatal_error("telegram_polling_conflict", "already fatal", retryable=False)

    app = MagicMock()
    app.bot.get_me = AsyncMock()
    adapter._app = app

    adapter._handle_polling_network_error = AsyncMock()

    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._verify_polling_after_reconnect()

    app.bot.get_me.assert_not_called()
    adapter._handle_polling_network_error.assert_not_awaited()
|
||||
@pytest.mark.asyncio
async def test_reconnect_schedules_heartbeat_probe_on_success():
    """A successful start_polling() in the reconnect path must add a probe
    task to _background_tasks. Without it, a wedged Updater would sit
    silent indefinitely — no further error_callback ever advances the
    reconnect ladder."""
    adapter = _make_adapter()
    adapter._polling_network_error_count = 1

    updater = MagicMock()
    updater.running = True
    updater.stop = AsyncMock()
    updater.start_polling = AsyncMock()  # reconnect succeeds

    app = MagicMock()
    app.updater = updater
    app.bot.get_me = AsyncMock(return_value=MagicMock())
    adapter._app = app

    tasks_before = len(adapter._background_tasks)

    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._handle_polling_network_error(Exception("Bad Gateway"))

    assert len(adapter._background_tasks) > tasks_before, (
        "Expected a heartbeat probe task to be scheduled after a successful "
        "reconnect's start_polling()"
    )

    # Cancel and reap the scheduled probe so it doesn't leak into other tests.
    for task in [t for t in adapter._background_tasks if not t.done()]:
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, Exception):
            pass
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue