mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-07 08:02:23 +00:00
fix(telegram): gate send() on send-path health after reconnect storms (#31165)
After sustained Bad Gateway / TimedOut reconnect cycles, the PTB httpx client can enter a state where bot.send_message() returns a valid Message (real message_id) but the message never reaches the recipient. TelegramAdapter.send returns SendResult(success=True) and cron's live-adapter branch marks the run delivered while the message is silently dropped. Add a _send_path_degraded flag. _handle_polling_network_error sets it on reconnect storms; the existing _verify_polling_after_reconnect heartbeat probe clears it once getMe() confirms the Bot client is healthy. While the flag is set, send() short-circuits with SendResult(success=False, retryable=True) so cron falls through to the standalone delivery path (fresh HTTP session). Closes #31165. Co-authored-by: teknium1 <127238744+teknium1@users.noreply.github.com>
This commit is contained in:
parent
54e61f9331
commit
476c897439
2 changed files with 104 additions and 1 deletions
|
|
@ -429,6 +429,13 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
self._polling_conflict_count: int = 0
|
||||
self._polling_network_error_count: int = 0
|
||||
self._polling_error_callback_ref = None
|
||||
# After sustained reconnect storms the PTB httpx pool can return
|
||||
# SendResult(success=True) for sends that never actually transmit.
|
||||
# _handle_polling_network_error sets this; _verify_polling_after_reconnect
|
||||
# clears it once getMe() confirms the Bot client is healthy.
|
||||
# While True, send() short-circuits to a failure so callers
|
||||
# (cron live-adapter branch) fall through to standalone delivery.
|
||||
self._send_path_degraded: bool = False
|
||||
# DM Topics: map of topic_name -> message_thread_id (populated at startup)
|
||||
self._dm_topics: Dict[str, int] = {}
|
||||
# Track forum chats where we've already registered bot commands
|
||||
|
|
@ -874,6 +881,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
MAX_DELAY = 60
|
||||
|
||||
self._polling_network_error_count += 1
|
||||
self._send_path_degraded = True
|
||||
attempt = self._polling_network_error_count
|
||||
|
||||
if attempt > MAX_NETWORK_RETRIES:
|
||||
|
|
@ -971,6 +979,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
|
||||
try:
|
||||
await asyncio.wait_for(self._app.bot.get_me(), PROBE_TIMEOUT)
|
||||
self._send_path_degraded = False
|
||||
except Exception as probe_err:
|
||||
logger.warning(
|
||||
"[%s] Polling heartbeat probe failed %ds after reconnect: %s",
|
||||
|
|
@ -1683,7 +1692,11 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
"""Send a message to a Telegram chat."""
|
||||
if not self._bot:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
|
||||
|
||||
# getattr() — tests build adapters via object.__new__() (no __init__).
|
||||
if getattr(self, "_send_path_degraded", False):
|
||||
return SendResult(success=False, error="send_path_degraded", retryable=True)
|
||||
|
||||
# Skip whitespace-only text to prevent Telegram 400 empty-text errors.
|
||||
if not content or not content.strip():
|
||||
return SendResult(success=True, message_id=None)
|
||||
|
|
|
|||
90
tests/gateway/test_telegram_send_path_health.py
Normal file
90
tests/gateway/test_telegram_send_path_health.py
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
"""TelegramAdapter send-path health gating after reconnect storms.
|
||||
|
||||
After sustained Bad Gateway / TimedOut reconnect cycles, the PTB httpx client
|
||||
can enter a wedged state where ``bot.send_message()`` returns a valid Message
|
||||
but nothing reaches the recipient. ``_send_path_degraded`` short-circuits
|
||||
``send()`` so cron's live-adapter branch falls through to standalone HTTP.
|
||||
"""
|
||||
import sys
|
||||
import types
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
|
||||
|
||||
def _ensure_telegram_mock():
    """Register a MagicMock stand-in for ``telegram`` unless the real package is loaded.

    A genuinely imported package exposes ``__file__``; a MagicMock placeholder
    does not, so that attribute is used to tell the two apart. The stand-in
    carries real exception classes under ``error`` so they can be raised and
    caught. Entries already present in ``sys.modules`` are left untouched.
    """
    if hasattr(sys.modules.get("telegram"), "__file__"):
        return

    fake = MagicMock()
    exception_specs = (
        ("NetworkError", OSError),
        ("TimedOut", OSError),
        ("BadRequest", Exception),
    )
    for exc_name, base in exception_specs:
        setattr(fake.error, exc_name, type(exc_name, (base,), {}))

    # All top-level submodules share the same mock object.
    shared_names = ("telegram", "telegram.ext", "telegram.constants", "telegram.request")
    for module_name in shared_names:
        sys.modules.setdefault(module_name, fake)
    sys.modules.setdefault("telegram.error", fake.error)
|
||||
|
||||
|
||||
_ensure_telegram_mock()
|
||||
|
||||
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
|
||||
|
||||
|
||||
def _make_adapter() -> TelegramAdapter:
    """Build a TelegramAdapter whose bot is a mock with an awaitable ``send_message``.

    The mocked ``send_message`` resolves to an object with ``message_id == 42``.
    """
    config = PlatformConfig(enabled=True, token="***")
    adapter = TelegramAdapter(config)

    fake_bot = MagicMock()
    fake_bot.send_message = AsyncMock(return_value=MagicMock(message_id=42))
    adapter._bot = fake_bot
    return adapter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_send_succeeds_when_path_healthy():
    """With the degraded flag unset, send() reports success and awaits send_message."""
    healthy = _make_adapter()
    # A freshly constructed adapter must start with the flag cleared.
    assert healthy._send_path_degraded is False

    outcome = await healthy.send("123", "hello")

    assert outcome.success is True
    healthy._bot.send_message.assert_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_send_short_circuits_when_path_degraded():
    """With the degraded flag set, send() fails as retryable and never awaits send_message.

    The retryable failure is what lets cron's live-adapter branch fall
    through to standalone HTTP delivery.
    """
    degraded = _make_adapter()
    degraded._send_path_degraded = True

    outcome = await degraded.send("123", "hello")

    assert outcome.success is False
    assert outcome.error == "send_path_degraded"
    assert outcome.retryable is True
    # The short-circuit must happen before the bot client is touched.
    degraded._bot.send_message.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_reconnect_storm_sets_and_heartbeat_clears_flag(monkeypatch):
    """_handle_polling_network_error sets the flag; a successful heartbeat
    probe in _verify_polling_after_reconnect clears it."""
    adapter = _make_adapter()
    adapter._app = MagicMock()
    adapter._app.updater = MagicMock()
    adapter._app.updater.running = True
    adapter._app.updater.stop = AsyncMock()
    adapter._app.updater.start_polling = AsyncMock()
    adapter._app.bot = MagicMock()
    # A get_me() that resolves normally stands in for a healthy Bot client.
    adapter._app.bot.get_me = AsyncMock(return_value=MagicMock())
    adapter._polling_error_callback_ref = AsyncMock()
    monkeypatch.setattr(
        "gateway.platforms.telegram.Update", MagicMock(ALL_TYPES=[])
    )

    # asyncio.sleep is patched around BOTH calls. The reconnect handler
    # presumably backs off with asyncio.sleep before restarting polling
    # (NOTE(review): confirm against the handler body); left unpatched,
    # that wait would run in real time inside the test.
    with patch("gateway.platforms.telegram.asyncio.sleep", new_callable=AsyncMock):
        await adapter._handle_polling_network_error(OSError("Bad Gateway"))
        assert adapter._send_path_degraded is True

        await adapter._verify_polling_after_reconnect()

    assert adapter._send_path_degraded is False
|
||||
Loading…
Add table
Add a link
Reference in a new issue