diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 4fbd78cf2e4..af66248d0e7 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -886,60 +886,107 @@ class TelegramAdapter(BasePlatformAdapter): async def _handle_polling_conflict(self, error: Exception) -> None: if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict": return - # Track consecutive conflicts — transient 409s can occur when a - # previous gateway instance hasn't fully released its long-poll - # session on Telegram's server (e.g. during --replace handoffs or - # systemd Restart=on-failure respawns). Retry a few times before - # giving up, so the old session has time to expire. + # Transient 409 Conflict errors arise when the previous gateway process + # has been killed (e.g. during `hermes update` or `--replace` handoffs) + # but its long-poll connection hasn't yet expired on Telegram's servers. + # Telegram holds open getUpdates sessions for up to ~30s after the + # client disconnects, so a new gateway starting immediately will receive + # a 409 until that server-side session expires. + # + # Strategy: stop the local updater, wait long enough for Telegram's + # server-side session to expire (RETRY_DELAY grows with each attempt), + # drain the connection pool, then restart polling. We attempt this + # MAX_CONFLICT_RETRIES times before declaring a fatal error. + # + # Crucially, a failed retry must NOT leave polling in an ambiguous + # state. If start_polling() raises, the updater is neither running + # nor fatal — messages are silently dropped. We schedule another + # retry attempt instead of returning silently, and only escalate to + # fatal after all retries are exhausted. self._polling_conflict_count += 1 - MAX_CONFLICT_RETRIES = 3 - RETRY_DELAY = 10 # seconds + MAX_CONFLICT_RETRIES = 5 + # Delay grows with each attempt: 15s, 25s, 35s, 45s, 55s. + # Telegram server-side getUpdates sessions typically expire within + # 30s; the increasing back-off ensures we clear that window without + # hammering the API on fast-restart loops. + RETRY_DELAY = 10 + (self._polling_conflict_count * 10) # seconds if self._polling_conflict_count <= MAX_CONFLICT_RETRIES: logger.warning( - "[%s] Telegram polling conflict (%d/%d), will retry in %ds. Error: %s", + "[%s] Telegram polling conflict (%d/%d) — previous session still " + "held open on Telegram's servers. Waiting %ds for it to expire. " + "Error: %s", self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES, RETRY_DELAY, error, ) + # Stop the local updater cleanly before sleeping. If it's already + # stopped (e.g. PTB raised before updater.running was set) this is + # a no-op. try: if self._app and self._app.updater and self._app.updater.running: await self._app.updater.stop() except Exception: pass + await asyncio.sleep(RETRY_DELAY) await self._drain_polling_connections() + try: await self._app.updater.start_polling( allowed_updates=Update.ALL_TYPES, drop_pending_updates=False, error_callback=self._polling_error_callback_ref, ) - logger.info("[%s] Telegram polling resumed after conflict retry %d", self.name, self._polling_conflict_count) - self._polling_conflict_count = 0 # reset on success + logger.info( + "[%s] Telegram polling resumed after conflict retry %d/%d", + self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES, + ) + self._polling_conflict_count = 0 # reset counter on success return except Exception as retry_err: - logger.warning("[%s] Telegram polling retry failed: %s", self.name, retry_err) - # Don't fall through to fatal yet — wait for the next conflict - # to trigger another retry attempt (up to MAX_CONFLICT_RETRIES). - return + logger.warning( + "[%s] Telegram polling retry %d/%d failed: %s. " + "Scheduling next attempt.", + self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES, + retry_err, + ) + # Schedule the next retry rather than returning silently. + # Returning here without either restarting polling or setting + # a fatal error leaves the adapter in a limbo state: the + # gateway process is alive and reports "connected" but + # no messages are received or sent. + if self._polling_conflict_count < MAX_CONFLICT_RETRIES: + loop = asyncio.get_event_loop() + self._polling_error_task = loop.create_task( + self._handle_polling_conflict(retry_err) + ) + return + # Fall through to fatal on the last retry. - # Exhausted retries — fatal + # Exhausted all retries — declare a fatal error so the gateway + # runner can surface this clearly and the user knows to act. message = ( - "Another process is already polling this Telegram bot token " - "(possibly OpenClaw or another Hermes instance). " - "Hermes stopped Telegram polling after %d retries. " - "Only one poller can run per token — stop the other process " - "and restart with 'hermes start'." - % MAX_CONFLICT_RETRIES + "Telegram polling could not recover after %d retries (%ds total wait). " + "The previous gateway session is still held open on Telegram's servers, " + "or another process is using the same bot token. " + "To recover: ensure no other Hermes or OpenClaw instance is running " + "with this token, then restart the gateway with 'hermes gateway restart'." + % (MAX_CONFLICT_RETRIES, sum(10 + i * 10 for i in range(1, MAX_CONFLICT_RETRIES + 1))) + ) + logger.error( + "[%s] %s Original error: %s", + self.name, message, error, ) - logger.error("[%s] %s Original error: %s", self.name, message, error) self._set_fatal_error("telegram_polling_conflict", message, retryable=False) try: if self._app and self._app.updater: await self._app.updater.stop() except Exception as stop_error: - logger.warning("[%s] Failed stopping Telegram polling after conflict: %s", self.name, stop_error, exc_info=True) + logger.warning( + "[%s] Failed stopping Telegram updater after exhausting conflict retries: %s", + self.name, stop_error, exc_info=True, + ) await self._notify_fatal_error() async def _create_dm_topic( diff --git a/hermes_cli/main.py b/hermes_cli/main.py index c8cdadc847a..53cef76771c 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -8488,6 +8488,7 @@ def _cmd_update_impl(args, gateway_mode: bool): launch_detached_profile_gateway_restart, _get_service_pids, _graceful_restart_via_sigusr1, + _wait_for_gateway_exit, ) import signal as _signal @@ -8906,6 +8907,21 @@ def _cmd_update_impl(args, gateway_mode: bool): os.kill(pid, _signal.SIGTERM) except (ProcessLookupError, PermissionError): pass + # Wait for the old process to fully exit before the watcher + # spawns the new gateway. Telegram holds the previous + # getUpdates long-poll session open on its servers for up to + # ~30s after the client disconnects. If the new gateway + # connects before that window expires it receives a 409 + # Conflict, which _handle_polling_conflict() recovers from + # via back-off retries — but a brief wait here reduces the + # chance of hitting that path at all, especially on fast + # machines where the watcher loop restarts in < 1s. + # We wait up to 5s for the process to exit (the OS-level + # close, not the Telegram server-side expiry), then let the + # watcher take over. The Telegram adapter's retry logic + # handles any remaining 409s if the server session is still + # live when the new gateway polls. + _wait_for_gateway_exit(timeout=5.0, force_after=None) killed_pids.add(pid) relaunched_profiles.append(proc.profile)