mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): harden Telegram polling conflict handling
- detect Telegram getUpdates conflicts and stop polling cleanly instead of retry-spamming forever
- add a machine-local token-scoped lock so different HERMES_HOME profiles on the same host can't poll the same bot token at once
- persist gateway runtime health/fatal adapter state and surface it in `hermes gateway status` (sample output of that command follows)
● hermes-gateway.service - Hermes Agent Gateway - Messaging Platform Integration
Loaded: loaded (/home/teknium/.config/systemd/user/hermes-gateway.service; enabled; preset: enabled)
Active: active (running) since Sat 2026-03-14 09:25:35 PDT; 2h 45min ago
Invocation: 8879379b25994201b98381f4bd80c2af
Main PID: 1147926 (python)
Tasks: 16 (limit: 76757)
Memory: 151.4M (peak: 168.1M)
CPU: 47.883s
CGroup: /user.slice/user-1000.slice/user@1000.service/app.slice/hermes-gateway.service
├─1147926 /home/teknium/.hermes/hermes-agent/venv/bin/python -m hermes_cli.main gateway run --replace
└─1147966 node /home/teknium/.hermes/hermes-agent/scripts/whatsapp-bridge/bridge.js --port 3000 --session /home/teknium/.hermes/whatsapp/session --mode self-chat
Mar 14 09:27:03 teknium-dev python[1147926]: 🔄 Retrying API call (2/3)...
Mar 14 09:27:04 teknium-dev python[1147926]: [409B blob data]
Mar 14 09:27:04 teknium-dev python[1147926]: Content: ''
Mar 14 09:27:04 teknium-dev python[1147926]: ❌ Max retries (3) for empty content exceeded.
Mar 14 09:27:07 teknium-dev python[1147926]: [1K blob data]
Mar 14 09:27:07 teknium-dev python[1147926]: Content: ''
Mar 14 09:27:07 teknium-dev python[1147926]: 🔄 Retrying API call (1/3)...
Mar 14 09:27:12 teknium-dev python[1147926]: [1.7K blob data]
Mar 14 09:27:12 teknium-dev python[1147926]: Content: ''
Mar 14 09:27:12 teknium-dev python[1147926]: 🔄 Retrying API call (2/3)...
⚠ Installed gateway service definition is outdated
Run: hermes gateway restart # auto-refreshes the unit
✓ Gateway service is running
✓ Systemd linger is enabled (service survives logout)
- cleanly exit non-retryable startup conflicts without triggering service restart loops
Tests:
- gateway status runtime-state helpers
- Telegram token-lock and polling-conflict behavior
- GatewayRunner clean exit on non-retryable startup conflict
- CLI runtime health summary
This commit is contained in:
parent
21ad98b74c
commit
5a2fcaab39
9 changed files with 692 additions and 9 deletions
|
|
@ -346,6 +346,10 @@ class BasePlatformAdapter(ABC):
|
||||||
self.platform = platform
|
self.platform = platform
|
||||||
self._message_handler: Optional[MessageHandler] = None
|
self._message_handler: Optional[MessageHandler] = None
|
||||||
self._running = False
|
self._running = False
|
||||||
|
self._fatal_error_code: Optional[str] = None
|
||||||
|
self._fatal_error_message: Optional[str] = None
|
||||||
|
self._fatal_error_retryable = True
|
||||||
|
self._fatal_error_handler: Optional[Callable[["BasePlatformAdapter"], Awaitable[None] | None]] = None
|
||||||
|
|
||||||
# Track active message handlers per session for interrupt support
|
# Track active message handlers per session for interrupt support
|
||||||
# Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt)
|
# Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt)
|
||||||
|
|
@ -353,6 +357,70 @@ class BasePlatformAdapter(ABC):
|
||||||
self._pending_messages: Dict[str, MessageEvent] = {}
|
self._pending_messages: Dict[str, MessageEvent] = {}
|
||||||
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
# Chats where auto-TTS on voice input is disabled (set by /voice off)
|
||||||
self._auto_tts_disabled_chats: set = set()
|
self._auto_tts_disabled_chats: set = set()
|
||||||
|
|
||||||
|
@property
def has_fatal_error(self) -> bool:
    """Report whether a fatal error is currently recorded on this adapter.

    Only the stored fatal error message is consulted; the error code and
    the retryable flag play no part in the answer.
    """
    recorded_message = self._fatal_error_message
    return recorded_message is not None
|
||||||
|
|
||||||
|
@property
def fatal_error_message(self) -> Optional[str]:
    """Return the recorded fatal error message, or ``None`` when none is set."""
    message = self._fatal_error_message
    return message
|
||||||
|
|
||||||
|
@property
def fatal_error_code(self) -> Optional[str]:
    """Return the recorded fatal error code, or ``None`` when none is set."""
    code = self._fatal_error_code
    return code
|
||||||
|
|
||||||
|
@property
def fatal_error_retryable(self) -> bool:
    """Return the stored retryable flag for the adapter's fatal error state."""
    retryable = self._fatal_error_retryable
    return retryable
|
||||||
|
|
||||||
|
def set_fatal_error_handler(self, handler: Callable[["BasePlatformAdapter"], Awaitable[None] | None]) -> None:
|
||||||
|
self._fatal_error_handler = handler
|
||||||
|
|
||||||
|
def _mark_connected(self) -> None:
|
||||||
|
self._running = True
|
||||||
|
self._fatal_error_code = None
|
||||||
|
self._fatal_error_message = None
|
||||||
|
self._fatal_error_retryable = True
|
||||||
|
try:
|
||||||
|
from gateway.status import write_runtime_status
|
||||||
|
write_runtime_status(platform=self.platform.value, platform_state="connected", error_code=None, error_message=None)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _mark_disconnected(self) -> None:
|
||||||
|
self._running = False
|
||||||
|
if self.has_fatal_error:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
from gateway.status import write_runtime_status
|
||||||
|
write_runtime_status(platform=self.platform.value, platform_state="disconnected", error_code=None, error_message=None)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _set_fatal_error(self, code: str, message: str, *, retryable: bool) -> None:
|
||||||
|
self._running = False
|
||||||
|
self._fatal_error_code = code
|
||||||
|
self._fatal_error_message = message
|
||||||
|
self._fatal_error_retryable = retryable
|
||||||
|
try:
|
||||||
|
from gateway.status import write_runtime_status
|
||||||
|
write_runtime_status(
|
||||||
|
platform=self.platform.value,
|
||||||
|
platform_state="fatal",
|
||||||
|
error_code=code,
|
||||||
|
error_message=message,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def _notify_fatal_error(self) -> None:
|
||||||
|
handler = self._fatal_error_handler
|
||||||
|
if not handler:
|
||||||
|
return
|
||||||
|
result = handler(self)
|
||||||
|
if asyncio.iscoroutine(result):
|
||||||
|
await result
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self) -> str:
|
def name(self) -> str:
|
||||||
|
|
|
||||||
|
|
@ -110,7 +110,35 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
super().__init__(config, Platform.TELEGRAM)
|
super().__init__(config, Platform.TELEGRAM)
|
||||||
self._app: Optional[Application] = None
|
self._app: Optional[Application] = None
|
||||||
self._bot: Optional[Bot] = None
|
self._bot: Optional[Bot] = None
|
||||||
|
self._token_lock_identity: Optional[str] = None
|
||||||
|
self._polling_error_task: Optional[asyncio.Task] = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _looks_like_polling_conflict(error: Exception) -> bool:
|
||||||
|
text = str(error).lower()
|
||||||
|
return (
|
||||||
|
error.__class__.__name__.lower() == "conflict"
|
||||||
|
or "terminated by other getupdates request" in text
|
||||||
|
or "another bot instance is running" in text
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_polling_conflict(self, error: Exception) -> None:
    """Stop Telegram polling after a getUpdates conflict and report it as fatal.

    Does nothing when the adapter already carries the
    ``telegram_polling_conflict`` fatal error. Otherwise it logs the problem,
    records a non-retryable fatal error, tries to stop the updater (a failure
    there is logged as a warning, not raised), and finally notifies the
    registered fatal-error handler.

    Args:
        error: The polling error that triggered the conflict handling.
    """
    already_reported = (
        self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict"
    )
    if already_reported:
        return

    message = "".join(
        (
            "Another Telegram bot poller is already using this token. ",
            "Hermes stopped Telegram polling to avoid endless retry spam. ",
            "Make sure only one gateway instance is running for this bot token.",
        )
    )
    logger.error("[%s] %s Original error: %s", self.name, message, error)
    self._set_fatal_error("telegram_polling_conflict", message, retryable=False)

    try:
        app = self._app
        if app and app.updater:
            await app.updater.stop()
    except Exception as stop_error:
        logger.warning(
            "[%s] Failed stopping Telegram polling after conflict: %s",
            self.name,
            stop_error,
            exc_info=True,
        )

    await self._notify_fatal_error()
|
||||||
|
|
||||||
async def connect(self) -> bool:
|
async def connect(self) -> bool:
|
||||||
"""Connect to Telegram and start polling for updates."""
|
"""Connect to Telegram and start polling for updates."""
|
||||||
if not TELEGRAM_AVAILABLE:
|
if not TELEGRAM_AVAILABLE:
|
||||||
|
|
@ -125,6 +153,25 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
from gateway.status import acquire_scoped_lock
|
||||||
|
|
||||||
|
self._token_lock_identity = self.config.token
|
||||||
|
acquired, existing = acquire_scoped_lock(
|
||||||
|
"telegram-bot-token",
|
||||||
|
self._token_lock_identity,
|
||||||
|
metadata={"platform": self.platform.value},
|
||||||
|
)
|
||||||
|
if not acquired:
|
||||||
|
owner_pid = existing.get("pid") if isinstance(existing, dict) else None
|
||||||
|
message = (
|
||||||
|
"Another local Hermes gateway is already using this Telegram bot token"
|
||||||
|
+ (f" (PID {owner_pid})." if owner_pid else ".")
|
||||||
|
+ " Stop the other gateway before starting a second Telegram poller."
|
||||||
|
)
|
||||||
|
logger.error("[%s] %s", self.name, message)
|
||||||
|
self._set_fatal_error("telegram_token_lock", message, retryable=False)
|
||||||
|
return False
|
||||||
|
|
||||||
# Build the application
|
# Build the application
|
||||||
self._app = Application.builder().token(self.config.token).build()
|
self._app = Application.builder().token(self.config.token).build()
|
||||||
self._bot = self._app.bot
|
self._bot = self._app.bot
|
||||||
|
|
@ -150,9 +197,20 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
# Start polling in background
|
# Start polling in background
|
||||||
await self._app.initialize()
|
await self._app.initialize()
|
||||||
await self._app.start()
|
await self._app.start()
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
def _polling_error_callback(error: Exception) -> None:
|
||||||
|
if not self._looks_like_polling_conflict(error):
|
||||||
|
logger.error("[%s] Telegram polling error: %s", self.name, error, exc_info=True)
|
||||||
|
return
|
||||||
|
if self._polling_error_task and not self._polling_error_task.done():
|
||||||
|
return
|
||||||
|
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
|
||||||
|
|
||||||
await self._app.updater.start_polling(
|
await self._app.updater.start_polling(
|
||||||
allowed_updates=Update.ALL_TYPES,
|
allowed_updates=Update.ALL_TYPES,
|
||||||
drop_pending_updates=True,
|
drop_pending_updates=True,
|
||||||
|
error_callback=_polling_error_callback,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Register bot commands so Telegram shows a hint menu when users type /
|
# Register bot commands so Telegram shows a hint menu when users type /
|
||||||
|
|
@ -188,11 +246,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._running = True
|
self._mark_connected()
|
||||||
logger.info("[%s] Connected and polling for Telegram updates", self.name)
|
logger.info("[%s] Connected and polling for Telegram updates", self.name)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
if self._token_lock_identity:
|
||||||
|
try:
|
||||||
|
from gateway.status import release_scoped_lock
|
||||||
|
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
logger.error("[%s] Failed to connect to Telegram: %s", self.name, e, exc_info=True)
|
logger.error("[%s] Failed to connect to Telegram: %s", self.name, e, exc_info=True)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
@ -205,10 +269,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
await self._app.shutdown()
|
await self._app.shutdown()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[%s] Error during Telegram disconnect: %s", self.name, e, exc_info=True)
|
logger.warning("[%s] Error during Telegram disconnect: %s", self.name, e, exc_info=True)
|
||||||
|
if self._token_lock_identity:
|
||||||
|
try:
|
||||||
|
from gateway.status import release_scoped_lock
|
||||||
|
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[%s] Error releasing Telegram token lock: %s", self.name, e, exc_info=True)
|
||||||
|
|
||||||
self._running = False
|
self._mark_disconnected()
|
||||||
self._app = None
|
self._app = None
|
||||||
self._bot = None
|
self._bot = None
|
||||||
|
self._token_lock_identity = None
|
||||||
logger.info("[%s] Disconnected from Telegram", self.name)
|
logger.info("[%s] Disconnected from Telegram", self.name)
|
||||||
|
|
||||||
async def send(
|
async def send(
|
||||||
|
|
|
||||||
|
|
@ -245,6 +245,8 @@ class GatewayRunner:
|
||||||
self.delivery_router = DeliveryRouter(self.config)
|
self.delivery_router = DeliveryRouter(self.config)
|
||||||
self._running = False
|
self._running = False
|
||||||
self._shutdown_event = asyncio.Event()
|
self._shutdown_event = asyncio.Event()
|
||||||
|
self._exit_cleanly = False
|
||||||
|
self._exit_reason: Optional[str] = None
|
||||||
|
|
||||||
# Track running agents per session for interrupt support
|
# Track running agents per session for interrupt support
|
||||||
# Key: session_key, Value: AIAgent instance
|
# Key: session_key, Value: AIAgent instance
|
||||||
|
|
@ -463,6 +465,41 @@ class GatewayRunner:
|
||||||
"""Run the sync memory flush in a thread pool so it won't block the event loop."""
|
"""Run the sync memory flush in a thread pool so it won't block the event loop."""
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
await loop.run_in_executor(None, self._flush_memories_for_session, old_session_id)
|
await loop.run_in_executor(None, self._flush_memories_for_session, old_session_id)
|
||||||
|
|
||||||
|
@property
def should_exit_cleanly(self) -> bool:
    """Return the flag that ``_request_clean_exit`` sets when a clean exit was requested."""
    clean_exit_requested = self._exit_cleanly
    return clean_exit_requested
|
||||||
|
|
||||||
|
@property
def exit_reason(self) -> Optional[str]:
    """Return the recorded reason for the gateway exiting, or ``None`` when none is set."""
    reason = self._exit_reason
    return reason
|
||||||
|
|
||||||
|
async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None:
    """React to a non-retryable adapter failure after startup.

    Logs the failure, disconnects and unregisters the adapter if it is the
    one currently registered for its platform, and stops the gateway when no
    adapters remain.

    Args:
        adapter: The adapter reporting the fatal error.
    """
    platform = adapter.platform
    logger.error(
        "Fatal %s adapter error (%s): %s",
        platform.value,
        adapter.fatal_error_code or "unknown",
        adapter.fatal_error_message or "unknown error",
    )

    # Only tear down the adapter if it is still the registered one; a
    # replacement registered for the same platform must be left alone.
    if self.adapters.get(platform) is adapter:
        try:
            await adapter.disconnect()
        except Exception as disconnect_error:
            # A failing disconnect must not abort this handler: the
            # "no adapters remain" shutdown below still has to run.
            logger.warning(
                "Error disconnecting %s adapter after fatal error: %s",
                platform.value,
                disconnect_error,
                exc_info=True,
            )
        finally:
            self.adapters.pop(platform, None)
            self.delivery_router.adapters = self.adapters

    if not self.adapters:
        self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected"
        logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.")
        await self.stop()
|
||||||
|
|
||||||
|
def _request_clean_exit(self, reason: str) -> None:
|
||||||
|
self._exit_cleanly = True
|
||||||
|
self._exit_reason = reason
|
||||||
|
self._shutdown_event.set()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _load_prefill_messages() -> List[Dict[str, Any]]:
|
def _load_prefill_messages() -> List[Dict[str, Any]]:
|
||||||
|
|
@ -647,6 +684,11 @@ class GatewayRunner:
|
||||||
"""
|
"""
|
||||||
logger.info("Starting Hermes Gateway...")
|
logger.info("Starting Hermes Gateway...")
|
||||||
logger.info("Session storage: %s", self.config.sessions_dir)
|
logger.info("Session storage: %s", self.config.sessions_dir)
|
||||||
|
try:
|
||||||
|
from gateway.status import write_runtime_status
|
||||||
|
write_runtime_status(gateway_state="starting", exit_reason=None)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
# Warn if no user allowlists are configured and open access is not opted in
|
# Warn if no user allowlists are configured and open access is not opted in
|
||||||
_any_allowlist = any(
|
_any_allowlist = any(
|
||||||
|
|
@ -676,6 +718,7 @@ class GatewayRunner:
|
||||||
logger.warning("Process checkpoint recovery: %s", e)
|
logger.warning("Process checkpoint recovery: %s", e)
|
||||||
|
|
||||||
connected_count = 0
|
connected_count = 0
|
||||||
|
startup_nonretryable_errors: list[str] = []
|
||||||
|
|
||||||
# Initialize and connect each configured platform
|
# Initialize and connect each configured platform
|
||||||
for platform, platform_config in self.config.platforms.items():
|
for platform, platform_config in self.config.platforms.items():
|
||||||
|
|
@ -687,8 +730,9 @@ class GatewayRunner:
|
||||||
logger.warning("No adapter available for %s", platform.value)
|
logger.warning("No adapter available for %s", platform.value)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Set up message handler
|
# Set up message + fatal error handlers
|
||||||
adapter.set_message_handler(self._handle_message)
|
adapter.set_message_handler(self._handle_message)
|
||||||
|
adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
|
||||||
|
|
||||||
# Try to connect
|
# Try to connect
|
||||||
logger.info("Connecting to %s...", platform.value)
|
logger.info("Connecting to %s...", platform.value)
|
||||||
|
|
@ -701,10 +745,24 @@ class GatewayRunner:
|
||||||
logger.info("✓ %s connected", platform.value)
|
logger.info("✓ %s connected", platform.value)
|
||||||
else:
|
else:
|
||||||
logger.warning("✗ %s failed to connect", platform.value)
|
logger.warning("✗ %s failed to connect", platform.value)
|
||||||
|
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
|
||||||
|
startup_nonretryable_errors.append(
|
||||||
|
f"{platform.value}: {adapter.fatal_error_message}"
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("✗ %s error: %s", platform.value, e)
|
logger.error("✗ %s error: %s", platform.value, e)
|
||||||
|
|
||||||
if connected_count == 0:
|
if connected_count == 0:
|
||||||
|
if startup_nonretryable_errors:
|
||||||
|
reason = "; ".join(startup_nonretryable_errors)
|
||||||
|
logger.error("Gateway hit a non-retryable startup conflict: %s", reason)
|
||||||
|
try:
|
||||||
|
from gateway.status import write_runtime_status
|
||||||
|
write_runtime_status(gateway_state="startup_failed", exit_reason=reason)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._request_clean_exit(reason)
|
||||||
|
return True
|
||||||
logger.warning("No messaging platforms connected.")
|
logger.warning("No messaging platforms connected.")
|
||||||
logger.info("Gateway will continue running for cron job execution.")
|
logger.info("Gateway will continue running for cron job execution.")
|
||||||
|
|
||||||
|
|
@ -712,6 +770,11 @@ class GatewayRunner:
|
||||||
self.delivery_router.adapters = self.adapters
|
self.delivery_router.adapters = self.adapters
|
||||||
|
|
||||||
self._running = True
|
self._running = True
|
||||||
|
try:
|
||||||
|
from gateway.status import write_runtime_status
|
||||||
|
write_runtime_status(gateway_state="running", exit_reason=None)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
# Emit gateway:startup hook
|
# Emit gateway:startup hook
|
||||||
hook_count = len(self.hooks.loaded_hooks)
|
hook_count = len(self.hooks.loaded_hooks)
|
||||||
|
|
@ -806,8 +869,12 @@ class GatewayRunner:
|
||||||
self._shutdown_all_gateway_honcho()
|
self._shutdown_all_gateway_honcho()
|
||||||
self._shutdown_event.set()
|
self._shutdown_event.set()
|
||||||
|
|
||||||
from gateway.status import remove_pid_file
|
from gateway.status import remove_pid_file, write_runtime_status
|
||||||
remove_pid_file()
|
remove_pid_file()
|
||||||
|
try:
|
||||||
|
write_runtime_status(gateway_state="stopped", exit_reason=self._exit_reason)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info("Gateway stopped")
|
logger.info("Gateway stopped")
|
||||||
|
|
||||||
|
|
@ -4340,6 +4407,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||||
success = await runner.start()
|
success = await runner.start()
|
||||||
if not success:
|
if not success:
|
||||||
return False
|
return False
|
||||||
|
if runner.should_exit_cleanly:
|
||||||
|
if runner.exit_reason:
|
||||||
|
logger.error("Gateway exiting cleanly: %s", runner.exit_reason)
|
||||||
|
return True
|
||||||
|
|
||||||
# Write PID file so CLI can detect gateway is running
|
# Write PID file so CLI can detect gateway is running
|
||||||
import atexit
|
import atexit
|
||||||
|
|
|
||||||
|
|
@ -11,13 +11,17 @@ that will be useful when we add named profiles (multiple agents running
|
||||||
concurrently under distinct configurations).
|
concurrently under distinct configurations).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
_GATEWAY_KIND = "hermes-gateway"
|
_GATEWAY_KIND = "hermes-gateway"
|
||||||
|
_RUNTIME_STATUS_FILE = "gateway_state.json"
|
||||||
|
_LOCKS_DIRNAME = "gateway-locks"
|
||||||
|
|
||||||
|
|
||||||
def _get_pid_path() -> Path:
|
def _get_pid_path() -> Path:
|
||||||
|
|
@ -26,6 +30,32 @@ def _get_pid_path() -> Path:
|
||||||
return home / "gateway.pid"
|
return home / "gateway.pid"
|
||||||
|
|
||||||
|
|
||||||
|
def _get_runtime_status_path() -> Path:
    """Return the persisted runtime health/status file path.

    The file sits in the same directory as the gateway PID file.
    """
    pid_path = _get_pid_path()
    return pid_path.with_name(_RUNTIME_STATUS_FILE)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_lock_dir() -> Path:
|
||||||
|
"""Return the machine-local directory for token-scoped gateway locks."""
|
||||||
|
override = os.getenv("HERMES_GATEWAY_LOCK_DIR")
|
||||||
|
if override:
|
||||||
|
return Path(override)
|
||||||
|
state_home = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local" / "state"))
|
||||||
|
return state_home / "hermes" / _LOCKS_DIRNAME
|
||||||
|
|
||||||
|
|
||||||
|
def _utc_now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def _scope_hash(identity: str) -> str:
|
||||||
|
return hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
|
||||||
|
|
||||||
|
|
||||||
|
def _get_scope_lock_path(scope: str, identity: str) -> Path:
    """Return the lock file path for *scope* and *identity*.

    The file name is ``<scope>-<hash of identity>.lock`` inside the lock
    directory, so the raw identity never appears in the file name.
    """
    lock_dir = _get_lock_dir()
    filename = f"{scope}-{_scope_hash(identity)}.lock"
    return lock_dir / filename
|
||||||
|
|
||||||
|
|
||||||
def _get_process_start_time(pid: int) -> Optional[int]:
|
def _get_process_start_time(pid: int) -> Optional[int]:
|
||||||
"""Return the kernel start time for a process when available."""
|
"""Return the kernel start time for a process when available."""
|
||||||
stat_path = Path(f"/proc/{pid}/stat")
|
stat_path = Path(f"/proc/{pid}/stat")
|
||||||
|
|
@ -73,6 +103,38 @@ def _build_pid_record() -> dict:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _build_runtime_status_record() -> dict[str, Any]:
    """Build a fresh runtime status record on top of the PID record.

    The record starts in the "starting" gateway state with no exit reason,
    no platform entries, and the current UTC time as its update timestamp.
    """
    record = _build_pid_record()
    record["gateway_state"] = "starting"
    record["exit_reason"] = None
    record["platforms"] = {}
    record["updated_at"] = _utc_now_iso()
    return record
|
||||||
|
|
||||||
|
|
||||||
|
def _read_json_file(path: Path) -> Optional[dict[str, Any]]:
|
||||||
|
if not path.exists():
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
raw = path.read_text().strip()
|
||||||
|
except OSError:
|
||||||
|
return None
|
||||||
|
if not raw:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
payload = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return None
|
||||||
|
return payload if isinstance(payload, dict) else None
|
||||||
|
|
||||||
|
|
||||||
|
def _write_json_file(path: Path, payload: dict[str, Any]) -> None:
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
path.write_text(json.dumps(payload))
|
||||||
|
|
||||||
|
|
||||||
def _read_pid_record() -> Optional[dict]:
|
def _read_pid_record() -> Optional[dict]:
|
||||||
pid_path = _get_pid_path()
|
pid_path = _get_pid_path()
|
||||||
if not pid_path.exists():
|
if not pid_path.exists():
|
||||||
|
|
@ -99,9 +161,49 @@ def _read_pid_record() -> Optional[dict]:
|
||||||
|
|
||||||
def write_pid_file() -> None:
|
def write_pid_file() -> None:
|
||||||
"""Write the current process PID and metadata to the gateway PID file."""
|
"""Write the current process PID and metadata to the gateway PID file."""
|
||||||
pid_path = _get_pid_path()
|
_write_json_file(_get_pid_path(), _build_pid_record())
|
||||||
pid_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
pid_path.write_text(json.dumps(_build_pid_record()))
|
|
||||||
|
# Sentinel meaning "argument not supplied". It lets callers pass ``None``
# explicitly to clear a previously stored value.
_UNSET: Any = object()


def write_runtime_status(
    *,
    gateway_state: Optional[str] = None,
    exit_reason: Any = _UNSET,
    platform: Optional[str] = None,
    platform_state: Optional[str] = None,
    error_code: Any = _UNSET,
    error_message: Any = _UNSET,
) -> None:
    """Persist gateway runtime health information for diagnostics/status.

    The existing status file is read, updated with the supplied fields, and
    written back. Fields that are not supplied keep their stored value.

    Args:
        gateway_state: New overall gateway state; ``None`` leaves it unchanged.
        exit_reason: New exit reason. Omit to leave it unchanged; pass
            ``None`` to clear a stale reason.
        platform: Name of the platform entry to update; ``None`` skips the
            platform update entirely.
        platform_state: New state for that platform; ``None`` leaves it
            unchanged.
        error_code: New error code for that platform. Omit to leave it
            unchanged; pass ``None`` to clear it.
        error_message: New error message for that platform. Omit to leave it
            unchanged; pass ``None`` to clear it.
    """
    path = _get_runtime_status_path()
    payload = _read_json_file(path) or _build_runtime_status_record()

    # A hand-edited or corrupt file may carry a non-dict "platforms" value.
    if not isinstance(payload.get("platforms"), dict):
        payload["platforms"] = {}
    payload.setdefault("kind", _GATEWAY_KIND)

    # Always stamp the writing process. The file may have been left behind
    # by an earlier gateway process, and setdefault() would keep its pid.
    current_pid = os.getpid()
    payload["pid"] = current_pid
    payload["start_time"] = _get_process_start_time(current_pid)
    payload["updated_at"] = _utc_now_iso()

    if gateway_state is not None:
        payload["gateway_state"] = gateway_state
    if exit_reason is not _UNSET:
        payload["exit_reason"] = exit_reason

    if platform is not None:
        platform_payload = payload["platforms"].get(platform)
        if not isinstance(platform_payload, dict):
            platform_payload = {}
        if platform_state is not None:
            platform_payload["state"] = platform_state
        if error_code is not _UNSET:
            platform_payload["error_code"] = error_code
        if error_message is not _UNSET:
            platform_payload["error_message"] = error_message
        platform_payload["updated_at"] = _utc_now_iso()
        payload["platforms"][platform] = platform_payload

    _write_json_file(path, payload)
|
||||||
|
|
||||||
|
|
||||||
|
def read_runtime_status() -> Optional[dict[str, Any]]:
    """Read the persisted gateway runtime health/status information.

    Returns:
        The stored status record, or ``None`` when the status file cannot be
        read as a JSON object.
    """
    status_path = _get_runtime_status_path()
    return _read_json_file(status_path)
|
||||||
|
|
||||||
|
|
||||||
def remove_pid_file() -> None:
|
def remove_pid_file() -> None:
|
||||||
|
|
@ -112,6 +214,87 @@ def remove_pid_file() -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def acquire_scoped_lock(scope: str, identity: str, metadata: Optional[dict[str, Any]] = None) -> tuple[bool, Optional[dict[str, Any]]]:
    """Acquire a machine-local lock keyed by scope + identity.

    Used to prevent multiple local gateways from using the same external identity
    at once (e.g. the same Telegram bot token across different HERMES_HOME dirs).

    Args:
        scope: Lock namespace, used as the lock file name prefix.
        identity: Value being locked; only its hash is stored.
        metadata: Optional extra information stored in the lock record.

    Returns:
        ``(acquired, record)``. ``record`` is the lock record found on disk:
        the previous record when this process re-acquires its own lock, the
        owner's record when the lock is held elsewhere, and ``None`` when a
        new lock was created or no usable record could be read.
    """
    import time

    # How old an unreadable lock file must be before it counts as abandoned.
    # The grace period protects a lock another process has just created with
    # O_EXCL but not yet filled in.
    unreadable_grace_seconds = 5.0

    lock_path = _get_scope_lock_path(scope, identity)
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    record = {
        **_build_pid_record(),
        "scope": scope,
        "identity_hash": _scope_hash(identity),
        "metadata": metadata or {},
        "updated_at": _utc_now_iso(),
    }

    existing = _read_json_file(lock_path)
    if existing:
        try:
            existing_pid = int(existing["pid"])
        except (KeyError, TypeError, ValueError):
            existing_pid = None

        # Re-acquisition by the same process just refreshes the record.
        if existing_pid == os.getpid() and existing.get("start_time") == record.get("start_time"):
            _write_json_file(lock_path, record)
            return True, existing

        # A missing pid, or one <= 0, cannot identify an owner. os.kill()
        # with such a pid addresses process groups, not a single process.
        stale = existing_pid is None or existing_pid <= 0
        if not stale:
            try:
                os.kill(existing_pid, 0)  # signal 0: existence probe only
            except (ProcessLookupError, PermissionError):
                stale = True
            else:
                # The pid is alive; guard against pid reuse by comparing the
                # recorded start time with the live process's start time.
                current_start = _get_process_start_time(existing_pid)
                if (
                    existing.get("start_time") is not None
                    and current_start is not None
                    and current_start != existing.get("start_time")
                ):
                    stale = True
        if stale:
            try:
                lock_path.unlink(missing_ok=True)
            except OSError:
                pass
        else:
            return False, existing
    else:
        # No usable record. If a lock file is nevertheless present it is
        # empty or corrupt (e.g. a crash between creating and writing it).
        # Without cleanup, the O_EXCL create below would fail forever.
        try:
            age_seconds = time.time() - lock_path.stat().st_mtime
        except OSError:
            age_seconds = None  # no lock file: the normal, uncontended case
        if age_seconds is not None and age_seconds > unreadable_grace_seconds:
            try:
                lock_path.unlink(missing_ok=True)
            except OSError:
                pass

    try:
        # O_EXCL makes creation atomic: only one contender can win.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False, _read_json_file(lock_path)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(record, handle)
    except Exception:
        # Do not leave a half-written lock file behind.
        try:
            lock_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
    return True, None
|
||||||
|
|
||||||
|
|
||||||
|
def release_scoped_lock(scope: str, identity: str) -> None:
    """Release a previously-acquired scope lock when owned by this process.

    The lock file is removed only when its record carries this process's pid
    and start time; otherwise it is left alone. Errors while removing the
    file are ignored.

    Args:
        scope: Lock namespace used when the lock was acquired.
        identity: Identity value used when the lock was acquired.
    """
    lock_path = _get_scope_lock_path(scope, identity)
    record = _read_json_file(lock_path)
    if not record:
        return

    owned_by_this_process = (
        record.get("pid") == os.getpid()
        and record.get("start_time") == _get_process_start_time(os.getpid())
    )
    if not owned_by_this_process:
        return

    try:
        lock_path.unlink(missing_ok=True)
    except OSError:
        pass
|
||||||
|
|
||||||
|
|
||||||
def get_running_pid() -> Optional[int]:
|
def get_running_pid() -> Optional[int]:
|
||||||
"""Return the PID of a running gateway instance, or ``None``.
|
"""Return the PID of a running gateway instance, or ``None``.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -367,6 +367,13 @@ def systemd_status(deep: bool = False):
|
||||||
print("✗ Gateway service is stopped")
|
print("✗ Gateway service is stopped")
|
||||||
print(" Run: hermes gateway start")
|
print(" Run: hermes gateway start")
|
||||||
|
|
||||||
|
runtime_lines = _runtime_health_lines()
|
||||||
|
if runtime_lines:
|
||||||
|
print()
|
||||||
|
print("Recent gateway health:")
|
||||||
|
for line in runtime_lines:
|
||||||
|
print(f" {line}")
|
||||||
|
|
||||||
if deep:
|
if deep:
|
||||||
print_systemd_linger_guidance()
|
print_systemd_linger_guidance()
|
||||||
else:
|
else:
|
||||||
|
|
@ -693,6 +700,35 @@ def _platform_status(platform: dict) -> str:
|
||||||
return "not configured"
|
return "not configured"
|
||||||
|
|
||||||
|
|
||||||
|
def _runtime_health_lines() -> list[str]:
|
||||||
|
"""Summarize the latest persisted gateway runtime health state."""
|
||||||
|
try:
|
||||||
|
from gateway.status import read_runtime_status
|
||||||
|
except Exception:
|
||||||
|
return []
|
||||||
|
|
||||||
|
state = read_runtime_status()
|
||||||
|
if not state:
|
||||||
|
return []
|
||||||
|
|
||||||
|
lines: list[str] = []
|
||||||
|
gateway_state = state.get("gateway_state")
|
||||||
|
exit_reason = state.get("exit_reason")
|
||||||
|
platforms = state.get("platforms", {}) or {}
|
||||||
|
|
||||||
|
for platform, pdata in platforms.items():
|
||||||
|
if pdata.get("state") == "fatal":
|
||||||
|
message = pdata.get("error_message") or "unknown error"
|
||||||
|
lines.append(f"⚠ {platform}: {message}")
|
||||||
|
|
||||||
|
if gateway_state == "startup_failed" and exit_reason:
|
||||||
|
lines.append(f"⚠ Last startup issue: {exit_reason}")
|
||||||
|
elif gateway_state == "stopped" and exit_reason:
|
||||||
|
lines.append(f"⚠ Last shutdown reason: {exit_reason}")
|
||||||
|
|
||||||
|
return lines
|
||||||
|
|
||||||
|
|
||||||
def _setup_standard_platform(platform: dict):
|
def _setup_standard_platform(platform: dict):
|
||||||
"""Interactive setup for Telegram, Discord, or Slack."""
|
"""Interactive setup for Telegram, Discord, or Slack."""
|
||||||
emoji = platform["emoji"]
|
emoji = platform["emoji"]
|
||||||
|
|
@ -1186,11 +1222,23 @@ def gateway_command(args):
|
||||||
if pids:
|
if pids:
|
||||||
print(f"✓ Gateway is running (PID: {', '.join(map(str, pids))})")
|
print(f"✓ Gateway is running (PID: {', '.join(map(str, pids))})")
|
||||||
print(" (Running manually, not as a system service)")
|
print(" (Running manually, not as a system service)")
|
||||||
|
runtime_lines = _runtime_health_lines()
|
||||||
|
if runtime_lines:
|
||||||
|
print()
|
||||||
|
print("Recent gateway health:")
|
||||||
|
for line in runtime_lines:
|
||||||
|
print(f" {line}")
|
||||||
print()
|
print()
|
||||||
print("To install as a service:")
|
print("To install as a service:")
|
||||||
print(" hermes gateway install")
|
print(" hermes gateway install")
|
||||||
else:
|
else:
|
||||||
print("✗ Gateway is not running")
|
print("✗ Gateway is not running")
|
||||||
|
runtime_lines = _runtime_health_lines()
|
||||||
|
if runtime_lines:
|
||||||
|
print()
|
||||||
|
print("Recent gateway health:")
|
||||||
|
for line in runtime_lines:
|
||||||
|
print(f" {line}")
|
||||||
print()
|
print()
|
||||||
print("To start:")
|
print("To start:")
|
||||||
print(" hermes gateway # Run in foreground")
|
print(" hermes gateway # Run in foreground")
|
||||||
|
|
|
||||||
46
tests/gateway/test_runner_fatal_adapter.py
Normal file
46
tests/gateway/test_runner_fatal_adapter.py
Normal file
|
|
@ -0,0 +1,46 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||||
|
from gateway.platforms.base import BasePlatformAdapter
|
||||||
|
from gateway.run import GatewayRunner
|
||||||
|
|
||||||
|
|
||||||
|
class _FatalAdapter(BasePlatformAdapter):
    """Test double whose ``connect`` always fails with a non-retryable fatal error."""

    def __init__(self):
        platform_config = PlatformConfig(enabled=True, token="token")
        super().__init__(platform_config, Platform.TELEGRAM)

    async def connect(self) -> bool:
        """Record the token-lock fatal error and report the connection as failed."""
        error_code = "telegram_token_lock"
        error_message = "Another local Hermes gateway is already using this Telegram bot token."
        self._set_fatal_error(error_code, error_message, retryable=False)
        return False

    async def disconnect(self) -> None:
        """Mark the adapter as disconnected."""
        self._mark_disconnected()

    async def send(self, chat_id, content, reply_to=None, metadata=None):
        """Sending is not supported by this stub."""
        raise NotImplementedError

    async def get_chat_info(self, chat_id):
        """Return a minimal chat record containing only the id."""
        return dict(id=chat_id)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monkeypatch, tmp_path):
    """A non-retryable fatal adapter error at startup asks the runner to exit cleanly."""
    telegram_config = PlatformConfig(enabled=True, token="token")
    gateway_config = GatewayConfig(
        platforms={Platform.TELEGRAM: telegram_config},
        sessions_dir=tmp_path / "sessions",
    )
    runner = GatewayRunner(gateway_config)

    def build_fatal_adapter(platform, platform_config):
        # Every platform gets the adapter that fails fatally on connect.
        return _FatalAdapter()

    monkeypatch.setattr(runner, "_create_adapter", build_fatal_adapter)

    started = await runner.start()

    assert started is True
    assert runner.should_exit_cleanly is True
    assert "already using this Telegram bot token" in runner.exit_reason
|
||||||
|
|
@ -25,3 +25,77 @@ class TestGatewayPidState:
|
||||||
|
|
||||||
assert status.get_running_pid() is None
|
assert status.get_running_pid() is None
|
||||||
assert not pid_path.exists()
|
assert not pid_path.exists()
|
||||||
|
|
||||||
|
|
||||||
|
class TestGatewayRuntimeStatus:
    """Round-trip checks for the persisted gateway runtime status record."""

    def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch):
        """A written fatal platform failure can be read back field by field."""
        # Point HERMES_HOME at a scratch directory for the duration of the test.
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        failure = {
            "gateway_state": "startup_failed",
            "exit_reason": "telegram conflict",
            "platform": "telegram",
            "platform_state": "fatal",
            "error_code": "telegram_polling_conflict",
            "error_message": "another poller is active",
        }
        status.write_runtime_status(**failure)

        recorded = status.read_runtime_status()
        assert recorded["gateway_state"] == "startup_failed"
        assert recorded["exit_reason"] == "telegram conflict"

        telegram = recorded["platforms"]["telegram"]
        assert telegram["state"] == "fatal"
        assert telegram["error_code"] == "telegram_polling_conflict"
        assert telegram["error_message"] == "another poller is active"
|
||||||
|
|
||||||
|
|
||||||
|
class TestScopedLocks:
    """Exercise acquiring and releasing the scoped gateway lock."""

    @staticmethod
    def _lock_file(tmp_path):
        """Return the lock file path these tests expect for the "secret" identity."""
        return tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"

    @staticmethod
    def _seed_foreign_lock(lock_path):
        """Write a lock record owned by PID 99999 at ``lock_path``."""
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        record = {
            "pid": 99999,
            "start_time": 123,
            "kind": "hermes-gateway",
        }
        lock_path.write_text(json.dumps(record))

    def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
        """A lock whose recorded owner looks alive is not handed out again."""
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
        self._seed_foreign_lock(self._lock_file(tmp_path))

        def kill_succeeds(pid, sig):
            # Returning without raising makes the recorded PID look alive.
            return None

        def matching_start_time(pid):
            # Same start time as the seeded record.
            return 123

        monkeypatch.setattr(status.os, "kill", kill_succeeds)
        monkeypatch.setattr(status, "_get_process_start_time", matching_start_time)

        acquired, existing = status.acquire_scoped_lock(
            "telegram-bot-token",
            "secret",
            metadata={"platform": "telegram"},
        )

        assert acquired is False
        assert existing["pid"] == 99999

    def test_acquire_scoped_lock_replaces_stale_record(self, tmp_path, monkeypatch):
        """A lock whose recorded owner is gone is overwritten by the caller."""
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
        lock_path = self._lock_file(tmp_path)
        self._seed_foreign_lock(lock_path)

        def kill_reports_missing(pid, sig):
            raise ProcessLookupError

        monkeypatch.setattr(status.os, "kill", kill_reports_missing)

        acquired, _ = status.acquire_scoped_lock(
            "telegram-bot-token",
            "secret",
            metadata={"platform": "telegram"},
        )

        assert acquired is True
        rewritten = json.loads(lock_path.read_text())
        assert rewritten["pid"] == os.getpid()
        assert rewritten["metadata"]["platform"] == "telegram"

    def test_release_scoped_lock_only_removes_current_owner(self, tmp_path, monkeypatch):
        """Releasing a lock held by this process removes its lock file."""
        monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))

        acquired, _ = status.acquire_scoped_lock(
            "telegram-bot-token",
            "secret",
            metadata={"platform": "telegram"},
        )
        assert acquired is True
        lock_path = self._lock_file(tmp_path)
        assert lock_path.exists()

        status.release_scoped_lock("telegram-bot-token", "secret")
        assert not lock_path.exists()
|
||||||
|
|
|
||||||
100
tests/gateway/test_telegram_conflict.py
Normal file
100
tests/gateway/test_telegram_conflict.py
Normal file
|
|
@ -0,0 +1,100 @@
|
||||||
|
import asyncio
|
||||||
|
import sys
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gateway.config import PlatformConfig
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_telegram_mock():
    """Register a stand-in ``telegram`` package unless a file-backed one is loaded.

    The same ``MagicMock`` is registered for ``telegram``, ``telegram.ext`` and
    ``telegram.constants``; names already present in ``sys.modules`` are left
    untouched.
    """
    loaded = sys.modules.get("telegram")
    if loaded is not None and hasattr(loaded, "__file__"):
        # A module with ``__file__`` is already loaded; leave it in place.
        return

    stub = MagicMock()
    stub.ext.ContextTypes.DEFAULT_TYPE = type(None)
    stub.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"

    chat_types = (
        ("GROUP", "group"),
        ("SUPERGROUP", "supergroup"),
        ("CHANNEL", "channel"),
        ("PRIVATE", "private"),
    )
    for member, value in chat_types:
        setattr(stub.constants.ChatType, member, value)

    for module_name in ("telegram", "telegram.ext", "telegram.constants"):
        sys.modules.setdefault(module_name, stub)
|
||||||
|
|
||||||
|
|
||||||
|
_ensure_telegram_mock()
|
||||||
|
|
||||||
|
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_connect_rejects_same_host_token_lock(monkeypatch):
    """``connect`` fails with a token-lock fatal error when the lock is refused."""
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))

    def refuse_lock(scope, identity, metadata=None):
        # Report the lock as already held by PID 4242.
        return False, {"pid": 4242}

    monkeypatch.setattr("gateway.status.acquire_scoped_lock", refuse_lock)

    connected = await adapter.connect()

    assert connected is False
    assert adapter.fatal_error_code == "telegram_token_lock"
    assert adapter.has_fatal_error is True
    assert "already using this Telegram bot token" in adapter.fatal_error_message
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch):
    """A conflict passed to the polling error callback stops polling and is reported as fatal."""
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))
    on_fatal = AsyncMock()
    adapter.set_fatal_error_handler(on_fatal)

    def grant_lock(scope, identity, metadata=None):
        return True, None

    def ignore_release(scope, identity):
        return None

    monkeypatch.setattr("gateway.status.acquire_scoped_lock", grant_lock)
    monkeypatch.setattr("gateway.status.release_scoped_lock", ignore_release)

    seen = {}

    async def record_error_callback(**kwargs):
        # Keep the callback handed to start_polling so the test can invoke it.
        seen["error_callback"] = kwargs["error_callback"]

    updater = SimpleNamespace(
        start_polling=AsyncMock(side_effect=record_error_callback),
        stop=AsyncMock(),
    )
    application = SimpleNamespace(
        bot=SimpleNamespace(set_my_commands=AsyncMock()),
        updater=updater,
        add_handler=MagicMock(),
        initialize=AsyncMock(),
        start=AsyncMock(),
    )

    # Builder whose ``token(...)`` chains back to itself and whose ``build()``
    # yields the fake application.
    builder = MagicMock()
    builder.token.return_value = builder
    builder.build.return_value = application
    application_cls = SimpleNamespace(builder=MagicMock(return_value=builder))
    monkeypatch.setattr("gateway.platforms.telegram.Application", application_cls)

    connected = await adapter.connect()

    assert connected is True
    error_callback = seen["error_callback"]
    assert callable(error_callback)

    conflict_cls = type("Conflict", (Exception,), {})
    error_callback(
        conflict_cls(
            "Conflict: terminated by other getUpdates request; make sure that only one bot instance is running"
        )
    )

    # Yield to the event loop twice so work scheduled by the callback can run.
    for _ in range(2):
        await asyncio.sleep(0)

    assert adapter.fatal_error_code == "telegram_polling_conflict"
    assert adapter.has_fatal_error is True
    updater.stop.assert_awaited()
    on_fatal.assert_awaited_once()
|
||||||
22
tests/hermes_cli/test_gateway_runtime_health.py
Normal file
22
tests/hermes_cli/test_gateway_runtime_health.py
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
from hermes_cli.gateway import _runtime_health_lines
|
||||||
|
|
||||||
|
|
||||||
|
def test_runtime_health_lines_include_fatal_platform_and_startup_reason(monkeypatch):
    """Both the fatal platform message and the startup failure reason are reported."""
    persisted_state = {
        "gateway_state": "startup_failed",
        "exit_reason": "telegram conflict",
        "platforms": {
            "telegram": {
                "state": "fatal",
                "error_message": "another poller is active",
            }
        },
    }

    def fake_read_runtime_status():
        return persisted_state

    monkeypatch.setattr("gateway.status.read_runtime_status", fake_read_runtime_status)

    reported = _runtime_health_lines()

    assert "⚠ telegram: another poller is active" in reported
    assert "⚠ Last startup issue: telegram conflict" in reported
|
||||||
Loading…
Add table
Add a link
Reference in a new issue