mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
Wraps every sync->async coroutine-scheduling site in the codebase with a new agent.async_utils.safe_schedule_threadsafe() helper that closes the coroutine on scheduling failure (closed loop, shutdown race, etc.) instead of leaking it as 'coroutine was never awaited' RuntimeWarnings plus reference leaks. 22 production call sites migrated across the codebase: - acp_adapter/events.py, acp_adapter/permissions.py - agent/lsp/manager.py - cron/scheduler.py (media + text delivery paths) - gateway/platforms/feishu.py (5 sites, via existing _submit_on_loop helper which now delegates to safe_schedule_threadsafe) - gateway/run.py (10 sites: telegram rename, agent:step hook, status callback, interim+bg-review, clarify send, exec-approval button+text, temp-bubble cleanup, channel-directory refresh) - plugins/memory/hindsight, plugins/platforms/google_chat - tools/browser_supervisor.py (3), browser_cdp_tool.py, computer_use/cua_backend.py, slash_confirm.py - tools/environments/modal.py (_AsyncWorker) - tools/mcp_tool.py (2 + 8 _run_on_mcp_loop callers converted to factory-style so the coroutine is never constructed on a dead loop) - tui_gateway/ws.py Tests: new tests/agent/test_async_utils.py covers helper behavior under live loop, dead loop, None loop, and scheduling exceptions. Regression tests added at three PR-original sites (acp events, acp permissions, mcp loop runner) mirroring contributor's intent. 
Live-tested end-to-end: - Helper stress test: 1500 schedules across live/dead/race scenarios, zero leaked coroutines - Race exercised: 5000 schedules with loop killed mid-flight, 100 ok / 4900 None returns, zero leaks - hermes chat -q with terminal tool call (exercises step_callback bridge) - MCP probe against failing subprocess servers + factory path - Real gateway daemon boot + SIGINT shutdown across multiple platform adapter inits - WSTransport 100 live + 50 dead-loop writes - Cron delivery path live + dead loop Salvages PR #2657 — adopts contributor's intent over a much wider site list and a single centralized helper instead of inline try/except at each site. 3 of the original PR's 6 sites no longer exist on main (environments/patches.py deleted, DingTalk refactored to native async); the equivalent fix lives in tools/environments/modal.py instead. Co-authored-by: JithendraNara <jithendranaidunara@gmail.com>
167 lines
5.6 KiB
Python
167 lines
5.6 KiB
Python
"""Generic slash-command confirmation primitive (gateway-side).
|
|
|
|
Slash commands that have a non-destructive but expensive side effect worth
|
|
surfacing to the user (currently only ``/reload-mcp``, which invalidates
|
|
the provider prompt cache) route through this module.
|
|
|
|
Two delivery paths:
|
|
|
|
1. Button UI — adapters that override ``send_slash_confirm`` render
|
|
three inline buttons (Approve Once / Always Approve / Cancel). The
|
|
button callback calls ``resolve(session_key, confirm_id, choice)``.
|
|
|
|
2. Text fallback — adapters without button UIs get a plain text prompt.
|
|
Users reply with ``/approve``, ``/always``, or ``/cancel``; the
|
|
gateway's ``_handle_message`` intercepts those replies and calls
|
|
``resolve()`` directly.
|
|
|
|
State is stored module-level (like ``tools.approval``) so platform
|
|
adapters can resolve callbacks without needing a backreference to the
|
|
``GatewayRunner`` instance. The CLI path (``cli.py``) uses a local
|
|
synchronous variant — see ``_prompt_slash_confirm`` there.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import threading
|
|
import time
|
|
from typing import Any, Awaitable, Callable, Dict, Optional
|
|
|
|
logger = logging.getLogger(__name__)

# In-flight confirmations, at most one per gateway session_key. Value shape:
#   {
#       "confirm_id": str,
#       "command": str,       # e.g. "reload-mcp"
#       "handler": Callable[[str], Awaitable[Optional[str]]],
#       "created_at": float,  # time.time() at registration
#   }
# Every read and write of the mapping below happens while holding ``_lock``
# (a reentrant lock).
_pending: Dict[str, Dict[str, Any]] = dict()
_lock = threading.RLock()

# Staleness window in seconds (five minutes), used as the default by
# ``clear_if_stale()`` and ``resolve()``. Per the original design note, a
# pending confirm older than this is meant to be discarded when the next
# message arrives for the same session. Buttons keep working until the
# adapter drops the callback_data (Telegram: ~48h; Discord: ephemeral;
# Slack: 3s ack + long-lived actions).
DEFAULT_TIMEOUT_SECONDS = 5 * 60
|
|
|
|
|
|
def register(
    session_key: str,
    confirm_id: str,
    command: str,
    handler: Callable[[str], Awaitable[Optional[str]]],
) -> None:
    """Record ``handler`` as the pending confirmation for ``session_key``.

    Only one confirmation is tracked per session, so this replaces whatever
    was pending before: a freshly invoked confirmable command supersedes an
    older, unanswered prompt.

    Args:
        session_key: Gateway session the prompt belongs to.
        confirm_id: Identifier that ``resolve()`` must be given to act on
            this entry.
        command: Command name, used in log messages (e.g. ``"reload-mcp"``).
        handler: Coroutine function invoked with the user's choice.
    """
    record = dict(
        confirm_id=confirm_id,
        command=command,
        handler=handler,
        created_at=time.time(),
    )
    with _lock:
        _pending[session_key] = record
|
|
|
|
|
|
def get_pending(session_key: str) -> Optional[Dict[str, Any]]:
    """Look up the pending confirmation for ``session_key``.

    Returns a shallow copy of the stored entry, so adding or replacing keys
    on the result does not touch the shared state. Returns ``None`` when
    nothing is pending.
    """
    with _lock:
        found = _pending.get(session_key)
        if not found:
            return None
        return dict(found)
|
|
|
|
|
|
def clear(session_key: str) -> None:
    """Forget the pending confirmation for ``session_key``, if there is one.

    The stored handler is discarded without being invoked; an unknown
    session key is silently ignored.
    """
    with _lock:
        if session_key in _pending:
            del _pending[session_key]
|
|
|
|
|
|
def clear_if_stale(session_key: str, timeout: float = DEFAULT_TIMEOUT_SECONDS) -> bool:
    """Discard the pending confirm for ``session_key`` once it has expired.

    An entry counts as expired when more than ``timeout`` seconds have
    passed since its ``created_at`` timestamp (a missing or falsy timestamp
    is treated as 0, so such an entry is always expired).

    Returns:
        True if an entry was removed, False if there was none or it is
        still fresh.
    """
    with _lock:
        entry = _pending.get(session_key)
        if not entry:
            return False
        age = time.time() - float(entry.get("created_at", 0) or 0)
        expired = age > timeout
        if expired:
            del _pending[session_key]
        return expired
|
|
|
|
|
|
async def resolve(
    session_key: str,
    confirm_id: str,
    choice: str,
    timeout: float = DEFAULT_TIMEOUT_SECONDS,
) -> Optional[str]:
    """Act on the user's answer to a pending slash-command confirmation.

    ``choice`` is expected to be ``"once"``, ``"always"`` or ``"cancel"``.
    It is passed to the registered handler unchanged; this function does not
    validate it.

    Returns:
        The handler's string result (meant to be sent as a follow-up
        message), or an error message if the handler raised. Returns ``None``
        in these cases:
        - nothing is pending;
        - ``confirm_id`` does not match the pending entry;
        - the entry is older than ``timeout``;
        - no handler is stored;
        - the handler returned a non-string.

    Safe to call from an asyncio callback (button click) or from the
    gateway's message intercept path. ``_lock`` is only held while claiming
    the entry, never across the ``await``.
    """
    with _lock:
        entry = _pending.get(session_key)
        # Nothing pending, or a confirm_id from a prompt that has since been
        # superseded. Either way, leave ``_pending`` untouched.
        if not entry or entry.get("confirm_id") != confirm_id:
            return None
        # Claim the entry before awaiting anything, so a duplicate callback
        # (e.g. a double-clicked button) finds nothing and bails out.
        _pending.pop(session_key, None)

    # The claimed entry is consumed even when it turns out to be stale.
    created_at = float(entry.get("created_at", 0) or 0)
    if time.time() - created_at > timeout:
        return None

    handler = entry.get("handler")
    if not handler:
        return None
    command = entry.get("command", "?")

    try:
        outcome = await handler(choice)
    except Exception as exc:
        logger.error(
            "Slash-confirm handler for /%s raised: %s",
            command, exc, exc_info=True,
        )
        return f"❌ Error handling confirmation: {exc}"

    if isinstance(outcome, str):
        return outcome
    return None
|
|
|
|
|
|
def resolve_sync_compat(
    loop: asyncio.AbstractEventLoop,
    session_key: str,
    confirm_id: str,
    choice: str,
    *,
    wait_timeout: float = 30.0,
) -> Optional[str]:
    """Synchronous helper: schedule resolve() on a loop and wait for the result.

    Used by platform callback paths that run on a different thread than the
    event loop (e.g. Discord's button click handler in some configurations).
    Prefer the async ``resolve()`` from an async context.

    Args:
        loop: Event loop to run ``resolve()`` on. It must not be the loop
            running in the calling thread. Blocking that thread would stall
            the loop, so such calls are refused (logged, returns ``None``).
        session_key: Forwarded to ``resolve()``.
        confirm_id: Forwarded to ``resolve()``.
        choice: Forwarded to ``resolve()``.
        wait_timeout: Seconds to block waiting for the result. Keyword-only;
            defaults to the previously hard-coded 30 seconds.

    Returns:
        Whatever ``resolve()`` returned. Returns ``None`` if scheduling
        failed, the wait timed out, or any other error occurred. Errors are
        logged, never raised.
    """
    import concurrent.futures

    # Waiting on fut.result() from the loop's own thread can never succeed:
    # the scheduled coroutine cannot start until this function returns, so
    # the loop would freeze for the whole wait. Refuse before scheduling.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None
    if running_loop is loop:
        logger.error(
            "resolve_sync_compat called from the event loop's own thread; "
            "await resolve() instead"
        )
        return None

    try:
        from agent.async_utils import safe_schedule_threadsafe

        fut = safe_schedule_threadsafe(
            resolve(session_key, confirm_id, choice), loop,
            logger=logger,
            log_message="resolve_sync_compat scheduling failed",
        )
        if fut is None:
            return None
        return fut.result(timeout=wait_timeout)
    except concurrent.futures.TimeoutError:
        # Nothing cancels the scheduled coroutine here. Presumably ``fut`` is
        # a concurrent.futures.Future from run_coroutine_threadsafe (it takes
        # ``timeout=``), so resolve() may still finish on ``loop``; only its
        # result is dropped.
        logger.warning(
            "resolve_sync_compat timed out after %ss waiting for resolve()",
            wait_timeout,
        )
        return None
    except Exception as exc:
        logger.error("resolve_sync_compat failed: %s", exc, exc_info=True)
        return None
|