Reviewer note: line structure restored from a collapsed paste; indentation is
inferred from Python structure. The post-image blob hashes on the `index` lines
are carried over from the original patch and are stale after the edits below.

diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py
index 7b13cbc33dc..1e3ac5728d4 100644
--- a/gateway/platforms/telegram.py
+++ b/gateway/platforms/telegram.py
@@ -429,6 +429,13 @@ class TelegramAdapter(BasePlatformAdapter):
         self._polling_conflict_count: int = 0
         self._polling_network_error_count: int = 0
         self._polling_error_callback_ref = None
+        # After sustained reconnect storms the PTB httpx pool can return
+        # SendResult(success=True) for sends that never actually transmit.
+        # _handle_polling_network_error sets this on every polling network error;
+        # _verify_polling_after_reconnect clears it once getMe() succeeds.
+        # While True, send() short-circuits to a retryable failure so callers
+        # (cron live-adapter branch) fall through to standalone delivery.
+        self._send_path_degraded: bool = False
         # DM Topics: map of topic_name -> message_thread_id (populated at startup)
         self._dm_topics: Dict[str, int] = {}
         # Track forum chats where we've already registered bot commands
@@ -874,6 +881,7 @@ class TelegramAdapter(BasePlatformAdapter):
         MAX_DELAY = 60
 
         self._polling_network_error_count += 1
+        self._send_path_degraded = True
         attempt = self._polling_network_error_count
 
         if attempt > MAX_NETWORK_RETRIES:
@@ -971,6 +979,7 @@ class TelegramAdapter(BasePlatformAdapter):
 
         try:
             await asyncio.wait_for(self._app.bot.get_me(), PROBE_TIMEOUT)
+            self._send_path_degraded = False
         except Exception as probe_err:
             logger.warning(
                 "[%s] Polling heartbeat probe failed %ds after reconnect: %s",
@@ -1683,7 +1692,11 @@ class TelegramAdapter(BasePlatformAdapter):
         """Send a message to a Telegram chat."""
         if not self._bot:
             return SendResult(success=False, error="Not connected")
-        
+
+        # getattr() — tests build adapters via object.__new__() (no __init__).
+        if getattr(self, "_send_path_degraded", False):
+            return SendResult(success=False, error="send_path_degraded", retryable=True)
+
         # Skip whitespace-only text to prevent Telegram 400 empty-text errors.
         if not content or not content.strip():
             return SendResult(success=True, message_id=None)
diff --git a/tests/gateway/test_telegram_send_path_health.py b/tests/gateway/test_telegram_send_path_health.py
new file mode 100644
index 00000000000..940633224e4
--- /dev/null
+++ b/tests/gateway/test_telegram_send_path_health.py
@@ -0,0 +1,93 @@
+"""TelegramAdapter send-path health gating after reconnect storms.
+
+After sustained Bad Gateway / TimedOut reconnect cycles, the PTB httpx client
+can enter a wedged state where ``bot.send_message()`` returns a valid Message
+but nothing reaches the recipient. ``_send_path_degraded`` short-circuits
+``send()`` so cron's live-adapter branch falls through to standalone HTTP.
+"""
+import sys
+import types
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from gateway.config import PlatformConfig
+
+
+def _ensure_telegram_mock():
+    if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
+        return
+    mod = MagicMock()
+    mod.error.NetworkError = type("NetworkError", (OSError,), {})
+    mod.error.TimedOut = type("TimedOut", (OSError,), {})
+    mod.error.BadRequest = type("BadRequest", (Exception,), {})
+    for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
+        sys.modules.setdefault(name, mod)
+    sys.modules.setdefault("telegram.error", mod.error)
+
+
+_ensure_telegram_mock()
+
+from gateway.platforms.telegram import TelegramAdapter  # noqa: E402
+
+
+def _make_adapter() -> TelegramAdapter:
+    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
+    adapter._bot = MagicMock()
+    adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=42))
+    return adapter
+
+
+@pytest.mark.asyncio
+async def test_send_succeeds_when_path_healthy():
+    """Healthy adapter delivers normally; send_message is called."""
+    adapter = _make_adapter()
+    assert adapter._send_path_degraded is False
+
+    result = await adapter.send("123", "hello")
+
+    assert result.success is True
+    adapter._bot.send_message.assert_awaited()
+
+
+@pytest.mark.asyncio
+async def test_send_short_circuits_when_path_degraded():
+    """Degraded adapter returns failure WITHOUT calling send_message,
+    so cron's live-adapter branch falls through to standalone HTTP."""
+    adapter = _make_adapter()
+    adapter._send_path_degraded = True
+
+    result = await adapter.send("123", "hello")
+
+    assert result.success is False
+    assert result.error == "send_path_degraded"
+    assert result.retryable is True
+    adapter._bot.send_message.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+async def test_reconnect_storm_sets_and_heartbeat_clears_flag(monkeypatch):
+    """_handle_polling_network_error sets the flag; a successful heartbeat
+    probe in _verify_polling_after_reconnect clears it."""
+    adapter = _make_adapter()
+    adapter._app = MagicMock()
+    adapter._app.updater = MagicMock()
+    adapter._app.updater.running = True
+    adapter._app.updater.stop = AsyncMock()
+    adapter._app.updater.start_polling = AsyncMock()
+    adapter._app.bot = MagicMock()
+    adapter._app.bot.get_me = AsyncMock(return_value=MagicMock())
+    adapter._polling_error_callback_ref = AsyncMock()
+    monkeypatch.setattr(
+        "gateway.platforms.telegram.Update", MagicMock(ALL_TYPES=[])
+    )
+
+    # Keep asyncio.sleep patched for BOTH calls. NOTE(review): the handler
+    # defines backoff constants (MAX_DELAY = 60) and presumably sleeps before
+    # restarting polling, which would stall this test for real seconds -- confirm.
+    with patch("gateway.platforms.telegram.asyncio.sleep", new_callable=AsyncMock):
+        await adapter._handle_polling_network_error(OSError("Bad Gateway"))
+        assert adapter._send_path_degraded is True
+
+        await adapter._verify_polling_after_reconnect()
+    assert adapter._send_path_degraded is False