fix(telegram): preserve Bot API update queue on watcher reconnect

After a prolonged outage the in-process network-error ladder escalates to
fatal and GatewayRunner._platform_reconnect_watcher rebuilds a fresh adapter
that reconnects through the bootstrap path. That path called
start_polling(drop_pending_updates=True), discarding every update Telegram
queued during the outage — all messages sent while the bot was down were
silently lost. The in-process ladder and 409-conflict handler already passed
drop_pending_updates=False; only bootstrap did not distinguish a cold first
boot from a reconnect.

Thread an is_reconnect signal from the watcher through
_connect_adapter_with_timeout into adapter.connect(). The base
BasePlatformAdapter.connect() gains a keyword-only is_reconnect=False so every
adapter inherits a tolerant signature (no per-platform breakage when the
runner forwards the kwarg). Telegram translates is_reconnect into
drop_pending_updates=not is_reconnect on both the polling and webhook bootstrap
calls. Cold boot still drops the stale queue; a watcher reconnect preserves it.

Fixes #46621.

Co-authored-by: annguyenNous <annguyen@nousresearch.com>
Co-authored-by: kyssta-exe <kyssta-exe@users.noreply.github.com>
Co-authored-by: Kewe63 <Kewe63@users.noreply.github.com>
This commit is contained in:
teknium1 2026-06-25 21:19:44 -07:00 committed by Teknium
parent f44415e71a
commit 43b8ba4181
65 changed files with 243 additions and 80 deletions

View file

@ -4372,7 +4372,7 @@ class APIServerAdapter(BasePlatformAdapter):
# BasePlatformAdapter interface
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Start the aiohttp web server."""
if not AIOHTTP_AVAILABLE:
logger.warning("[%s] aiohttp not installed", self.name)

View file

@ -2634,10 +2634,21 @@ class BasePlatformAdapter(ABC):
self._session_store = session_store
@abstractmethod
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""
Connect to the platform and start receiving messages.
Args:
is_reconnect: False on a cold first boot (the gateway is
starting this platform for the first time); True when the
reconnect watcher is re-establishing a platform that was
previously running and dropped after an outage. Adapters
that buffer a server-side update queue (e.g. Telegram's Bot
API) should preserve that queue when ``is_reconnect`` is
True so messages sent during the outage are delivered rather
than silently discarded. Adapters with no such queue may
ignore the flag.
Returns True if connection was successful.
"""
pass

View file

@ -232,7 +232,7 @@ class BlueBubblesAdapter(BasePlatformAdapter):
# Lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if not self.server_url or not self.password:
logger.error(
"[bluebubbles] BLUEBUBBLES_SERVER_URL and BLUEBUBBLES_PASSWORD are required"

View file

@ -136,7 +136,7 @@ class MSGraphWebhookAdapter(BasePlatformAdapter):
def _source_allowlist_required_but_missing(self) -> bool:
return is_network_accessible(self._host) and not self._allowed_source_networks
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if self._client_state is None:
logger.error(
"[msgraph_webhook] Refusing to start without extra.client_state configured"

View file

@ -338,7 +338,7 @@ class SignalAdapter(BasePlatformAdapter):
# Lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to signal-cli daemon and start SSE listener."""
if not self.http_url or not self.account:
logger.error("Signal: SIGNAL_HTTP_URL and SIGNAL_ACCOUNT are required")

View file

@ -153,7 +153,7 @@ class WebhookAdapter(BasePlatformAdapter):
# Lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
# Load agent-created subscriptions before validating
self._reload_dynamic_routes()

View file

@ -1261,7 +1261,7 @@ class WeixinAdapter(BasePlatformAdapter):
return [str(item).strip() for item in value if str(item).strip()]
return [str(value).strip()] if str(value).strip() else []
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if not check_weixin_requirements():
message = "Weixin startup failed: aiohttp and cryptography are required"
self._set_fatal_error("weixin_missing_dependency", message, retryable=False)

View file

@ -347,7 +347,7 @@ class WhatsAppCloudAdapter(WhatsAppBehaviorMixin, BasePlatformAdapter):
return super()._is_dm_allowed(sender_id)
# ------------------------------------------------------------------ lifecycle
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if not check_whatsapp_cloud_requirements():
self._set_fatal_error(
"whatsapp_cloud_deps_missing",

View file

@ -5114,7 +5114,7 @@ class YuanbaoAdapter(BasePlatformAdapter):
"""Yuanbao gates DM/group access at intake via dm_policy/group_policy."""
return True
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Yuanbao WS gateway and authenticate.
Delegates to ConnectionManager.open().

View file

@ -3086,13 +3086,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
return max(0.0, timeout)
return _PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT
async def _connect_adapter_with_timeout(self, adapter, platform) -> bool:
"""Connect an adapter without allowing one platform to block others."""
async def _connect_adapter_with_timeout(
self, adapter, platform, *, is_reconnect: bool = False
) -> bool:
"""Connect an adapter without allowing one platform to block others.
``is_reconnect`` is forwarded to ``adapter.connect()`` so platform
adapters can distinguish a cold first boot (drop any stale
server-side queue) from a watcher reconnect after a prolonged outage
(preserve the queue so messages sent during the outage are delivered
rather than silently dropped #46621).
"""
timeout = self._platform_connect_timeout_secs()
if timeout <= 0:
return await adapter.connect()
return await adapter.connect(is_reconnect=is_reconnect)
try:
return await asyncio.wait_for(adapter.connect(), timeout=timeout)
return await asyncio.wait_for(
adapter.connect(is_reconnect=is_reconnect), timeout=timeout
)
except asyncio.TimeoutError as exc:
raise TimeoutError(
f"{platform.value} connect timed out after {timeout:g}s"
@ -6718,7 +6729,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
adapter.set_topic_recovery_fn(self._recover_telegram_topic_thread_id)
adapter._busy_text_mode = self._busy_text_mode
success = await self._connect_adapter_with_timeout(adapter, platform)
# Reconnect after an outage: preserve the platform's
# server-side update queue so messages sent while the bot
# was offline are delivered rather than dropped (#46621).
success = await self._connect_adapter_with_timeout(
adapter, platform, is_reconnect=True
)
if success:
self.adapters[platform] = adapter
self._sync_voice_mode_state_to_adapter(adapter)

View file

@ -239,7 +239,7 @@ class DingTalkAdapter(BasePlatformAdapter):
# -- Connection lifecycle -----------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to DingTalk via Stream Mode."""
if not DINGTALK_STREAM_AVAILABLE:
logger.warning(

View file

@ -859,7 +859,7 @@ class DiscordAdapter(BasePlatformAdapter):
asyncio.create_task(_notify())
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Discord and start receiving events."""
if not DISCORD_AVAILABLE:
logger.error("[%s] discord.py not installed. Run: pip install discord.py", self.name)

View file

@ -548,7 +548,7 @@ class EmailAdapter(BasePlatformAdapter):
# Retry with IPv4 only.
return _connect(ipv4_only=True)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to the IMAP server and start polling for new messages."""
# Validate up front so a missing host surfaces as an actionable config
# error instead of IMAP4_SSL("") raising the cryptic

View file

@ -1641,7 +1641,7 @@ class FeishuAdapter(BasePlatformAdapter):
.build()
)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Feishu/Lark."""
if not FEISHU_AVAILABLE:
logger.error("[Feishu] lark-oapi not installed")

View file

@ -761,7 +761,7 @@ class GoogleChatAdapter(BasePlatformAdapter):
# ------------------------------------------------------------------
# Connection lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Validate config, authenticate, start Pub/Sub pull, resolve bot id."""
# First call into the heavy google-cloud stack — trigger the lazy
# import. ``_load_google_modules()`` is idempotent and rebinds the

View file

@ -98,7 +98,7 @@ class HomeAssistantAdapter(BasePlatformAdapter):
# Connection lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to HA WebSocket API and subscribe to events."""
if not AIOHTTP_AVAILABLE:
logger.warning("[%s] aiohttp not installed. Run: pip install aiohttp", self.name)

View file

@ -152,7 +152,7 @@ class IRCAdapter(BasePlatformAdapter):
# ── Connection lifecycle ──────────────────────────────────────────────
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to the IRC server, register, and join the channel."""
if not self.server or not self.channel:
logger.error("IRC: server and channel must be configured")

View file

@ -740,7 +740,7 @@ class LineAdapter(BasePlatformAdapter):
# Connection lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if not self.channel_access_token or not self.channel_secret:
self._set_fatal_error(
"config_missing",

View file

@ -1135,7 +1135,7 @@ class MatrixAdapter(BasePlatformAdapter):
# Required overrides
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to the Matrix homeserver and start syncing."""
from mautrix.api import HTTPAPI
from mautrix.client import Client

View file

@ -256,7 +256,7 @@ class MattermostAdapter(BasePlatformAdapter):
# Required overrides
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Mattermost and start the WebSocket listener."""
import aiohttp

View file

@ -183,7 +183,7 @@ class NtfyAdapter(BasePlatformAdapter):
# -- Connection lifecycle -----------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to ntfy by starting the streaming subscription task."""
if not HTTPX_AVAILABLE:
logger.warning("[%s] httpx not installed. Run: pip install httpx", self.name)

View file

@ -334,7 +334,7 @@ class PhotonAdapter(BasePlatformAdapter):
# -- Connection lifecycle ---------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if not HTTPX_AVAILABLE:
self._set_fatal_error(
"MISSING_DEP", "httpx not installed", retryable=False

View file

@ -468,7 +468,7 @@ class RaftAdapter(BasePlatformAdapter):
def runtime_session(self) -> str:
return self._runtime_session
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if not self._bridge_token:
self._bridge_token = secrets.token_hex(32)
logger.info("[raft] Auto-generated bridge token")

View file

@ -208,7 +208,7 @@ class SimplexAdapter(BasePlatformAdapter):
# Lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to the simplex-chat daemon and start the WebSocket listener."""
try:
import websockets # noqa: F401

View file

@ -829,7 +829,7 @@ class SlackAdapter(BasePlatformAdapter):
# Non-fatal — the user saw the initial ack already.
return SendResult(success=True, message_id=None)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Slack via Socket Mode."""
if not SLACK_AVAILABLE:
logger.error(

View file

@ -86,7 +86,7 @@ class SmsAdapter(BasePlatformAdapter):
# Required abstract methods
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
import aiohttp
from aiohttp import web

View file

@ -709,7 +709,7 @@ class TeamsAdapter(BasePlatformAdapter):
# Used to send cards with the correct conversation type (personal/group/channel).
self._conv_refs: Dict[str, Any] = {}
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
# Lazy-install the Teams SDK on demand (parity with Slack/Discord/etc.),
# then re-check the module globals it rebinds.
check_teams_requirements()

View file

@ -2232,7 +2232,7 @@ class TelegramAdapter(BasePlatformAdapter):
self.name, topic_name, seed_err,
)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to Telegram via polling or webhook.
By default, uses long polling (outbound connection to Telegram).
@ -2240,6 +2240,14 @@ class TelegramAdapter(BasePlatformAdapter):
instead. Webhook mode is useful for cloud deployments (Fly.io,
Railway) where inbound HTTP can wake a suspended machine.
``is_reconnect`` distinguishes a cold first boot (False drop any
stale Bot API queue) from a watcher reconnect after a prolonged
outage (True preserve the updates Telegram queued while the bot
was offline, otherwise every message sent during the outage is
silently lost). The in-process network-error ladder and the
409-conflict handler already pass ``drop_pending_updates=False``
for the same reason; bootstrap follows suit on the reconnect path.
Env vars for webhook mode::
TELEGRAM_WEBHOOK_URL Public HTTPS URL (e.g. https://app.fly.dev/telegram)
@ -2476,7 +2484,11 @@ class TelegramAdapter(BasePlatformAdapter):
webhook_url=webhook_url,
secret_token=webhook_secret,
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True,
# Webhooks are push-based — Telegram does not hold a
# server-side getUpdates queue, so this flag is a no-op
# in practice. Mirror the polling path's reconnect
# semantics for consistency.
drop_pending_updates=not is_reconnect,
)
self._webhook_mode = True
logger.info(
@ -2509,7 +2521,10 @@ class TelegramAdapter(BasePlatformAdapter):
await self._app.updater.start_polling(
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True,
# On a cold first boot drop the stale Bot API queue; on a
# watcher reconnect after an outage preserve it so messages
# sent while the bot was offline are delivered (#46621).
drop_pending_updates=not is_reconnect,
error_callback=_polling_error_callback,
)

View file

@ -198,7 +198,7 @@ class WeComAdapter(BasePlatformAdapter):
# Connection lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""Connect to the WeCom AI Bot gateway."""
if not AIOHTTP_AVAILABLE:
message = "WeCom startup failed: aiohttp not installed"

View file

@ -411,7 +411,7 @@ class WhatsAppAdapter(WhatsAppBehaviorMixin, BasePlatformAdapter):
return float(default)
return parsed
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
"""
Start the WhatsApp bridge.

View file

@ -40,7 +40,7 @@ class StubConnector:
# absent/expired capability or a tenant mismatch on the connector side.
self.next_follow_up_result: Dict[str, Any] = {"success": True, "message_id": "f1"}
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
self.connected = True
return True

View file

@ -422,7 +422,7 @@ async def test_adapter_go_dormant_noop_on_stub_transport():
)
class _StubTransport:
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
def set_inbound_handler(self, h):

View file

@ -15,7 +15,7 @@ class RestartTestAdapter(BasePlatformAdapter):
self.sent: list[str] = []
self.sent_calls: list[tuple[str, str, object]] = []
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
async def disconnect(self):

View file

@ -65,7 +65,7 @@ def _make_event(
class _DummyAdapter(BasePlatformAdapter): # type: ignore[misc]
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):

View file

@ -20,7 +20,7 @@ class DummyTelegramAdapter(BasePlatformAdapter):
self.typing = []
self.processing_hooks = []
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -19,7 +19,7 @@ from gateway.session import SessionSource, build_session_key
class _StubAdapter(BasePlatformAdapter):
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):

View file

@ -29,7 +29,7 @@ from gateway.session import SessionSource, build_session_key
class _StubAdapter(BasePlatformAdapter):
"""Concrete adapter with abstract methods stubbed out."""
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):

View file

@ -37,7 +37,7 @@ class StubAdapter(BasePlatformAdapter):
super().__init__(PlatformConfig(enabled=True, token="fake"), Platform.DISCORD)
self.sent = []
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
async def disconnect(self):

View file

@ -39,7 +39,7 @@ from gateway.session import SessionSource
class _NoDeleteAdapter(BasePlatformAdapter):
"""Adapter that does NOT override delete_message (silent degrade)."""
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):
@ -59,7 +59,7 @@ class _DeleteCapableAdapter(BasePlatformAdapter):
super().__init__(*a, **kw)
self.deleted: list[tuple[str, str]] = []
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):

View file

@ -21,7 +21,7 @@ class StubAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
async def disconnect(self):

View file

@ -38,7 +38,7 @@ class _StubAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -35,7 +35,7 @@ from gateway.session import SessionSource, build_session_key
class _StubAdapter(BasePlatformAdapter):
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):

View file

@ -36,7 +36,7 @@ from gateway.session import SessionSource, build_session_key
class _StubAdapter(BasePlatformAdapter):
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):

View file

@ -1276,7 +1276,7 @@ class TestTruncateMessage:
"""Create a minimal adapter instance for testing static/instance methods."""
class StubAdapter(BasePlatformAdapter):
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
async def disconnect(self):

View file

@ -26,8 +26,12 @@ class StubAdapter(BasePlatformAdapter):
self._succeed = succeed
self._fatal_error = fatal_error
self._fatal_retryable = fatal_retryable
# Records the is_reconnect value of every connect() call so tests can
# assert that the watcher distinguishes reconnect from cold boot (#46621).
self.connect_calls: list[bool] = []
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
self.connect_calls.append(is_reconnect)
if self._fatal_error:
self._set_fatal_error("test_error", self._fatal_error, retryable=self._fatal_retryable)
return False
@ -140,7 +144,7 @@ class TestStartupPlatformIsolation:
runner = _make_runner()
adapter = StubAdapter()
async def hang():
async def hang(*, is_reconnect: bool = False):
await asyncio.sleep(60)
return True
@ -217,6 +221,59 @@ class TestPlatformReconnectWatcher:
assert Platform.TELEGRAM not in runner._failed_platforms
assert Platform.TELEGRAM in runner.adapters
@pytest.mark.asyncio
async def test_reconnect_passes_is_reconnect_true(self):
"""The watcher must connect with is_reconnect=True so adapters preserve
their server-side update queue across an outage (#46621). Without this,
bootstrap start_polling(drop_pending_updates=True) silently dropped every
message queued while the bot was offline."""
runner = _make_runner()
runner._sync_voice_mode_state_to_adapter = MagicMock()
runner._failed_platforms[Platform.TELEGRAM] = {
"config": PlatformConfig(enabled=True, token="test"),
"attempts": 1,
"next_retry": time.monotonic() - 1,
}
succeed_adapter = StubAdapter(succeed=True)
real_sleep = asyncio.sleep
with patch.object(runner, "_create_adapter", return_value=succeed_adapter):
with patch("gateway.run.build_channel_directory", create=True):
runner._running = True
call_count = 0
async def fake_sleep(n):
nonlocal call_count
call_count += 1
if call_count > 1:
runner._running = False
await real_sleep(0)
with patch("asyncio.sleep", side_effect=fake_sleep):
await runner._platform_reconnect_watcher()
assert succeed_adapter.connect_calls == [True], (
f"watcher must pass is_reconnect=True; got {succeed_adapter.connect_calls!r}"
)
assert Platform.TELEGRAM in runner.adapters
@pytest.mark.asyncio
async def test_cold_connect_defaults_to_is_reconnect_false(self):
"""The cold-start connect path (_connect_adapter_with_timeout with no
is_reconnect arg) must default to False so a first boot still drops any
stale queue (#46621)."""
runner = _make_runner()
adapter = StubAdapter(succeed=True)
success = await runner._connect_adapter_with_timeout(adapter, Platform.TELEGRAM)
assert success is True
assert adapter.connect_calls == [False], (
f"cold-start must default to is_reconnect=False; got {adapter.connect_calls!r}"
)
@pytest.mark.asyncio
async def test_reconnect_retries_resume_pending_for_platform(self):
"""A successful reconnect retries the startup auto-resume scoped to

View file

@ -103,7 +103,7 @@ class _CountingAdapter(BasePlatformAdapter):
self._fatal_retryable = fatal_retryable
self._raise_during_connect = raise_during_connect
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
if self._raise_during_connect:
raise RuntimeError("simulated connect exception")
if self._fatal_error:
@ -254,7 +254,7 @@ class TestReconnectFDLeakRegression:
Platform.TELEGRAM,
)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -13,7 +13,7 @@ from gateway.platforms.base import BasePlatformAdapter, SendResult
class _MinAdapter(BasePlatformAdapter):
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -27,7 +27,7 @@ class _StubAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -22,7 +22,7 @@ from gateway.platforms.base import BasePlatformAdapter
class _MinAdapter(BasePlatformAdapter):
"""Smallest concrete adapter: implements exactly the abstract methods."""
async def connect(self): # pragma: no cover - not called
async def connect(self, *, is_reconnect: bool = False): # pragma: no cover - not called
return True
async def disconnect(self): # pragma: no cover - not called

View file

@ -41,7 +41,7 @@ class CleanupCaptureAdapter(BasePlatformAdapter):
self.edits = []
self.deleted = []
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -28,7 +28,7 @@ class ProgressCaptureAdapter(BasePlatformAdapter):
self.edits = []
self.typing = []
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -22,7 +22,7 @@ class ProgressCaptureAdapter(BasePlatformAdapter):
self.edits = []
self.typing = []
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -11,7 +11,7 @@ class _FatalAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="token"), Platform.TELEGRAM)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
self._set_fatal_error(
"telegram_token_lock",
"Another local Hermes gateway is already using this Telegram bot token.",
@ -33,7 +33,7 @@ class _RuntimeRetryableAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="token"), Platform.WHATSAPP)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -12,7 +12,7 @@ class _RetryableFailureAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="***"), Platform.TELEGRAM)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
self._set_fatal_error(
"telegram_connect_error",
"Telegram startup failed: temporary DNS resolution failure.",
@ -34,7 +34,7 @@ class _DisabledAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=False, token="***"), Platform.TELEGRAM)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
raise AssertionError("connect should not be called for disabled platforms")
async def disconnect(self) -> None:
@ -51,7 +51,7 @@ class _SuccessfulAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="***"), Platform.DISCORD)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:
@ -467,7 +467,7 @@ class _NonRetryableFailureAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="***"), Platform.DISCORD)
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
self._set_fatal_error(
"discord-bot-token_lock",
"Discord bot token already in use (PID 999). Stop the other gateway first.",

View file

@ -41,7 +41,7 @@ class _StubAdapter(BasePlatformAdapter):
self.sent_animations = []
self.sent_files = []
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
async def disconnect(self):

View file

@ -35,7 +35,7 @@ class _StubAdapter(BasePlatformAdapter):
self._send_calls.append((chat_id, content))
return self._next_result()
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -56,7 +56,7 @@ class HygieneCaptureAdapter(BasePlatformAdapter):
super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM)
self.sent = []
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -37,7 +37,7 @@ from gateway.session import SessionSource, build_session_key
class _StubAdapter(BasePlatformAdapter):
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
pass
async def disconnect(self):

View file

@ -611,7 +611,7 @@ async def test_status_command_bypasses_active_session_guard():
class _ConcreteAdapter(BasePlatformAdapter):
platform = Platform.TELEGRAM
async def connect(self): pass
async def connect(self, *, is_reconnect: bool = False): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kwargs): pass
async def get_chat_info(self, chat_id): return {}
@ -692,7 +692,7 @@ async def test_post_delivery_callback_generation_snapshot_happens_after_bind():
class _ConcreteAdapter(BasePlatformAdapter):
platform = Platform.TELEGRAM
async def connect(self): pass
async def connect(self, *, is_reconnect: bool = False): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kwargs): pass
async def get_chat_info(self, chat_id): return {}

View file

@ -403,7 +403,7 @@ class TestBaseAdapterClarifyFallback:
# Skip base __init__ — we're not exercising it
self.sent: list = []
async def connect(self): pass
async def connect(self, *, is_reconnect: bool = False): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kw):
self.sent.append({"chat_id": chat_id, "content": content})
@ -436,7 +436,7 @@ class TestBaseAdapterClarifyFallback:
name = "stub"
def __init__(self):
self.sent: list = []
async def connect(self): pass
async def connect(self, *, is_reconnect: bool = False): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kw):
self.sent.append(content)

View file

@ -425,3 +425,67 @@ async def test_polling_conflict_reschedule_uses_running_loop(monkeypatch):
except (asyncio.CancelledError, Exception):
pass
await _cancel_heartbeat(adapter)
def _build_polling_app(monkeypatch):
"""Wire a mock PTB Application whose start_polling captures kwargs."""
captured = {}
async def fake_start_polling(**kwargs):
captured.update(kwargs)
updater = SimpleNamespace(
start_polling=AsyncMock(side_effect=fake_start_polling),
stop=AsyncMock(),
running=True,
)
bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock())
app = SimpleNamespace(
bot=bot,
updater=updater,
add_handler=MagicMock(),
initialize=AsyncMock(),
start=AsyncMock(),
)
builder = MagicMock()
builder.token.return_value = builder
builder.request.return_value = builder
builder.get_updates_request.return_value = builder
builder.build.return_value = app
monkeypatch.setattr(
"plugins.platforms.telegram.adapter.Application",
SimpleNamespace(builder=MagicMock(return_value=builder)),
)
monkeypatch.setattr(
"gateway.status.acquire_scoped_lock",
lambda scope, identity, metadata=None: (True, None),
)
monkeypatch.setattr("asyncio.sleep", AsyncMock())
return captured
@pytest.mark.asyncio
async def test_cold_connect_drops_pending_updates(monkeypatch):
"""A cold first boot (is_reconnect=False) drops the stale Bot API queue."""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
captured = _build_polling_app(monkeypatch)
ok = await adapter.connect() # default is_reconnect=False
assert ok is True
assert captured["drop_pending_updates"] is True
await _cancel_heartbeat(adapter)
@pytest.mark.asyncio
async def test_reconnect_preserves_pending_updates(monkeypatch):
"""A watcher reconnect (is_reconnect=True) preserves the queue Telegram
accumulated during the outage the core of #46621."""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
captured = _build_polling_app(monkeypatch)
ok = await adapter.connect(is_reconnect=True)
assert ok is True
assert captured["drop_pending_updates"] is False
await _cancel_heartbeat(adapter)

View file

@ -1127,7 +1127,7 @@ async def test_base_send_image_fallback_preserves_metadata():
from gateway.platforms.base import BasePlatformAdapter
class _ConcreteBaseAdapter(BasePlatformAdapter):
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
async def disconnect(self):

View file

@ -40,7 +40,7 @@ class _DummyAdapter(BasePlatformAdapter):
super().__init__(PlatformConfig(enabled=True, token="fake-token"), platform)
self.sent: list[dict] = []
async def connect(self) -> bool:
async def connect(self, *, is_reconnect: bool = False) -> bool:
return True
async def disconnect(self) -> None:

View file

@ -22,7 +22,7 @@ class _MediaRoutingAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
return True
async def disconnect(self):

View file

@ -871,7 +871,7 @@ class TestSendToPlatformChunking:
def __init__(self, _config):
self.connected = False
async def connect(self):
async def connect(self, *, is_reconnect: bool = False):
self.connected = True
calls.append(("connect",))
return True