diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 027a5a5306a..2e7361c18b4 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -4372,7 +4372,7 @@ class APIServerAdapter(BasePlatformAdapter): # BasePlatformAdapter interface # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Start the aiohttp web server.""" if not AIOHTTP_AVAILABLE: logger.warning("[%s] aiohttp not installed", self.name) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 9971399c3d9..8dd9fc8fdd4 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -2634,10 +2634,21 @@ class BasePlatformAdapter(ABC): self._session_store = session_store @abstractmethod - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """ Connect to the platform and start receiving messages. - + + Args: + is_reconnect: False on a cold first boot (the gateway is + starting this platform for the first time); True when the + reconnect watcher is re-establishing a platform that was + previously running and dropped after an outage. Adapters + that buffer a server-side update queue (e.g. Telegram's Bot + API) should preserve that queue when ``is_reconnect`` is + True so messages sent during the outage are delivered rather + than silently discarded. Adapters with no such queue may + ignore the flag. + Returns True if connection was successful. """ pass diff --git a/gateway/platforms/bluebubbles.py b/gateway/platforms/bluebubbles.py index 31595b223b5..d4adbc73153 100644 --- a/gateway/platforms/bluebubbles.py +++ b/gateway/platforms/bluebubbles.py @@ -232,7 +232,7 @@ class BlueBubblesAdapter(BasePlatformAdapter): # Lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if not self.server_url or not self.password: logger.error( "[bluebubbles] BLUEBUBBLES_SERVER_URL and BLUEBUBBLES_PASSWORD are required" diff --git a/gateway/platforms/msgraph_webhook.py b/gateway/platforms/msgraph_webhook.py index d1d48996d73..88781d1f547 100644 --- a/gateway/platforms/msgraph_webhook.py +++ b/gateway/platforms/msgraph_webhook.py @@ -136,7 +136,7 @@ class MSGraphWebhookAdapter(BasePlatformAdapter): def _source_allowlist_required_but_missing(self) -> bool: return is_network_accessible(self._host) and not self._allowed_source_networks - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if self._client_state is None: logger.error( "[msgraph_webhook] Refusing to start without extra.client_state configured" diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index f91dc96d60f..a3f6916707a 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -338,7 +338,7 @@ class SignalAdapter(BasePlatformAdapter): # Lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to signal-cli daemon and start SSE listener.""" if not self.http_url or not self.account: logger.error("Signal: SIGNAL_HTTP_URL and SIGNAL_ACCOUNT are required") diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index d9f98282a8d..7c77a96b5b8 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -153,7 +153,7 @@ class WebhookAdapter(BasePlatformAdapter): # Lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: # Load agent-created subscriptions before validating self._reload_dynamic_routes() diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index 4ce48719321..4e86dd0bfb9 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -1261,7 +1261,7 @@ class WeixinAdapter(BasePlatformAdapter): return [str(item).strip() for item in value if str(item).strip()] return [str(value).strip()] if str(value).strip() else [] - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if not check_weixin_requirements(): message = "Weixin startup failed: aiohttp and cryptography are required" self._set_fatal_error("weixin_missing_dependency", message, retryable=False) diff --git a/gateway/platforms/whatsapp_cloud.py b/gateway/platforms/whatsapp_cloud.py index 126a79c86b8..c0bf600032c 100644 --- a/gateway/platforms/whatsapp_cloud.py +++ b/gateway/platforms/whatsapp_cloud.py @@ -347,7 +347,7 @@ class WhatsAppCloudAdapter(WhatsAppBehaviorMixin, BasePlatformAdapter): return super()._is_dm_allowed(sender_id) # ------------------------------------------------------------------ lifecycle - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if not check_whatsapp_cloud_requirements(): self._set_fatal_error( "whatsapp_cloud_deps_missing", diff --git a/gateway/platforms/yuanbao.py b/gateway/platforms/yuanbao.py index ade1273c7f2..7f3b1f34e55 100644 --- a/gateway/platforms/yuanbao.py +++ b/gateway/platforms/yuanbao.py @@ -5114,7 +5114,7 @@ class YuanbaoAdapter(BasePlatformAdapter): """Yuanbao gates DM/group access at intake via dm_policy/group_policy.""" return True - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Yuanbao WS gateway and authenticate. Delegates to ConnectionManager.open(). diff --git a/gateway/run.py b/gateway/run.py index 9d31cc2a60e..01429983e47 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3086,13 +3086,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew return max(0.0, timeout) return _PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT - async def _connect_adapter_with_timeout(self, adapter, platform) -> bool: - """Connect an adapter without allowing one platform to block others.""" + async def _connect_adapter_with_timeout( + self, adapter, platform, *, is_reconnect: bool = False + ) -> bool: + """Connect an adapter without allowing one platform to block others. + + ``is_reconnect`` is forwarded to ``adapter.connect()`` so platform + adapters can distinguish a cold first boot (drop any stale + server-side queue) from a watcher reconnect after a prolonged outage + (preserve the queue so messages sent during the outage are delivered + rather than silently dropped — #46621). + """ timeout = self._platform_connect_timeout_secs() if timeout <= 0: - return await adapter.connect() + return await adapter.connect(is_reconnect=is_reconnect) try: - return await asyncio.wait_for(adapter.connect(), timeout=timeout) + return await asyncio.wait_for( + adapter.connect(is_reconnect=is_reconnect), timeout=timeout + ) except asyncio.TimeoutError as exc: raise TimeoutError( f"{platform.value} connect timed out after {timeout:g}s" @@ -6718,7 +6729,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew adapter.set_topic_recovery_fn(self._recover_telegram_topic_thread_id) adapter._busy_text_mode = self._busy_text_mode - success = await self._connect_adapter_with_timeout(adapter, platform) + # Reconnect after an outage: preserve the platform's + # server-side update queue so messages sent while the bot + # was offline are delivered rather than dropped (#46621). + success = await self._connect_adapter_with_timeout( + adapter, platform, is_reconnect=True + ) if success: self.adapters[platform] = adapter self._sync_voice_mode_state_to_adapter(adapter) diff --git a/plugins/platforms/dingtalk/adapter.py b/plugins/platforms/dingtalk/adapter.py index 29abe98ecdf..8017589e350 100644 --- a/plugins/platforms/dingtalk/adapter.py +++ b/plugins/platforms/dingtalk/adapter.py @@ -239,7 +239,7 @@ class DingTalkAdapter(BasePlatformAdapter): # -- Connection lifecycle ----------------------------------------------- - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to DingTalk via Stream Mode.""" if not DINGTALK_STREAM_AVAILABLE: logger.warning( diff --git a/plugins/platforms/discord/adapter.py b/plugins/platforms/discord/adapter.py index 3c07f513b43..3d5369be522 100644 --- a/plugins/platforms/discord/adapter.py +++ b/plugins/platforms/discord/adapter.py @@ -859,7 +859,7 @@ class DiscordAdapter(BasePlatformAdapter): asyncio.create_task(_notify()) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Discord and start receiving events.""" if not DISCORD_AVAILABLE: logger.error("[%s] discord.py not installed. Run: pip install discord.py", self.name) diff --git a/plugins/platforms/email/adapter.py b/plugins/platforms/email/adapter.py index db97dde9cd6..9521d586c6f 100644 --- a/plugins/platforms/email/adapter.py +++ b/plugins/platforms/email/adapter.py @@ -548,7 +548,7 @@ class EmailAdapter(BasePlatformAdapter): # Retry with IPv4 only. return _connect(ipv4_only=True) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to the IMAP server and start polling for new messages.""" # Validate up front so a missing host surfaces as an actionable config # error instead of IMAP4_SSL("") raising the cryptic diff --git a/plugins/platforms/feishu/adapter.py b/plugins/platforms/feishu/adapter.py index bf3c49d3b86..c3ce2c29431 100644 --- a/plugins/platforms/feishu/adapter.py +++ b/plugins/platforms/feishu/adapter.py @@ -1641,7 +1641,7 @@ class FeishuAdapter(BasePlatformAdapter): .build() ) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Feishu/Lark.""" if not FEISHU_AVAILABLE: logger.error("[Feishu] lark-oapi not installed") diff --git a/plugins/platforms/google_chat/adapter.py b/plugins/platforms/google_chat/adapter.py index 6f738488123..5deb0e5af4c 100644 --- a/plugins/platforms/google_chat/adapter.py +++ b/plugins/platforms/google_chat/adapter.py @@ -761,7 +761,7 @@ class GoogleChatAdapter(BasePlatformAdapter): # ------------------------------------------------------------------ # Connection lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Validate config, authenticate, start Pub/Sub pull, resolve bot id.""" # First call into the heavy google-cloud stack — trigger the lazy # import. ``_load_google_modules()`` is idempotent and rebinds the diff --git a/plugins/platforms/homeassistant/adapter.py b/plugins/platforms/homeassistant/adapter.py index 1baa3da75ad..6d59a30c0b4 100644 --- a/plugins/platforms/homeassistant/adapter.py +++ b/plugins/platforms/homeassistant/adapter.py @@ -98,7 +98,7 @@ class HomeAssistantAdapter(BasePlatformAdapter): # Connection lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to HA WebSocket API and subscribe to events.""" if not AIOHTTP_AVAILABLE: logger.warning("[%s] aiohttp not installed. Run: pip install aiohttp", self.name) diff --git a/plugins/platforms/irc/adapter.py b/plugins/platforms/irc/adapter.py index 804e1dbc041..e78798adbe6 100644 --- a/plugins/platforms/irc/adapter.py +++ b/plugins/platforms/irc/adapter.py @@ -152,7 +152,7 @@ class IRCAdapter(BasePlatformAdapter): # ── Connection lifecycle ────────────────────────────────────────────── - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to the IRC server, register, and join the channel.""" if not self.server or not self.channel: logger.error("IRC: server and channel must be configured") diff --git a/plugins/platforms/line/adapter.py b/plugins/platforms/line/adapter.py index 130bb2e2c38..447cf5fb0c8 100644 --- a/plugins/platforms/line/adapter.py +++ b/plugins/platforms/line/adapter.py @@ -740,7 +740,7 @@ class LineAdapter(BasePlatformAdapter): # Connection lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if not self.channel_access_token or not self.channel_secret: self._set_fatal_error( "config_missing", diff --git a/plugins/platforms/matrix/adapter.py b/plugins/platforms/matrix/adapter.py index e8623d90a4a..86dc729bd2a 100644 --- a/plugins/platforms/matrix/adapter.py +++ b/plugins/platforms/matrix/adapter.py @@ -1135,7 +1135,7 @@ class MatrixAdapter(BasePlatformAdapter): # Required overrides # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to the Matrix homeserver and start syncing.""" from mautrix.api import HTTPAPI from mautrix.client import Client diff --git a/plugins/platforms/mattermost/adapter.py b/plugins/platforms/mattermost/adapter.py index d52beeb6f6f..7bdab48bb66 100644 --- a/plugins/platforms/mattermost/adapter.py +++ b/plugins/platforms/mattermost/adapter.py @@ -256,7 +256,7 @@ class MattermostAdapter(BasePlatformAdapter): # Required overrides # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Mattermost and start the WebSocket listener.""" import aiohttp diff --git a/plugins/platforms/ntfy/adapter.py b/plugins/platforms/ntfy/adapter.py index 4ab46cecfb2..88741aa62f5 100644 --- a/plugins/platforms/ntfy/adapter.py +++ b/plugins/platforms/ntfy/adapter.py @@ -183,7 +183,7 @@ class NtfyAdapter(BasePlatformAdapter): # -- Connection lifecycle ----------------------------------------------- - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to ntfy by starting the streaming subscription task.""" if not HTTPX_AVAILABLE: logger.warning("[%s] httpx not installed. Run: pip install httpx", self.name) diff --git a/plugins/platforms/photon/adapter.py b/plugins/platforms/photon/adapter.py index 5a3a160c0fc..9bec435bfec 100644 --- a/plugins/platforms/photon/adapter.py +++ b/plugins/platforms/photon/adapter.py @@ -334,7 +334,7 @@ class PhotonAdapter(BasePlatformAdapter): # -- Connection lifecycle --------------------------------------------- - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if not HTTPX_AVAILABLE: self._set_fatal_error( "MISSING_DEP", "httpx not installed", retryable=False diff --git a/plugins/platforms/raft/adapter.py b/plugins/platforms/raft/adapter.py index 7f65fa233c2..0a8b1a359b0 100644 --- a/plugins/platforms/raft/adapter.py +++ b/plugins/platforms/raft/adapter.py @@ -468,7 +468,7 @@ class RaftAdapter(BasePlatformAdapter): def runtime_session(self) -> str: return self._runtime_session - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if not self._bridge_token: self._bridge_token = secrets.token_hex(32) logger.info("[raft] Auto-generated bridge token") diff --git a/plugins/platforms/simplex/adapter.py b/plugins/platforms/simplex/adapter.py index 0b241d588f3..ae4c6be34b6 100644 --- a/plugins/platforms/simplex/adapter.py +++ b/plugins/platforms/simplex/adapter.py @@ -208,7 +208,7 @@ class SimplexAdapter(BasePlatformAdapter): # Lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to the simplex-chat daemon and start the WebSocket listener.""" try: import websockets # noqa: F401 diff --git a/plugins/platforms/slack/adapter.py b/plugins/platforms/slack/adapter.py index 5ef300b086f..60f510d7070 100644 --- a/plugins/platforms/slack/adapter.py +++ b/plugins/platforms/slack/adapter.py @@ -829,7 +829,7 @@ class SlackAdapter(BasePlatformAdapter): # Non-fatal — the user saw the initial ack already. return SendResult(success=True, message_id=None) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Slack via Socket Mode.""" if not SLACK_AVAILABLE: logger.error( diff --git a/plugins/platforms/sms/adapter.py b/plugins/platforms/sms/adapter.py index a1edffb8e16..e40ec0ac39a 100644 --- a/plugins/platforms/sms/adapter.py +++ b/plugins/platforms/sms/adapter.py @@ -86,7 +86,7 @@ class SmsAdapter(BasePlatformAdapter): # Required abstract methods # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: import aiohttp from aiohttp import web diff --git a/plugins/platforms/teams/adapter.py b/plugins/platforms/teams/adapter.py index fdd0905e7f1..bc204b98b38 100644 --- a/plugins/platforms/teams/adapter.py +++ b/plugins/platforms/teams/adapter.py @@ -709,7 +709,7 @@ class TeamsAdapter(BasePlatformAdapter): # Used to send cards with the correct conversation type (personal/group/channel). self._conv_refs: Dict[str, Any] = {} - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: # Lazy-install the Teams SDK on demand (parity with Slack/Discord/etc.), # then re-check the module globals it rebinds. check_teams_requirements() diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py index 00aff1ed815..8ae46cdd358 100644 --- a/plugins/platforms/telegram/adapter.py +++ b/plugins/platforms/telegram/adapter.py @@ -2232,7 +2232,7 @@ class TelegramAdapter(BasePlatformAdapter): self.name, topic_name, seed_err, ) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to Telegram via polling or webhook. By default, uses long polling (outbound connection to Telegram). @@ -2240,6 +2240,14 @@ class TelegramAdapter(BasePlatformAdapter): instead. Webhook mode is useful for cloud deployments (Fly.io, Railway) where inbound HTTP can wake a suspended machine. + ``is_reconnect`` distinguishes a cold first boot (False — drop any + stale Bot API queue) from a watcher reconnect after a prolonged + outage (True — preserve the updates Telegram queued while the bot + was offline, otherwise every message sent during the outage is + silently lost). The in-process network-error ladder and the + 409-conflict handler already pass ``drop_pending_updates=False`` + for the same reason; bootstrap follows suit on the reconnect path. + Env vars for webhook mode:: TELEGRAM_WEBHOOK_URL Public HTTPS URL (e.g. https://app.fly.dev/telegram) @@ -2476,7 +2484,11 @@ class TelegramAdapter(BasePlatformAdapter): webhook_url=webhook_url, secret_token=webhook_secret, allowed_updates=Update.ALL_TYPES, - drop_pending_updates=True, + # Webhooks are push-based — Telegram does not hold a + # server-side getUpdates queue, so this flag is a no-op + # in practice. Mirror the polling path's reconnect + # semantics for consistency. + drop_pending_updates=not is_reconnect, ) self._webhook_mode = True logger.info( @@ -2509,7 +2521,10 @@ class TelegramAdapter(BasePlatformAdapter): await self._app.updater.start_polling( allowed_updates=Update.ALL_TYPES, - drop_pending_updates=True, + # On a cold first boot drop the stale Bot API queue; on a + # watcher reconnect after an outage preserve it so messages + # sent while the bot was offline are delivered (#46621). + drop_pending_updates=not is_reconnect, error_callback=_polling_error_callback, ) diff --git a/plugins/platforms/wecom/adapter.py b/plugins/platforms/wecom/adapter.py index 0d3fe1da3df..7d809c19a2f 100644 --- a/plugins/platforms/wecom/adapter.py +++ b/plugins/platforms/wecom/adapter.py @@ -198,7 +198,7 @@ class WeComAdapter(BasePlatformAdapter): # Connection lifecycle # ------------------------------------------------------------------ - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """Connect to the WeCom AI Bot gateway.""" if not AIOHTTP_AVAILABLE: message = "WeCom startup failed: aiohttp not installed" diff --git a/plugins/platforms/whatsapp/adapter.py b/plugins/platforms/whatsapp/adapter.py index 5c3d6bbb823..05daeabeed8 100644 --- a/plugins/platforms/whatsapp/adapter.py +++ b/plugins/platforms/whatsapp/adapter.py @@ -411,7 +411,7 @@ class WhatsAppAdapter(WhatsAppBehaviorMixin, BasePlatformAdapter): return float(default) return parsed - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: """ Start the WhatsApp bridge. diff --git a/tests/gateway/relay/stub_connector.py b/tests/gateway/relay/stub_connector.py index e309750d5e8..2d9baab3e41 100644 --- a/tests/gateway/relay/stub_connector.py +++ b/tests/gateway/relay/stub_connector.py @@ -40,7 +40,7 @@ class StubConnector: # absent/expired capability or a tenant mismatch on the connector side. self.next_follow_up_result: Dict[str, Any] = {"success": True, "message_id": "f1"} - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: self.connected = True return True diff --git a/tests/gateway/relay/test_relay_going_idle.py b/tests/gateway/relay/test_relay_going_idle.py index 05ed8190734..d2895a87b6b 100644 --- a/tests/gateway/relay/test_relay_going_idle.py +++ b/tests/gateway/relay/test_relay_going_idle.py @@ -422,7 +422,7 @@ async def test_adapter_go_dormant_noop_on_stub_transport(): ) class _StubTransport: - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True def set_inbound_handler(self, h): diff --git a/tests/gateway/restart_test_helpers.py b/tests/gateway/restart_test_helpers.py index 77c56ec40eb..eac9c44c651 100644 --- a/tests/gateway/restart_test_helpers.py +++ b/tests/gateway/restart_test_helpers.py @@ -15,7 +15,7 @@ class RestartTestAdapter(BasePlatformAdapter): self.sent: list[str] = [] self.sent_calls: list[tuple[str, str, object]] = [] - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True async def disconnect(self): diff --git a/tests/gateway/test_active_session_text_merge.py b/tests/gateway/test_active_session_text_merge.py index 16d40815ba2..88f1b6cfeb8 100644 --- a/tests/gateway/test_active_session_text_merge.py +++ b/tests/gateway/test_active_session_text_merge.py @@ -65,7 +65,7 @@ def _make_event( class _DummyAdapter(BasePlatformAdapter): # type: ignore[misc] - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): diff --git a/tests/gateway/test_base_topic_sessions.py b/tests/gateway/test_base_topic_sessions.py index dd2ef3a1262..4de540b49d1 100644 --- a/tests/gateway/test_base_topic_sessions.py +++ b/tests/gateway/test_base_topic_sessions.py @@ -20,7 +20,7 @@ class DummyTelegramAdapter(BasePlatformAdapter): self.typing = [] self.processing_hooks = [] - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_cancel_background_drain.py b/tests/gateway/test_cancel_background_drain.py index c95fdc062eb..8f8c7694cde 100644 --- a/tests/gateway/test_cancel_background_drain.py +++ b/tests/gateway/test_cancel_background_drain.py @@ -19,7 +19,7 @@ from gateway.session import SessionSource, build_session_key class _StubAdapter(BasePlatformAdapter): - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): diff --git a/tests/gateway/test_command_bypass_active_session.py b/tests/gateway/test_command_bypass_active_session.py index e5e8a4fa469..b741f667edd 100644 --- a/tests/gateway/test_command_bypass_active_session.py +++ b/tests/gateway/test_command_bypass_active_session.py @@ -29,7 +29,7 @@ from gateway.session import SessionSource, build_session_key class _StubAdapter(BasePlatformAdapter): """Concrete adapter with abstract methods stubbed out.""" - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): diff --git a/tests/gateway/test_duplicate_reply_suppression.py b/tests/gateway/test_duplicate_reply_suppression.py index c7c047fdb65..8d1a36e978e 100644 --- a/tests/gateway/test_duplicate_reply_suppression.py +++ b/tests/gateway/test_duplicate_reply_suppression.py @@ -37,7 +37,7 @@ class StubAdapter(BasePlatformAdapter): super().__init__(PlatformConfig(enabled=True, token="fake"), Platform.DISCORD) self.sent = [] - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True async def disconnect(self): diff --git a/tests/gateway/test_ephemeral_reply.py b/tests/gateway/test_ephemeral_reply.py index 61b70748e16..1ed1237f2cc 100644 --- a/tests/gateway/test_ephemeral_reply.py +++ b/tests/gateway/test_ephemeral_reply.py @@ -39,7 +39,7 @@ from gateway.session import SessionSource class _NoDeleteAdapter(BasePlatformAdapter): """Adapter that does NOT override delete_message (silent degrade).""" - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): @@ -59,7 +59,7 @@ class _DeleteCapableAdapter(BasePlatformAdapter): super().__init__(*a, **kw) self.deleted: list[tuple[str, str]] = [] - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): diff --git a/tests/gateway/test_interrupt_key_match.py b/tests/gateway/test_interrupt_key_match.py index 3a703c0261d..5e206e6cf43 100644 --- a/tests/gateway/test_interrupt_key_match.py +++ b/tests/gateway/test_interrupt_key_match.py @@ -21,7 +21,7 @@ class StubAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True async def disconnect(self): diff --git a/tests/gateway/test_keep_typing_timeout.py b/tests/gateway/test_keep_typing_timeout.py index 6d6a1624ca5..e303f103495 100644 --- a/tests/gateway/test_keep_typing_timeout.py +++ b/tests/gateway/test_keep_typing_timeout.py @@ -38,7 +38,7 @@ class _StubAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_pending_drain_no_recursion.py b/tests/gateway/test_pending_drain_no_recursion.py index b7569b8d02b..35a1348e50f 100644 --- a/tests/gateway/test_pending_drain_no_recursion.py +++ b/tests/gateway/test_pending_drain_no_recursion.py @@ -35,7 +35,7 @@ from gateway.session import SessionSource, build_session_key class _StubAdapter(BasePlatformAdapter): - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): diff --git a/tests/gateway/test_pending_drain_race.py b/tests/gateway/test_pending_drain_race.py index 810d52e9e2a..0d46fb6bd6d 100644 --- a/tests/gateway/test_pending_drain_race.py +++ b/tests/gateway/test_pending_drain_race.py @@ -36,7 +36,7 @@ from gateway.session import SessionSource, build_session_key class _StubAdapter(BasePlatformAdapter): - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): diff --git a/tests/gateway/test_platform_base.py b/tests/gateway/test_platform_base.py index 60b69e000be..c2b7a2c8749 100644 --- a/tests/gateway/test_platform_base.py +++ b/tests/gateway/test_platform_base.py @@ -1276,7 +1276,7 @@ class TestTruncateMessage: """Create a minimal adapter instance for testing static/instance methods.""" class StubAdapter(BasePlatformAdapter): - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True async def disconnect(self): diff --git a/tests/gateway/test_platform_reconnect.py b/tests/gateway/test_platform_reconnect.py index 80835f2146b..2f8a0c98943 100644 --- a/tests/gateway/test_platform_reconnect.py +++ b/tests/gateway/test_platform_reconnect.py @@ -26,8 +26,12 @@ class StubAdapter(BasePlatformAdapter): self._succeed = succeed self._fatal_error = fatal_error self._fatal_retryable = fatal_retryable + # Records the is_reconnect value of every connect() call so tests can + # assert that the watcher distinguishes reconnect from cold boot (#46621). + self.connect_calls: list[bool] = [] - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): + self.connect_calls.append(is_reconnect) if self._fatal_error: self._set_fatal_error("test_error", self._fatal_error, retryable=self._fatal_retryable) return False @@ -140,7 +144,7 @@ class TestStartupPlatformIsolation: runner = _make_runner() adapter = StubAdapter() - async def hang(): + async def hang(*, is_reconnect: bool = False): await asyncio.sleep(60) return True @@ -217,6 +221,59 @@ class TestPlatformReconnectWatcher: assert Platform.TELEGRAM not in runner._failed_platforms assert Platform.TELEGRAM in runner.adapters + @pytest.mark.asyncio + async def test_reconnect_passes_is_reconnect_true(self): + """The watcher must connect with is_reconnect=True so adapters preserve + their server-side update queue across an outage (#46621). Without this, + bootstrap start_polling(drop_pending_updates=True) silently dropped every + message queued while the bot was offline.""" + runner = _make_runner() + runner._sync_voice_mode_state_to_adapter = MagicMock() + + runner._failed_platforms[Platform.TELEGRAM] = { + "config": PlatformConfig(enabled=True, token="test"), + "attempts": 1, + "next_retry": time.monotonic() - 1, + } + + succeed_adapter = StubAdapter(succeed=True) + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter", return_value=succeed_adapter): + with patch("gateway.run.build_channel_directory", create=True): + runner._running = True + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 1: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + assert succeed_adapter.connect_calls == [True], ( + f"watcher must pass is_reconnect=True; got {succeed_adapter.connect_calls!r}" + ) + assert Platform.TELEGRAM in runner.adapters + + @pytest.mark.asyncio + async def test_cold_connect_defaults_to_is_reconnect_false(self): + """The cold-start connect path (_connect_adapter_with_timeout with no + is_reconnect arg) must default to False so a first boot still drops any + stale queue (#46621).""" + runner = _make_runner() + adapter = StubAdapter(succeed=True) + + success = await runner._connect_adapter_with_timeout(adapter, Platform.TELEGRAM) + + assert success is True + assert adapter.connect_calls == [False], ( + f"cold-start must default to is_reconnect=False; got {adapter.connect_calls!r}" + ) + @pytest.mark.asyncio async def test_reconnect_retries_resume_pending_for_platform(self): """A successful reconnect retries the startup auto-resume scoped to diff --git a/tests/gateway/test_platform_reconnect_fd_leak.py b/tests/gateway/test_platform_reconnect_fd_leak.py index 221cf55ee27..bc31a9fc010 100644 --- a/tests/gateway/test_platform_reconnect_fd_leak.py +++ b/tests/gateway/test_platform_reconnect_fd_leak.py @@ -103,7 +103,7 @@ class _CountingAdapter(BasePlatformAdapter): self._fatal_retryable = fatal_retryable self._raise_during_connect = raise_during_connect - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: if self._raise_during_connect: raise RuntimeError("simulated connect exception") if self._fatal_error: @@ -254,7 +254,7 @@ class TestReconnectFDLeakRegression: Platform.TELEGRAM, ) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_post_delivery_callback_chaining.py b/tests/gateway/test_post_delivery_callback_chaining.py index 38c1978f0fc..db0ddb3577f 100644 --- a/tests/gateway/test_post_delivery_callback_chaining.py +++ b/tests/gateway/test_post_delivery_callback_chaining.py @@ -13,7 +13,7 @@ from gateway.platforms.base import BasePlatformAdapter, SendResult class _MinAdapter(BasePlatformAdapter): - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_queue_consumption.py b/tests/gateway/test_queue_consumption.py index 792d7b7ea52..ec1b0dedc83 100644 --- a/tests/gateway/test_queue_consumption.py +++ b/tests/gateway/test_queue_consumption.py @@ -27,7 +27,7 @@ class _StubAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_relay_capability_surface.py b/tests/gateway/test_relay_capability_surface.py index da36f0ac4f8..ad1e7b45da0 100644 --- a/tests/gateway/test_relay_capability_surface.py +++ b/tests/gateway/test_relay_capability_surface.py @@ -22,7 +22,7 @@ from gateway.platforms.base import BasePlatformAdapter class _MinAdapter(BasePlatformAdapter): """Smallest concrete adapter: implements exactly the abstract methods.""" - async def connect(self): # pragma: no cover - not called + async def connect(self, *, is_reconnect: bool = False): # pragma: no cover - not called return True async def disconnect(self): # pragma: no cover - not called diff --git a/tests/gateway/test_run_cleanup_progress.py b/tests/gateway/test_run_cleanup_progress.py index dfb5ef03342..466f83f5dc1 100644 --- a/tests/gateway/test_run_cleanup_progress.py +++ b/tests/gateway/test_run_cleanup_progress.py @@ -41,7 +41,7 @@ class CleanupCaptureAdapter(BasePlatformAdapter): self.edits = [] self.deleted = [] - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_run_progress_interrupt.py b/tests/gateway/test_run_progress_interrupt.py index cc25b8db868..de047e0fe29 100644 --- a/tests/gateway/test_run_progress_interrupt.py +++ b/tests/gateway/test_run_progress_interrupt.py @@ -28,7 +28,7 @@ class ProgressCaptureAdapter(BasePlatformAdapter): self.edits = [] self.typing = [] - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index c91b5a0a4c9..fed22fa7782 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -22,7 +22,7 @@ class ProgressCaptureAdapter(BasePlatformAdapter): self.edits = [] self.typing = [] - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_runner_fatal_adapter.py b/tests/gateway/test_runner_fatal_adapter.py index 706514f1ae6..7e7739582d1 100644 --- a/tests/gateway/test_runner_fatal_adapter.py +++ b/tests/gateway/test_runner_fatal_adapter.py @@ -11,7 +11,7 @@ class _FatalAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="token"), Platform.TELEGRAM) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: self._set_fatal_error( "telegram_token_lock", "Another local Hermes gateway is already using this Telegram bot token.", @@ -33,7 +33,7 @@ class _RuntimeRetryableAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="token"), Platform.WHATSAPP) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_runner_startup_failures.py b/tests/gateway/test_runner_startup_failures.py index 12aa5c4a3d8..4a1343ae768 100644 --- a/tests/gateway/test_runner_startup_failures.py +++ b/tests/gateway/test_runner_startup_failures.py @@ -12,7 +12,7 @@ class _RetryableFailureAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="***"), Platform.TELEGRAM) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: self._set_fatal_error( "telegram_connect_error", "Telegram startup failed: temporary DNS resolution failure.", @@ -34,7 +34,7 @@ class _DisabledAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=False, token="***"), Platform.TELEGRAM) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: raise AssertionError("connect should not be called for disabled platforms") async def disconnect(self) -> None: @@ -51,7 +51,7 @@ class _SuccessfulAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="***"), Platform.DISCORD) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: @@ -467,7 +467,7 @@ class _NonRetryableFailureAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="***"), Platform.DISCORD) - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: self._set_fatal_error( "discord-bot-token_lock", "Discord bot token already in use (PID 999). Stop the other gateway first.", diff --git a/tests/gateway/test_send_multiple_images.py b/tests/gateway/test_send_multiple_images.py index 590a763acc3..77acf205296 100644 --- a/tests/gateway/test_send_multiple_images.py +++ b/tests/gateway/test_send_multiple_images.py @@ -41,7 +41,7 @@ class _StubAdapter(BasePlatformAdapter): self.sent_animations = [] self.sent_files = [] - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True async def disconnect(self): diff --git a/tests/gateway/test_send_retry.py b/tests/gateway/test_send_retry.py index c57a7712e3d..75de6cd88eb 100644 --- a/tests/gateway/test_send_retry.py +++ b/tests/gateway/test_send_retry.py @@ -35,7 +35,7 @@ class _StubAdapter(BasePlatformAdapter): self._send_calls.append((chat_id, content)) return self._next_result() - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_session_hygiene.py b/tests/gateway/test_session_hygiene.py index ccec24db53c..10bcf00cede 100644 --- a/tests/gateway/test_session_hygiene.py +++ b/tests/gateway/test_session_hygiene.py @@ -56,7 +56,7 @@ class HygieneCaptureAdapter(BasePlatformAdapter): super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM) self.sent = [] - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_session_split_brain_11016.py b/tests/gateway/test_session_split_brain_11016.py index 4a00f31b138..b402f2aa529 100644 --- a/tests/gateway/test_session_split_brain_11016.py +++ b/tests/gateway/test_session_split_brain_11016.py @@ -37,7 +37,7 @@ from gateway.session import SessionSource, build_session_key class _StubAdapter(BasePlatformAdapter): - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): diff --git a/tests/gateway/test_status_command.py b/tests/gateway/test_status_command.py index f02738b51f2..39ea4e3ff13 100644 --- a/tests/gateway/test_status_command.py +++ b/tests/gateway/test_status_command.py @@ -611,7 +611,7 @@ async def test_status_command_bypasses_active_session_guard(): class _ConcreteAdapter(BasePlatformAdapter): platform = Platform.TELEGRAM - async def connect(self): pass + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): pass async def send(self, chat_id, content, **kwargs): pass async def get_chat_info(self, chat_id): return {} @@ -692,7 +692,7 @@ async def test_post_delivery_callback_generation_snapshot_happens_after_bind(): class _ConcreteAdapter(BasePlatformAdapter): platform = Platform.TELEGRAM - async def connect(self): pass + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): pass async def send(self, chat_id, content, **kwargs): pass async def get_chat_info(self, chat_id): return {} diff --git a/tests/gateway/test_telegram_clarify_buttons.py b/tests/gateway/test_telegram_clarify_buttons.py index 81cb5c97ac5..70a06087a44 100644 --- a/tests/gateway/test_telegram_clarify_buttons.py +++ b/tests/gateway/test_telegram_clarify_buttons.py @@ -403,7 +403,7 @@ class TestBaseAdapterClarifyFallback: # Skip base __init__ — we're not exercising it self.sent: list = [] - async def connect(self): pass + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): pass async def send(self, chat_id, content, **kw): self.sent.append({"chat_id": chat_id, "content": content}) @@ -436,7 +436,7 @@ class TestBaseAdapterClarifyFallback: name = "stub" def __init__(self): self.sent: list = [] - async def connect(self): pass + async def connect(self, *, is_reconnect: bool = False): pass async def disconnect(self): pass async def send(self, chat_id, content, **kw): self.sent.append(content) diff --git a/tests/gateway/test_telegram_conflict.py b/tests/gateway/test_telegram_conflict.py index 31137212d74..def45a822d7 100644 --- a/tests/gateway/test_telegram_conflict.py +++ b/tests/gateway/test_telegram_conflict.py @@ -425,3 +425,67 @@ async def test_polling_conflict_reschedule_uses_running_loop(monkeypatch): except (asyncio.CancelledError, Exception): pass await _cancel_heartbeat(adapter) + + +def _build_polling_app(monkeypatch): + """Wire a mock PTB Application whose start_polling captures kwargs.""" + captured = {} + + async def fake_start_polling(**kwargs): + captured.update(kwargs) + + updater = SimpleNamespace( + start_polling=AsyncMock(side_effect=fake_start_polling), + stop=AsyncMock(), + running=True, + ) + bot = SimpleNamespace(set_my_commands=AsyncMock(), delete_webhook=AsyncMock()) + app = SimpleNamespace( + bot=bot, + updater=updater, + add_handler=MagicMock(), + initialize=AsyncMock(), + start=AsyncMock(), + ) + builder = MagicMock() + builder.token.return_value = builder + builder.request.return_value = builder + builder.get_updates_request.return_value = builder + builder.build.return_value = app + monkeypatch.setattr( + "plugins.platforms.telegram.adapter.Application", + SimpleNamespace(builder=MagicMock(return_value=builder)), + ) + monkeypatch.setattr( + "gateway.status.acquire_scoped_lock", + lambda scope, identity, metadata=None: (True, None), + ) + monkeypatch.setattr("asyncio.sleep", AsyncMock()) + return captured + + +@pytest.mark.asyncio +async def test_cold_connect_drops_pending_updates(monkeypatch): + """A cold first boot (is_reconnect=False) drops the stale Bot API queue.""" + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***")) + captured = _build_polling_app(monkeypatch) + + ok = await adapter.connect() # default is_reconnect=False + + assert ok is True + assert captured["drop_pending_updates"] is True + await _cancel_heartbeat(adapter) + + +@pytest.mark.asyncio +async def test_reconnect_preserves_pending_updates(monkeypatch): + """A watcher reconnect (is_reconnect=True) preserves the queue Telegram + accumulated during the outage — the core of #46621.""" + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***")) + captured = _build_polling_app(monkeypatch) + + ok = await adapter.connect(is_reconnect=True) + + assert ok is True + assert captured["drop_pending_updates"] is False + await _cancel_heartbeat(adapter) diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py index 20b38a7cbe4..719b1367846 100644 --- a/tests/gateway/test_telegram_thread_fallback.py +++ b/tests/gateway/test_telegram_thread_fallback.py @@ -1127,7 +1127,7 @@ async def test_base_send_image_fallback_preserves_metadata(): from gateway.platforms.base import BasePlatformAdapter class _ConcreteBaseAdapter(BasePlatformAdapter): - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True async def disconnect(self): diff --git a/tests/gateway/test_tool_response_drop_recovery.py b/tests/gateway/test_tool_response_drop_recovery.py index b4a5a1063ab..f39362b30c8 100644 --- a/tests/gateway/test_tool_response_drop_recovery.py +++ b/tests/gateway/test_tool_response_drop_recovery.py @@ -40,7 +40,7 @@ class _DummyAdapter(BasePlatformAdapter): super().__init__(PlatformConfig(enabled=True, token="fake-token"), platform) self.sent: list[dict] = [] - async def connect(self) -> bool: + async def connect(self, *, is_reconnect: bool = False) -> bool: return True async def disconnect(self) -> None: diff --git a/tests/gateway/test_tts_media_routing.py b/tests/gateway/test_tts_media_routing.py index 016be97ea27..e152b99c274 100644 --- a/tests/gateway/test_tts_media_routing.py +++ b/tests/gateway/test_tts_media_routing.py @@ -22,7 +22,7 @@ class _MediaRoutingAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): return True async def disconnect(self): diff --git a/tests/tools/test_send_message_tool.py b/tests/tools/test_send_message_tool.py index dcdb8f83266..d9c0b64b7ab 100644 --- a/tests/tools/test_send_message_tool.py +++ b/tests/tools/test_send_message_tool.py @@ -871,7 +871,7 @@ class TestSendToPlatformChunking: def __init__(self, _config): self.connected = False - async def connect(self): + async def connect(self, *, is_reconnect: bool = False): self.connected = True calls.append(("connect",)) return True