mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(async): close unscheduled coroutines in all threadsafe bridges (#26584)
Wraps every sync->async coroutine-scheduling site in the codebase with a new agent.async_utils.safe_schedule_threadsafe() helper that closes the coroutine on scheduling failure (closed loop, shutdown race, etc.) instead of leaking it as 'coroutine was never awaited' RuntimeWarnings plus reference leaks. 22 production call sites migrated across the codebase: - acp_adapter/events.py, acp_adapter/permissions.py - agent/lsp/manager.py - cron/scheduler.py (media + text delivery paths) - gateway/platforms/feishu.py (5 sites, via existing _submit_on_loop helper which now delegates to safe_schedule_threadsafe) - gateway/run.py (10 sites: telegram rename, agent:step hook, status callback, interim+bg-review, clarify send, exec-approval button+text, temp-bubble cleanup, channel-directory refresh) - plugins/memory/hindsight, plugins/platforms/google_chat - tools/browser_supervisor.py (3), browser_cdp_tool.py, computer_use/cua_backend.py, slash_confirm.py - tools/environments/modal.py (_AsyncWorker) - tools/mcp_tool.py (2 + 8 _run_on_mcp_loop callers converted to factory-style so the coroutine is never constructed on a dead loop) - tui_gateway/ws.py Tests: new tests/agent/test_async_utils.py covers helper behavior under live loop, dead loop, None loop, and scheduling exceptions. Regression tests added at three PR-original sites (acp events, acp permissions, mcp loop runner) mirroring contributor's intent. Live-tested end-to-end: - Helper stress test: 1500 schedules across live/dead/race scenarios, zero leaked coroutines - Race exercised: 5000 schedules with loop killed mid-flight, 100 ok / 4900 None returns, zero leaks - hermes chat -q with terminal tool call (exercises step_callback bridge) - MCP probe against failing subprocess servers + factory path - Real gateway daemon boot + SIGINT shutdown across multiple platform adapter inits - WSTransport 100 live + 50 dead-loop writes - Cron delivery path live + dead loop Salvages PR #2657 — adopts contributor's intent over a much wider site list and a single centralized helper instead of inline try/except at each site. 3 of the original PR's 6 sites no longer exist on main (environments/patches.py deleted, DingTalk refactored to native async); the equivalent fix lives in tools/environments/modal.py instead. Co-authored-by: JithendraNara <jithendranaidunara@gmail.com>
This commit is contained in:
parent
931caf2b2d
commit
4e89c53082
23 changed files with 690 additions and 186 deletions
206
gateway/run.py
206
gateway/run.py
|
|
@ -50,6 +50,7 @@ from typing import Dict, Optional, Any, List, Union
|
|||
# gateway is a long-running daemon, so its boot cost matters less than
|
||||
# preserving the established test-patch surface.
|
||||
from agent.account_usage import fetch_account_usage, render_account_usage_lines
|
||||
from agent.async_utils import safe_schedule_threadsafe
|
||||
from agent.i18n import t
|
||||
from hermes_cli.config import cfg_get
|
||||
|
||||
|
|
@ -11217,10 +11218,14 @@ class GatewayRunner:
|
|||
copied_source = dataclasses.replace(source)
|
||||
except Exception:
|
||||
copied_source = source
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
future = safe_schedule_threadsafe(
|
||||
self._rename_telegram_topic_for_session_title(copied_source, session_id, title),
|
||||
loop,
|
||||
logger=logger,
|
||||
log_message="Telegram topic title rename failed to schedule",
|
||||
)
|
||||
if future is None:
|
||||
return
|
||||
def _log_rename_failure(fut) -> None:
|
||||
try:
|
||||
fut.result()
|
||||
|
|
@ -14810,29 +14815,28 @@ class GatewayRunner:
|
|||
def _step_callback_sync(iteration: int, prev_tools: list) -> None:
|
||||
if not _run_still_current():
|
||||
return
|
||||
try:
|
||||
# prev_tools may be list[str] or list[dict] with "name"/"result"
|
||||
# keys. Normalise to keep "tool_names" backward-compatible for
|
||||
# user-authored hooks that do ', '.join(tool_names)'.
|
||||
_names: list[str] = []
|
||||
for _t in (prev_tools or []):
|
||||
if isinstance(_t, dict):
|
||||
_names.append(_t.get("name") or "")
|
||||
else:
|
||||
_names.append(str(_t))
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
_hooks_ref.emit("agent:step", {
|
||||
"platform": source.platform.value if source.platform else "",
|
||||
"user_id": source.user_id,
|
||||
"session_id": session_id,
|
||||
"iteration": iteration,
|
||||
"tool_names": _names,
|
||||
"tools": prev_tools,
|
||||
}),
|
||||
_loop_for_step,
|
||||
)
|
||||
except Exception as _e:
|
||||
logger.debug("agent:step hook error: %s", _e)
|
||||
# prev_tools may be list[str] or list[dict] with "name"/"result"
|
||||
# keys. Normalise to keep "tool_names" backward-compatible for
|
||||
# user-authored hooks that do ', '.join(tool_names)'.
|
||||
_names: list[str] = []
|
||||
for _t in (prev_tools or []):
|
||||
if isinstance(_t, dict):
|
||||
_names.append(_t.get("name") or "")
|
||||
else:
|
||||
_names.append(str(_t))
|
||||
safe_schedule_threadsafe(
|
||||
_hooks_ref.emit("agent:step", {
|
||||
"platform": source.platform.value if source.platform else "",
|
||||
"user_id": source.user_id,
|
||||
"session_id": session_id,
|
||||
"iteration": iteration,
|
||||
"tool_names": _names,
|
||||
"tools": prev_tools,
|
||||
}),
|
||||
_loop_for_step,
|
||||
logger=logger,
|
||||
log_message="agent:step hook scheduling error",
|
||||
)
|
||||
|
||||
# Bridge sync status_callback → async adapter.send for context pressure
|
||||
_status_adapter = self.adapters.get(source.platform)
|
||||
|
|
@ -14852,27 +14856,28 @@ class GatewayRunner:
|
|||
def _status_callback_sync(event_type: str, message: str) -> None:
|
||||
if not _status_adapter or not _run_still_current():
|
||||
return
|
||||
try:
|
||||
_fut = asyncio.run_coroutine_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
message,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
)
|
||||
if _cleanup_progress:
|
||||
def _track_status_id(fut) -> None:
|
||||
try:
|
||||
res = fut.result()
|
||||
except Exception:
|
||||
return
|
||||
mid = getattr(res, "message_id", None)
|
||||
if getattr(res, "success", False) and mid:
|
||||
_cleanup_msg_ids.append(str(mid))
|
||||
_fut.add_done_callback(_track_status_id)
|
||||
except Exception as _e:
|
||||
logger.debug("status_callback error (%s): %s", event_type, _e)
|
||||
_fut = safe_schedule_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
message,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
logger=logger,
|
||||
log_message=f"status_callback ({event_type}) scheduling error",
|
||||
)
|
||||
if _fut is None:
|
||||
return
|
||||
if _cleanup_progress:
|
||||
def _track_status_id(fut) -> None:
|
||||
try:
|
||||
res = fut.result()
|
||||
except Exception:
|
||||
return
|
||||
mid = getattr(res, "message_id", None)
|
||||
if getattr(res, "success", False) and mid:
|
||||
_cleanup_msg_ids.append(str(mid))
|
||||
_fut.add_done_callback(_track_status_id)
|
||||
|
||||
def run_sync():
|
||||
# The conditional re-assignment of `message` further below
|
||||
|
|
@ -15026,17 +15031,16 @@ class GatewayRunner:
|
|||
return
|
||||
if already_streamed or not _status_adapter or not str(text or "").strip():
|
||||
return
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
text,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
)
|
||||
except Exception as _e:
|
||||
logger.debug("interim_assistant_callback error: %s", _e)
|
||||
safe_schedule_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
text,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
logger=logger,
|
||||
log_message="interim_assistant_callback scheduling error",
|
||||
)
|
||||
|
||||
turn_route = self._resolve_turn_agent_config(message, model, runtime_kwargs)
|
||||
|
||||
|
|
@ -15125,17 +15129,16 @@ class GatewayRunner:
|
|||
def _deliver_bg_review_message(message: str) -> None:
|
||||
if not _status_adapter or not _run_still_current():
|
||||
return
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
message,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
)
|
||||
except Exception as _e:
|
||||
logger.debug("background_review_callback error: %s", _e)
|
||||
safe_schedule_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
message,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
logger=logger,
|
||||
log_message="background_review_callback scheduling error",
|
||||
)
|
||||
|
||||
def _release_bg_review_messages() -> None:
|
||||
_bg_review_release.set()
|
||||
|
|
@ -15207,23 +15210,28 @@ class GatewayRunner:
|
|||
pass
|
||||
|
||||
send_ok = False
|
||||
try:
|
||||
fut = asyncio.run_coroutine_threadsafe(
|
||||
_status_adapter.send_clarify(
|
||||
chat_id=_status_chat_id,
|
||||
question=question,
|
||||
choices=list(choices) if choices else None,
|
||||
clarify_id=clarify_id,
|
||||
session_key=session_key or "",
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
)
|
||||
result = fut.result(timeout=15)
|
||||
send_ok = bool(getattr(result, "success", False))
|
||||
except Exception as exc:
|
||||
logger.warning("Clarify send failed: %s", exc)
|
||||
fut = safe_schedule_threadsafe(
|
||||
_status_adapter.send_clarify(
|
||||
chat_id=_status_chat_id,
|
||||
question=question,
|
||||
choices=list(choices) if choices else None,
|
||||
clarify_id=clarify_id,
|
||||
session_key=session_key or "",
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
logger=logger,
|
||||
log_message="Clarify send failed to schedule",
|
||||
)
|
||||
if fut is None:
|
||||
send_ok = False
|
||||
else:
|
||||
try:
|
||||
result = fut.result(timeout=15)
|
||||
send_ok = bool(getattr(result, "success", False))
|
||||
except Exception as exc:
|
||||
logger.warning("Clarify send failed: %s", exc)
|
||||
send_ok = False
|
||||
|
||||
if not send_ok:
|
||||
# Couldn't deliver the prompt — clean up and return
|
||||
|
|
@ -15343,7 +15351,7 @@ class GatewayRunner:
|
|||
# false positives from MagicMock auto-attribute creation in tests.
|
||||
if getattr(type(_status_adapter), "send_exec_approval", None) is not None:
|
||||
try:
|
||||
_approval_result = asyncio.run_coroutine_threadsafe(
|
||||
_approval_fut = safe_schedule_threadsafe(
|
||||
_status_adapter.send_exec_approval(
|
||||
chat_id=_status_chat_id,
|
||||
command=cmd,
|
||||
|
|
@ -15352,7 +15360,12 @@ class GatewayRunner:
|
|||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
).result(timeout=15)
|
||||
logger=logger,
|
||||
log_message="send_exec_approval scheduling error",
|
||||
)
|
||||
if _approval_fut is None:
|
||||
raise RuntimeError("send_exec_approval: loop unavailable")
|
||||
_approval_result = _approval_fut.result(timeout=15)
|
||||
if _approval_result.success:
|
||||
return
|
||||
logger.warning(
|
||||
|
|
@ -15374,14 +15387,18 @@ class GatewayRunner:
|
|||
f"for the session, `/approve always` to approve permanently, or `/deny` to cancel."
|
||||
)
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
_approval_send_fut = safe_schedule_threadsafe(
|
||||
_status_adapter.send(
|
||||
_status_chat_id,
|
||||
msg,
|
||||
metadata=_status_thread_metadata,
|
||||
),
|
||||
_loop_for_step,
|
||||
).result(timeout=15)
|
||||
logger=logger,
|
||||
log_message="Approval text-send scheduling error",
|
||||
)
|
||||
if _approval_send_fut is not None:
|
||||
_approval_send_fut.result(timeout=15)
|
||||
except Exception as _e:
|
||||
logger.error("Failed to send approval request: %s", _e)
|
||||
|
||||
|
|
@ -16343,7 +16360,11 @@ class GatewayRunner:
|
|||
except Exception:
|
||||
pass
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(_delete_all(), _loop_snapshot)
|
||||
safe_schedule_threadsafe(
|
||||
_delete_all(), _loop_snapshot,
|
||||
logger=logger,
|
||||
log_message="Temp bubble cleanup scheduling error",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
|
@ -16400,10 +16421,13 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
|||
# this ticker runs in a background thread. Schedule onto
|
||||
# the gateway event loop and wait briefly for completion
|
||||
# so refresh failures are still logged via the except.
|
||||
fut = asyncio.run_coroutine_threadsafe(
|
||||
build_channel_directory(adapters), loop
|
||||
fut = safe_schedule_threadsafe(
|
||||
build_channel_directory(adapters), loop,
|
||||
logger=logger,
|
||||
log_message="Channel directory refresh scheduling error",
|
||||
)
|
||||
fut.result(timeout=30)
|
||||
if fut is not None:
|
||||
fut.result(timeout=30)
|
||||
except Exception as e:
|
||||
logger.debug("Channel directory refresh error: %s", e)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue