mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(gateway): keep running when platforms fail; add per-platform circuit breaker + /platform (#26600)
Stop the gateway from exiting (or systemd-restart-looping) when a single
messaging adapter fails at startup or runtime. A misconfigured WhatsApp
(npm install timeout, unpaired bridge, missing creds.json) used to take
the entire gateway down, killing cron jobs and any other connected
platforms with it.
Changes:
• Startup (gateway/run.py): when connected_count==0 but the only
errors are retryable, log a degraded-state warning and keep the
gateway alive instead of returning False. Reconnect watcher then
recovers platforms as their underlying problem clears.
• Runtime (gateway/run.py _handle_adapter_fatal_error): when the last
adapter goes down with a retryable error and is queued for
reconnection, stay alive instead of exit-with-failure. Previously
this triggered systemd Restart=on-failure, which created infinite
restart loops on persistent retryable failures (proxy outage,
repeated bridge crashes).
• Reconnect watcher (gateway/run.py _platform_reconnect_watcher):
replace the 20-attempt hard drop with a circuit-breaker pause.
After _PAUSE_AFTER_FAILURES (10) consecutive retryable failures, the
platform stays in _failed_platforms with paused=True so the watcher
skips it but the operator can still see and resume it. Non-retryable
errors still drop out of the queue immediately. Resolves #17063
(gateway giving up on Telegram after 20 attempts).
• WhatsApp preflight (gateway/platforms/whatsapp.py): refuse to start
the Node bridge when creds.json is missing. Sets a non-retryable
whatsapp_not_paired fatal error so the watcher drops it cleanly
with a single 'run hermes whatsapp' log line instead of paying the
30s bridge bootstrap timeout on every gateway start.
• WhatsApp setup ordering (hermes_cli/main.py cmd_whatsapp): only set
WHATSAPP_ENABLED=true once pairing actually succeeds. Previously
the wizard wrote the env var at step 2 (before npm install and QR
pairing), so any Ctrl+C left .env claiming WhatsApp was ready when
the bridge had no creds.json. Also propagate the env var when the
user keeps an existing pairing on a re-run.
• /platform slash command (hermes_cli/commands.py + gateway/run.py):
new gateway-only command for manual circuit-breaker control.
/platform list — show connected + failed/paused platforms
/platform pause <name> — silence a known-broken platform
/platform resume <name> — re-queue a paused platform
Tests:
• New: pause/resume helpers, /platform list|pause|resume command,
WhatsApp creds.json preflight, WhatsApp setup ordering.
• Updated: stale assertions that codified the old 'exit and let
systemd restart' behavior in test_runner_fatal_adapter.py,
test_runner_startup_failures.py, and test_platform_reconnect.py
(the 20-attempt give-up test became a circuit-breaker pause test).
5488 tests pass in tests/gateway/.
This commit is contained in:
parent
3b9368a0c4
commit
518f39557b
9 changed files with 745 additions and 62 deletions
255
gateway/run.py
255
gateway/run.py
|
|
@ -1990,21 +1990,21 @@ class GatewayRunner:
|
|||
await self.stop()
|
||||
elif not self.adapters and self._failed_platforms:
|
||||
# All platforms are down and queued for background reconnection.
|
||||
# If the error is retryable, exit with failure so systemd Restart=on-failure
|
||||
# can restart the process. Otherwise stay alive and keep retrying in background.
|
||||
if adapter.fatal_error_retryable:
|
||||
self._exit_reason = adapter.fatal_error_message or "All messaging platforms failed with retryable errors"
|
||||
self._exit_with_failure = True
|
||||
logger.error(
|
||||
"All messaging platforms failed with retryable errors. "
|
||||
"Shutting down gateway for service restart (systemd will retry)."
|
||||
)
|
||||
await self.stop()
|
||||
else:
|
||||
logger.warning(
|
||||
"No connected messaging platforms remain, but %d platform(s) queued for reconnection",
|
||||
len(self._failed_platforms),
|
||||
)
|
||||
# Keep the gateway alive so:
|
||||
# • cron jobs still run
|
||||
# • the reconnect watcher can recover platforms when the
|
||||
# underlying problem clears (proxy comes back, user runs
|
||||
# `hermes whatsapp`, etc.)
|
||||
# We used to exit-with-failure here to trigger systemd restart,
|
||||
# but that converted a transient outage into a restart loop and
|
||||
# killed in-process state every time. The reconnect watcher
|
||||
# already handles long-running recovery — let it do its job.
|
||||
logger.warning(
|
||||
"No connected messaging platforms remain, but %d platform(s) "
|
||||
"queued for reconnection — gateway staying alive, watcher will "
|
||||
"retry in background.",
|
||||
len(self._failed_platforms),
|
||||
)
|
||||
|
||||
def _request_clean_exit(self, reason: str) -> None:
|
||||
self._exit_cleanly = True
|
||||
|
|
@ -2180,6 +2180,73 @@ class GatewayRunner:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Per-platform circuit breaker (pause/resume) — used by the reconnect
|
||||
# watcher when a retryable failure recurs past a threshold, and by the
|
||||
# /platform pause|resume slash command for manual control.
|
||||
# ------------------------------------------------------------------
|
||||
def _pause_failed_platform(self, platform, *, reason: str = "") -> None:
|
||||
"""Mark a queued platform as paused — keep it in ``_failed_platforms``
|
||||
but stop the reconnect watcher from hammering it.
|
||||
|
||||
Used by the circuit breaker after ``_PAUSE_AFTER_FAILURES`` consecutive
|
||||
retryable failures, and by ``/platform pause <name>`` for manual
|
||||
intervention. Paused platforms are surfaced in ``/platform list``
|
||||
and resumed with ``/platform resume <name>``.
|
||||
"""
|
||||
info = getattr(self, "_failed_platforms", {}).get(platform)
|
||||
if info is None:
|
||||
return
|
||||
if info.get("paused"):
|
||||
return
|
||||
info["paused"] = True
|
||||
info["pause_reason"] = reason or "auto-paused after repeated failures"
|
||||
# Push next_retry far enough out that even if "paused" is missed
|
||||
# by a stale code path, the watcher won't fire on it.
|
||||
info["next_retry"] = float("inf")
|
||||
try:
|
||||
self._update_platform_runtime_status(
|
||||
platform.value,
|
||||
platform_state="paused",
|
||||
error_code=None,
|
||||
error_message=info["pause_reason"],
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
logger.warning(
|
||||
"%s paused after %d consecutive failures (%s) — "
|
||||
"fix the underlying issue then run `/platform resume %s` "
|
||||
"to retry, or `hermes gateway restart` to restart the gateway.",
|
||||
platform.value, info.get("attempts", 0),
|
||||
info["pause_reason"], platform.value,
|
||||
)
|
||||
|
||||
def _resume_paused_platform(self, platform) -> bool:
|
||||
"""Unpause a platform — reset its attempt counter and schedule an
|
||||
immediate retry. Returns True if the platform was paused and is
|
||||
now queued; False if it wasn't paused (or wasn't in the queue).
|
||||
"""
|
||||
info = getattr(self, "_failed_platforms", {}).get(platform)
|
||||
if info is None:
|
||||
return False
|
||||
if not info.get("paused"):
|
||||
return False
|
||||
info["paused"] = False
|
||||
info.pop("pause_reason", None)
|
||||
info["attempts"] = 0
|
||||
info["next_retry"] = time.monotonic() # retry on next watcher tick
|
||||
try:
|
||||
self._update_platform_runtime_status(
|
||||
platform.value,
|
||||
platform_state="retrying",
|
||||
error_code=None,
|
||||
error_message=None,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
logger.info("%s resumed — retrying on next watcher tick", platform.value)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _load_prefill_messages() -> List[Dict[str, Any]]:
|
||||
"""Load ephemeral prefill messages from config or env var.
|
||||
|
|
@ -3613,16 +3680,32 @@ class GatewayRunner:
|
|||
return True
|
||||
if enabled_platform_count > 0:
|
||||
if startup_retryable_errors:
|
||||
# At least one platform attempted a connection and failed —
|
||||
# this is a real startup error that should block the gateway.
|
||||
# All enabled platforms hit retryable failures (network
|
||||
# blip, bridge not paired, npm install timeout, etc.).
|
||||
# Keep the gateway alive so:
|
||||
# • cron jobs still run
|
||||
# • the reconnect watcher gets a chance to recover the
|
||||
# failing platforms once the underlying problem is
|
||||
# fixed (e.g. user runs `hermes whatsapp`, fixes
|
||||
# proxy, etc.)
|
||||
# Exiting here used to convert a single misconfigured
|
||||
# platform into an infinite systemd restart loop.
|
||||
reason = "; ".join(startup_retryable_errors)
|
||||
logger.error("Gateway failed to connect any configured messaging platform: %s", reason)
|
||||
logger.warning(
|
||||
"Gateway started with no connected platforms — "
|
||||
"%d platform(s) queued for retry: %s",
|
||||
len(self._failed_platforms), reason,
|
||||
)
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(gateway_state="startup_failed", exit_reason=reason)
|
||||
write_runtime_status(
|
||||
gateway_state="degraded",
|
||||
exit_reason=None,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
# Fall through to the normal "running" state — reconnect
|
||||
# watcher takes it from here.
|
||||
# All enabled platforms had no adapter (missing library or credentials).
|
||||
# In fleet deployments the same config.yaml is shared across nodes that
|
||||
# may only have credentials for a subset of platforms. Rather than
|
||||
|
|
@ -4737,11 +4820,15 @@ class GatewayRunner:
|
|||
"""Background task that periodically retries connecting failed platforms.
|
||||
|
||||
Uses exponential backoff: 30s → 60s → 120s → 240s → 300s (cap).
|
||||
Stops retrying a platform after 20 failed attempts or if the error
|
||||
is non-retryable (e.g. bad auth token).
|
||||
Retryable failures keep retrying at the backoff cap indefinitely
|
||||
— but if a platform fails ``_PAUSE_AFTER_FAILURES`` times in a row
|
||||
without ever succeeding, it is *paused*: kept in the retry queue
|
||||
but no longer hammered. The user surfaces it with ``/platform list``
|
||||
and resumes it with ``/platform resume <name>``. Non-retryable
|
||||
failures (bad auth, etc.) still drop out of the queue immediately.
|
||||
"""
|
||||
_MAX_ATTEMPTS = 20
|
||||
_BACKOFF_CAP = 300 # 5 minutes max between retries
|
||||
_PAUSE_AFTER_FAILURES = 10 # circuit-breaker threshold
|
||||
|
||||
await asyncio.sleep(10) # initial delay — let startup finish
|
||||
while self._running:
|
||||
|
|
@ -4758,22 +4845,18 @@ class GatewayRunner:
|
|||
if not self._running:
|
||||
return
|
||||
info = self._failed_platforms[platform]
|
||||
# Skip paused platforms entirely — they need explicit
|
||||
# /platform resume to come back.
|
||||
if info.get("paused"):
|
||||
continue
|
||||
if now < info["next_retry"]:
|
||||
continue # not time yet
|
||||
|
||||
if info["attempts"] >= _MAX_ATTEMPTS:
|
||||
logger.warning(
|
||||
"Giving up reconnecting %s after %d attempts",
|
||||
platform.value, info["attempts"],
|
||||
)
|
||||
del self._failed_platforms[platform]
|
||||
continue
|
||||
|
||||
platform_config = info["config"]
|
||||
attempt = info["attempts"] + 1
|
||||
logger.info(
|
||||
"Reconnecting %s (attempt %d/%d)...",
|
||||
platform.value, attempt, _MAX_ATTEMPTS,
|
||||
"Reconnecting %s (attempt %d)...",
|
||||
platform.value, attempt,
|
||||
)
|
||||
|
||||
try:
|
||||
|
|
@ -4838,6 +4921,14 @@ class GatewayRunner:
|
|||
"Reconnect %s failed, next retry in %ds",
|
||||
platform.value, backoff,
|
||||
)
|
||||
if attempt >= _PAUSE_AFTER_FAILURES:
|
||||
self._pause_failed_platform(
|
||||
platform,
|
||||
reason=(
|
||||
adapter.fatal_error_message
|
||||
or "failed to reconnect"
|
||||
),
|
||||
)
|
||||
except Exception as e:
|
||||
self._update_platform_runtime_status(
|
||||
platform.value,
|
||||
|
|
@ -4852,6 +4943,8 @@ class GatewayRunner:
|
|||
"Reconnect %s error: %s, next retry in %ds",
|
||||
platform.value, e, backoff,
|
||||
)
|
||||
if attempt >= _PAUSE_AFTER_FAILURES:
|
||||
self._pause_failed_platform(platform, reason=str(e))
|
||||
|
||||
# Check every 10 seconds for platforms that need reconnection
|
||||
for _ in range(10):
|
||||
|
|
@ -6451,6 +6544,9 @@ class GatewayRunner:
|
|||
if canonical == "agents":
|
||||
return await self._handle_agents_command(event)
|
||||
|
||||
if canonical == "platform":
|
||||
return await self._handle_platform_command(event)
|
||||
|
||||
if canonical == "restart":
|
||||
return await self._handle_restart_command(event)
|
||||
|
||||
|
|
@ -8698,6 +8794,99 @@ class GatewayRunner:
|
|||
else:
|
||||
return t("gateway.stop.no_active")
|
||||
|
||||
async def _handle_platform_command(self, event: MessageEvent) -> str:
|
||||
"""Handle ``/platform list|pause|resume [name]`` — surface and
|
||||
manually control failed/paused gateway adapters.
|
||||
|
||||
Examples:
|
||||
``/platform list`` — show connected + failed/paused platforms
|
||||
``/platform pause whatsapp`` — stop the reconnect watcher hammering whatsapp
|
||||
``/platform resume whatsapp`` — re-queue a paused platform for retry
|
||||
"""
|
||||
text = (getattr(event, "content", "") or "").strip()
|
||||
# Strip the leading "/platform" (or "/PLATFORM") token if present
|
||||
parts = text.split(maxsplit=2)
|
||||
if parts and parts[0].lower().lstrip("/").startswith("platform"):
|
||||
parts = parts[1:]
|
||||
action = (parts[0] if parts else "list").lower()
|
||||
target = parts[1].lower() if len(parts) > 1 else ""
|
||||
|
||||
# Resolve platform name (case-insensitive, value match)
|
||||
def _resolve_platform(name: str):
|
||||
if not name:
|
||||
return None
|
||||
for p in Platform.__members__.values():
|
||||
if p.value.lower() == name:
|
||||
return p
|
||||
return None
|
||||
|
||||
if action == "list":
|
||||
lines = ["**Gateway platforms**"]
|
||||
connected = sorted(p.value for p in self.adapters.keys())
|
||||
if connected:
|
||||
lines.append("Connected: " + ", ".join(connected))
|
||||
else:
|
||||
lines.append("Connected: (none)")
|
||||
failed = getattr(self, "_failed_platforms", {}) or {}
|
||||
if failed:
|
||||
for p, info in failed.items():
|
||||
if info.get("paused"):
|
||||
reason = info.get("pause_reason") or "paused"
|
||||
lines.append(
|
||||
f" · {p.value} — PAUSED ({reason}). "
|
||||
f"Resume with `/platform resume {p.value}`."
|
||||
)
|
||||
else:
|
||||
attempts = info.get("attempts", 0)
|
||||
lines.append(
|
||||
f" · {p.value} — retrying (attempt {attempts})"
|
||||
)
|
||||
else:
|
||||
lines.append("Failed/paused: (none)")
|
||||
return "\n".join(lines)
|
||||
|
||||
if action in ("pause", "resume"):
|
||||
if not target:
|
||||
return f"Usage: /platform {action} <name>"
|
||||
platform = _resolve_platform(target)
|
||||
if platform is None:
|
||||
return f"Unknown platform: {target}"
|
||||
failed = getattr(self, "_failed_platforms", {}) or {}
|
||||
if action == "pause":
|
||||
if platform not in failed:
|
||||
return (
|
||||
f"{platform.value} is not in the retry queue "
|
||||
f"(it's either connected or not enabled)."
|
||||
)
|
||||
if failed[platform].get("paused"):
|
||||
return f"{platform.value} is already paused."
|
||||
self._pause_failed_platform(platform, reason="paused via /platform pause")
|
||||
return (
|
||||
f"✓ {platform.value} paused. "
|
||||
f"Resume with `/platform resume {platform.value}` or "
|
||||
f"`hermes gateway restart` to reset."
|
||||
)
|
||||
# action == "resume"
|
||||
if platform not in failed:
|
||||
return (
|
||||
f"{platform.value} is not in the retry queue — "
|
||||
f"nothing to resume."
|
||||
)
|
||||
if not failed[platform].get("paused"):
|
||||
return (
|
||||
f"{platform.value} is already retrying — "
|
||||
f"no resume needed."
|
||||
)
|
||||
self._resume_paused_platform(platform)
|
||||
return f"✓ {platform.value} resumed — retrying on next watcher tick."
|
||||
|
||||
return (
|
||||
"Usage: /platform <list|pause|resume> [name]\n"
|
||||
" /platform list — show platform status\n"
|
||||
" /platform pause <name> — stop retrying a failing platform\n"
|
||||
" /platform resume <name> — re-queue a paused platform"
|
||||
)
|
||||
|
||||
async def _handle_restart_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
|
||||
"""Handle /restart command - drain active work, then restart the gateway."""
|
||||
# Defensive idempotency check: if the previous gateway process
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue