mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(telegram): recover from post-update polling conflict without entering limbo
This commit is contained in:
parent
6be579f626
commit
f260aa6dc0
2 changed files with 86 additions and 23 deletions
|
|
@ -886,60 +886,107 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||
async def _handle_polling_conflict(self, error: Exception) -> None:
|
||||
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
|
||||
return
|
||||
# Track consecutive conflicts — transient 409s can occur when a
|
||||
# previous gateway instance hasn't fully released its long-poll
|
||||
# session on Telegram's server (e.g. during --replace handoffs or
|
||||
# systemd Restart=on-failure respawns). Retry a few times before
|
||||
# giving up, so the old session has time to expire.
|
||||
# Transient 409 Conflict errors arise when the previous gateway process
|
||||
# has been killed (e.g. during `hermes update` or `--replace` handoffs)
|
||||
# but its long-poll connection hasn't yet expired on Telegram's servers.
|
||||
# Telegram holds open getUpdates sessions for up to ~30s after the
|
||||
# client disconnects, so a new gateway starting immediately will receive
|
||||
# a 409 until that server-side session expires.
|
||||
#
|
||||
# Strategy: stop the local updater, wait long enough for Telegram's
|
||||
# server-side session to expire (RETRY_DELAY grows with each attempt),
|
||||
# drain the connection pool, then restart polling. We attempt this
|
||||
# MAX_CONFLICT_RETRIES times before declaring a fatal error.
|
||||
#
|
||||
# Crucially, a failed retry must NOT leave polling in an ambiguous
|
||||
# state. If start_polling() raises, the updater is neither running
|
||||
# nor fatal — messages are silently dropped. We schedule another
|
||||
# retry attempt instead of returning silently, and only escalate to
|
||||
# fatal after all retries are exhausted.
|
||||
self._polling_conflict_count += 1
|
||||
|
||||
MAX_CONFLICT_RETRIES = 3
|
||||
RETRY_DELAY = 10 # seconds
|
||||
MAX_CONFLICT_RETRIES = 5
|
||||
# Delay grows with each attempt: 15s, 25s, 35s, 45s, 55s.
|
||||
# Telegram server-side getUpdates sessions typically expire within
|
||||
# 30s; the increasing back-off ensures we clear that window without
|
||||
# hammering the API on fast-restart loops.
|
||||
RETRY_DELAY = 10 + (self._polling_conflict_count * 10) # seconds
|
||||
|
||||
if self._polling_conflict_count <= MAX_CONFLICT_RETRIES:
|
||||
logger.warning(
|
||||
"[%s] Telegram polling conflict (%d/%d), will retry in %ds. Error: %s",
|
||||
"[%s] Telegram polling conflict (%d/%d) — previous session still "
|
||||
"held open on Telegram's servers. Waiting %ds for it to expire. "
|
||||
"Error: %s",
|
||||
self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES,
|
||||
RETRY_DELAY, error,
|
||||
)
|
||||
# Stop the local updater cleanly before sleeping. If it's already
|
||||
# stopped (e.g. PTB raised before updater.running was set) this is
|
||||
# a no-op.
|
||||
try:
|
||||
if self._app and self._app.updater and self._app.updater.running:
|
||||
await self._app.updater.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await asyncio.sleep(RETRY_DELAY)
|
||||
await self._drain_polling_connections()
|
||||
|
||||
try:
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
drop_pending_updates=False,
|
||||
error_callback=self._polling_error_callback_ref,
|
||||
)
|
||||
logger.info("[%s] Telegram polling resumed after conflict retry %d", self.name, self._polling_conflict_count)
|
||||
self._polling_conflict_count = 0 # reset on success
|
||||
logger.info(
|
||||
"[%s] Telegram polling resumed after conflict retry %d/%d",
|
||||
self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES,
|
||||
)
|
||||
self._polling_conflict_count = 0 # reset counter on success
|
||||
return
|
||||
except Exception as retry_err:
|
||||
logger.warning("[%s] Telegram polling retry failed: %s", self.name, retry_err)
|
||||
# Don't fall through to fatal yet — wait for the next conflict
|
||||
# to trigger another retry attempt (up to MAX_CONFLICT_RETRIES).
|
||||
return
|
||||
logger.warning(
|
||||
"[%s] Telegram polling retry %d/%d failed: %s. "
|
||||
"Scheduling next attempt.",
|
||||
self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES,
|
||||
retry_err,
|
||||
)
|
||||
# Schedule the next retry rather than returning silently.
|
||||
# Returning here without either restarting polling or setting
|
||||
# a fatal error leaves the adapter in a limbo state: the
|
||||
# gateway process is alive and reports "connected" but
|
||||
# no messages are received or sent.
|
||||
if self._polling_conflict_count < MAX_CONFLICT_RETRIES:
|
||||
loop = asyncio.get_event_loop()
|
||||
self._polling_error_task = loop.create_task(
|
||||
self._handle_polling_conflict(retry_err)
|
||||
)
|
||||
return
|
||||
# Fall through to fatal on the last retry.
|
||||
|
||||
# Exhausted retries — fatal
|
||||
# Exhausted all retries — declare a fatal error so the gateway
|
||||
# runner can surface this clearly and the user knows to act.
|
||||
message = (
|
||||
"Another process is already polling this Telegram bot token "
|
||||
"(possibly OpenClaw or another Hermes instance). "
|
||||
"Hermes stopped Telegram polling after %d retries. "
|
||||
"Only one poller can run per token — stop the other process "
|
||||
"and restart with 'hermes start'."
|
||||
% MAX_CONFLICT_RETRIES
|
||||
"Telegram polling could not recover after %d retries (%ds total wait). "
|
||||
"The previous gateway session is still held open on Telegram's servers, "
|
||||
"or another process is using the same bot token. "
|
||||
"To recover: ensure no other Hermes or OpenClaw instance is running "
|
||||
"with this token, then restart the gateway with 'hermes gateway restart'."
|
||||
% (MAX_CONFLICT_RETRIES, sum(10 + i * 10 for i in range(1, MAX_CONFLICT_RETRIES + 1)))
|
||||
)
|
||||
logger.error(
|
||||
"[%s] %s Original error: %s",
|
||||
self.name, message, error,
|
||||
)
|
||||
logger.error("[%s] %s Original error: %s", self.name, message, error)
|
||||
self._set_fatal_error("telegram_polling_conflict", message, retryable=False)
|
||||
try:
|
||||
if self._app and self._app.updater:
|
||||
await self._app.updater.stop()
|
||||
except Exception as stop_error:
|
||||
logger.warning("[%s] Failed stopping Telegram polling after conflict: %s", self.name, stop_error, exc_info=True)
|
||||
logger.warning(
|
||||
"[%s] Failed stopping Telegram updater after exhausting conflict retries: %s",
|
||||
self.name, stop_error, exc_info=True,
|
||||
)
|
||||
await self._notify_fatal_error()
|
||||
|
||||
async def _create_dm_topic(
|
||||
|
|
|
|||
|
|
@ -8488,6 +8488,7 @@ def _cmd_update_impl(args, gateway_mode: bool):
|
|||
launch_detached_profile_gateway_restart,
|
||||
_get_service_pids,
|
||||
_graceful_restart_via_sigusr1,
|
||||
_wait_for_gateway_exit,
|
||||
)
|
||||
import signal as _signal
|
||||
|
||||
|
|
@ -8906,6 +8907,21 @@ def _cmd_update_impl(args, gateway_mode: bool):
|
|||
os.kill(pid, _signal.SIGTERM)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
pass
|
||||
# Wait for the old process to fully exit before the watcher
|
||||
# spawns the new gateway. Telegram holds the previous
|
||||
# getUpdates long-poll session open on its servers for up to
|
||||
# ~30s after the client disconnects. If the new gateway
|
||||
# connects before that window expires it receives a 409
|
||||
# Conflict, which _handle_polling_conflict() recovers from
|
||||
# via back-off retries — but a brief wait here reduces the
|
||||
# chance of hitting that path at all, especially on fast
|
||||
# machines where the watcher loop restarts in < 1s.
|
||||
# We wait up to 5s for the process to exit (the OS-level
|
||||
# close, not the Telegram server-side expiry), then let the
|
||||
# watcher take over. The Telegram adapter's retry logic
|
||||
# handles any remaining 409s if the server session is still
|
||||
# live when the new gateway polls.
|
||||
_wait_for_gateway_exit(timeout=5.0, force_after=None)
|
||||
killed_pids.add(pid)
|
||||
relaunched_profiles.append(proc.profile)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue