diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 9e78282be..524324c8d 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -601,6 +601,12 @@ class TelegramAdapter(BasePlatformAdapter): ) else: # ── Polling mode (default) ─────────────────────────── + # Clear any stale webhook first so polling doesn't inherit a + # previous webhook registration and silently stop receiving updates. + delete_webhook = getattr(self._bot, "delete_webhook", None) + if callable(delete_webhook): + await delete_webhook(drop_pending_updates=False) + loop = asyncio.get_running_loop() def _polling_error_callback(error: Exception) -> None: @@ -856,6 +862,21 @@ class TelegramAdapter(BasePlatformAdapter): await asyncio.sleep(wait) else: raise + except Exception as send_err: + retry_after = getattr(send_err, "retry_after", None) + if retry_after is not None or "retry after" in str(send_err).lower(): + if _send_attempt < 2: + wait = float(retry_after) if retry_after is not None else 1.0 + logger.warning( + "[%s] Telegram flood control on send (attempt %d/3), retrying in %.1fs: %s", + self.name, + _send_attempt + 1, + wait, + send_err, + ) + await asyncio.sleep(wait) + continue + raise message_ids.append(str(msg.message_id)) return SendResult( diff --git a/tests/gateway/test_telegram_conflict.py b/tests/gateway/test_telegram_conflict.py index 9f1074648..7a480d9fc 100644 --- a/tests/gateway/test_telegram_conflict.py +++ b/tests/gateway/test_telegram_conflict.py @@ -80,7 +80,7 @@ async def test_polling_conflict_retries_before_fatal(monkeypatch): stop=AsyncMock(), running=True, ) - bot = SimpleNamespace(set_my_commands=AsyncMock()) + bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock()) app = SimpleNamespace( bot=bot, updater=updater, @@ -99,6 +99,7 @@ async def test_polling_conflict_retries_before_fatal(monkeypatch): ok = await adapter.connect() assert ok is True + bot.delete_webhook.assert_awaited_once_with(drop_pending_updates=False) assert callable(captured["error_callback"]) conflict = type("Conflict", (Exception,), {}) @@ -153,7 +154,7 @@ async def test_polling_conflict_becomes_fatal_after_retries(monkeypatch): stop=AsyncMock(), running=True, ) - bot = SimpleNamespace(set_my_commands=AsyncMock()) + bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock()) app = SimpleNamespace( bot=bot, updater=updater, @@ -208,7 +209,7 @@ async def test_connect_marks_retryable_fatal_error_for_startup_network_failure(m builder = MagicMock() builder.token.return_value = builder app = SimpleNamespace( - bot=SimpleNamespace(), + bot=SimpleNamespace(delete_webhook=AsyncMock(), set_my_commands=AsyncMock()), updater=SimpleNamespace(), add_handler=MagicMock(), initialize=AsyncMock(side_effect=RuntimeError("Temporary failure in name resolution")), @@ -225,6 +226,49 @@ async def test_connect_marks_retryable_fatal_error_for_startup_network_failure(m assert "Temporary failure in name resolution" in adapter.fatal_error_message +@pytest.mark.asyncio +async def test_connect_clears_webhook_before_polling(monkeypatch): + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***")) + + monkeypatch.setattr( + "gateway.status.acquire_scoped_lock", + lambda scope, identity, metadata=None: (True, None), + ) + monkeypatch.setattr( + "gateway.status.release_scoped_lock", + lambda scope, identity: None, + ) + + updater = SimpleNamespace( + start_polling=AsyncMock(), + stop=AsyncMock(), + running=True, + ) + bot = SimpleNamespace( + delete_webhook=AsyncMock(), + set_my_commands=AsyncMock(), + ) + app = SimpleNamespace( + bot=bot, + updater=updater, + add_handler=MagicMock(), + initialize=AsyncMock(), + start=AsyncMock(), + ) + builder = MagicMock() + builder.token.return_value = builder + builder.build.return_value = app + monkeypatch.setattr( + "gateway.platforms.telegram.Application", + SimpleNamespace(builder=MagicMock(return_value=builder)), + ) + + ok = await adapter.connect() + + assert ok is True + bot.delete_webhook.assert_awaited_once_with(drop_pending_updates=False) + + @pytest.mark.asyncio async def test_disconnect_skips_inactive_updater_and_app(monkeypatch): adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***")) diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py index 735744e4e..fee1dcc80 100644 --- a/tests/gateway/test_telegram_thread_fallback.py +++ b/tests/gateway/test_telegram_thread_fallback.py @@ -37,6 +37,12 @@ class FakeTimedOut(FakeNetworkError): pass +class FakeRetryAfter(Exception): + def __init__(self, seconds): + super().__init__(f"Retry after {seconds}") + self.retry_after = seconds + + # Build a fake telegram module tree so the adapter's internal imports work _fake_telegram = types.ModuleType("telegram") _fake_telegram_error = types.ModuleType("telegram.error") @@ -230,3 +236,25 @@ async def test_thread_fallback_only_fires_once(): # Second chunk: should use thread_id=None directly (effective_thread_id # was cleared per-chunk but the metadata doesn't change between chunks) # The key point: the message was delivered despite the invalid thread + + +@pytest.mark.asyncio +async def test_send_retries_retry_after_errors(): + """Telegram flood control should back off and retry instead of failing fast.""" + adapter = _make_adapter() + + attempt = [0] + + async def mock_send_message(**kwargs): + attempt[0] += 1 + if attempt[0] == 1: + raise FakeRetryAfter(2) + return SimpleNamespace(message_id=300) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send(chat_id="123", content="test message") + + assert result.success is True + assert result.message_id == "300" + assert attempt[0] == 2