diff --git a/acp_adapter/events.py b/acp_adapter/events.py index 1257f902ebb..f0442ca2e8f 100644 --- a/acp_adapter/events.py +++ b/acp_adapter/events.py @@ -31,10 +31,17 @@ def _send_update( update: Any, ) -> None: """Fire-and-forget an ACP session update from a worker thread.""" + from agent.async_utils import safe_schedule_threadsafe + + future = safe_schedule_threadsafe( + conn.session_update(session_id, update), + loop, + logger=logger, + log_message="Failed to send ACP update", + ) + if future is None: + return try: - future = asyncio.run_coroutine_threadsafe( - conn.session_update(session_id, update), loop - ) future.result(timeout=5) except Exception: logger.debug("Failed to send ACP update", exc_info=True) diff --git a/acp_adapter/permissions.py b/acp_adapter/permissions.py index 44aead28742..76474e55dac 100644 --- a/acp_adapter/permissions.py +++ b/acp_adapter/permissions.py @@ -111,21 +111,28 @@ def make_approval_callback( allow_permanent: bool = True, **_: object, ) -> str: + from agent.async_utils import safe_schedule_threadsafe + options = _build_permission_options(allow_permanent=allow_permanent) - future = None + tool_call = _build_permission_tool_call(command, description) + coro = request_permission_fn( + session_id=session_id, + tool_call=tool_call, + options=options, + ) + future = safe_schedule_threadsafe( + coro, loop, + logger=logger, + log_message="Permission request: failed to schedule on loop", + ) + if future is None: + return "deny" + try: - tool_call = _build_permission_tool_call(command, description) - coro = request_permission_fn( - session_id=session_id, - tool_call=tool_call, - options=options, - ) - future = asyncio.run_coroutine_threadsafe(coro, loop) response = future.result(timeout=timeout) except (FutureTimeout, Exception) as exc: - if future is not None: - future.cancel() + future.cancel() logger.warning("Permission request timed out or failed: %s", exc) return "deny" diff --git a/agent/async_utils.py b/agent/async_utils.py new file mode 100644 index 00000000000..d268e1a3a84 --- /dev/null +++ b/agent/async_utils.py @@ -0,0 +1,68 @@ +"""Async/sync bridging helpers. + +The codebase has ~30 sites that schedule a coroutine onto an event loop from a +worker thread via :func:`asyncio.run_coroutine_threadsafe`. That function can +raise :class:`RuntimeError` (e.g. the loop was closed during a shutdown race), +and when it does the coroutine object is never awaited and never closed — +which triggers a ``"coroutine '' was never awaited"`` RuntimeWarning and +leaks the coroutine's frame until GC. + +:func:`safe_schedule_threadsafe` wraps the call, closes the coroutine on +scheduling failure, and returns ``None`` (instead of a half-formed future) so +callers can branch cleanly: + + fut = safe_schedule_threadsafe(coro, loop) + if fut is None: + return # or fallback behavior + fut.result(timeout=5) + +The helper deliberately does NOT also handle ``future.result()`` failures — +that is a separate concern. Once the loop has accepted the coroutine, its +lifecycle belongs to the loop, not the scheduling thread. +""" +from __future__ import annotations + +import asyncio +import logging +from concurrent.futures import Future +from typing import Any, Coroutine, Optional + + +_DEFAULT_LOGGER = logging.getLogger(__name__) + + +def safe_schedule_threadsafe( + coro: Coroutine[Any, Any, Any], + loop: Optional[asyncio.AbstractEventLoop], + *, + logger: Optional[logging.Logger] = None, + log_message: str = "Failed to schedule coroutine on loop", + log_level: int = logging.DEBUG, +) -> Optional[Future]: + """Schedule ``coro`` on ``loop`` from a sync context, leak-safe. + + Returns the :class:`concurrent.futures.Future` on success, or ``None`` if + the loop is missing or :func:`asyncio.run_coroutine_threadsafe` raised + (e.g. the loop was closed during a shutdown race). In all failure paths + the coroutine is :meth:`close`-d so it does not trigger + ``"coroutine was never awaited"`` warnings or leak its frame. + + Callers retain full control over what to do with the returned future + (call ``.result(timeout=...)``, attach ``add_done_callback``, ignore it + fire-and-forget, etc.). + """ + log = logger if logger is not None else _DEFAULT_LOGGER + + if loop is None: + if asyncio.iscoroutine(coro): + coro.close() + log.log(log_level, "%s: loop is None", log_message) + return None + + try: + return asyncio.run_coroutine_threadsafe(coro, loop) + except Exception as exc: + if asyncio.iscoroutine(coro): + coro.close() + log.log(log_level, "%s: %s", log_message, exc) + return None diff --git a/agent/lsp/manager.py b/agent/lsp/manager.py index 34c0b0ba92b..7f5feaa170f 100644 --- a/agent/lsp/manager.py +++ b/agent/lsp/manager.py @@ -107,9 +107,14 @@ class _BackgroundLoop: Returns the coroutine's result, or raises its exception. """ + from agent.async_utils import safe_schedule_threadsafe if self._loop is None: + if asyncio.iscoroutine(coro): + coro.close() raise RuntimeError("background loop not started") - fut: ConcurrentFuture = asyncio.run_coroutine_threadsafe(coro, self._loop) + fut = safe_schedule_threadsafe(coro, self._loop) + if fut is None: + raise RuntimeError("background loop not running") try: return fut.result(timeout=timeout) except Exception: diff --git a/cron/scheduler.py b/cron/scheduler.py index b585ef2e42b..d470e8c2c74 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -464,7 +464,14 @@ def _send_media_via_adapter( else: coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata) - future = asyncio.run_coroutine_threadsafe(coro, loop) + from agent.async_utils import safe_schedule_threadsafe + future = safe_schedule_threadsafe(coro, loop) + if future is None: + logger.warning( + "Job '%s': cannot send media %s, gateway loop unavailable", + job.get("id", "?"), media_path, + ) + return try: result = future.result(timeout=30) except TimeoutError: @@ -585,22 +592,26 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option text_to_send = cleaned_delivery_content.strip() adapter_ok = True if text_to_send: - future = asyncio.run_coroutine_threadsafe( + from agent.async_utils import safe_schedule_threadsafe + future = safe_schedule_threadsafe( runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), loop, ) - try: - send_result = future.result(timeout=60) - except TimeoutError: - future.cancel() - raise - if send_result and not getattr(send_result, "success", True): - err = getattr(send_result, "error", "unknown") - logger.warning( - "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", - job["id"], platform_name, chat_id, err, - ) - adapter_ok = False # fall through to standalone path + if future is None: + adapter_ok = False + else: + try: + send_result = future.result(timeout=60) + except TimeoutError: + future.cancel() + raise + if send_result and not getattr(send_result, "success", True): + err = getattr(send_result, "error", "unknown") + logger.warning( + "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone", + job["id"], platform_name, chat_id, err, + ) + adapter_ok = False # fall through to standalone path # Send extracted media files as native attachments via the live adapter if adapter_ok and media_files: diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index 8d60046d35d..a9b0447080d 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -2273,11 +2273,7 @@ class FeishuAdapter(BasePlatformAdapter): daemon=True, ).start() return - future = asyncio.run_coroutine_threadsafe( - self._handle_message_event_data(data), - loop, - ) - future.add_done_callback(self._log_background_failure) + self._submit_on_loop(loop, self._handle_message_event_data(data)) def _enqueue_pending_inbound_event(self, data: Any) -> bool: """Append an event to the pending-inbound queue. @@ -2353,16 +2349,12 @@ class FeishuAdapter(BasePlatformAdapter): dispatched = 0 requeue: List[Any] = [] for event in batch: - try: - fut = asyncio.run_coroutine_threadsafe( - self._handle_message_event_data(event), - loop, - ) - fut.add_done_callback(self._log_background_failure) + if self._submit_on_loop( + loop, self._handle_message_event_data(event) + ): dispatched += 1 - except RuntimeError: - # Loop closed between check and submit — requeue - # and poll again. + else: + # Loop closed/unavailable — requeue and poll again. requeue.append(event) if requeue: with self._pending_inbound_lock: @@ -2466,11 +2458,10 @@ class FeishuAdapter(BasePlatformAdapter): if not self._loop_accepts_callbacks(loop): logger.warning("[Feishu] Dropping drive comment event before adapter loop is ready") return - future = asyncio.run_coroutine_threadsafe( - handle_drive_comment_event(self._client, data, self_open_id=self._bot_open_id), + self._submit_on_loop( loop, + handle_drive_comment_event(self._client, data, self_open_id=self._bot_open_id), ) - future.add_done_callback(self._log_background_failure) def _on_reaction_event(self, event_type: str, data: Any) -> None: """Route user reactions on bot messages as synthetic text events.""" @@ -2498,11 +2489,7 @@ class FeishuAdapter(BasePlatformAdapter): or bool(getattr(loop, "is_closed", lambda: False)()) ): return - future = asyncio.run_coroutine_threadsafe( - self._handle_reaction_event(event_type, data), - loop, - ) - future.add_done_callback(self._log_background_failure) + self._submit_on_loop(loop, self._handle_reaction_event(event_type, data)) def _on_card_action_trigger(self, data: Any) -> Any: """Handle card-action callback from the Feishu SDK (synchronous). @@ -2548,11 +2535,14 @@ class FeishuAdapter(BasePlatformAdapter): def _submit_on_loop(self, loop: Any, coro: Any) -> bool: """Schedule background work on the adapter loop with shared failure logging.""" - try: - future = asyncio.run_coroutine_threadsafe(coro, loop) - except Exception: - coro.close() - logger.warning("[Feishu] Failed to schedule background callback work", exc_info=True) + from agent.async_utils import safe_schedule_threadsafe + future = safe_schedule_threadsafe( + coro, loop, + logger=logger, + log_message="[Feishu] Failed to schedule background callback work", + log_level=logging.WARNING, + ) + if future is None: return False future.add_done_callback(self._log_background_failure) return True diff --git a/gateway/run.py b/gateway/run.py index 5e8fce8e18d..f41357673f7 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -50,6 +50,7 @@ from typing import Dict, Optional, Any, List, Union # gateway is a long-running daemon, so its boot cost matters less than # preserving the established test-patch surface. from agent.account_usage import fetch_account_usage, render_account_usage_lines +from agent.async_utils import safe_schedule_threadsafe from agent.i18n import t from hermes_cli.config import cfg_get @@ -11217,10 +11218,14 @@ class GatewayRunner: copied_source = dataclasses.replace(source) except Exception: copied_source = source - future = asyncio.run_coroutine_threadsafe( + future = safe_schedule_threadsafe( self._rename_telegram_topic_for_session_title(copied_source, session_id, title), loop, + logger=logger, + log_message="Telegram topic title rename failed to schedule", ) + if future is None: + return def _log_rename_failure(fut) -> None: try: fut.result() @@ -14810,29 +14815,28 @@ class GatewayRunner: def _step_callback_sync(iteration: int, prev_tools: list) -> None: if not _run_still_current(): return - try: - # prev_tools may be list[str] or list[dict] with "name"/"result" - # keys. Normalise to keep "tool_names" backward-compatible for - # user-authored hooks that do ', '.join(tool_names)'. - _names: list[str] = [] - for _t in (prev_tools or []): - if isinstance(_t, dict): - _names.append(_t.get("name") or "") - else: - _names.append(str(_t)) - asyncio.run_coroutine_threadsafe( - _hooks_ref.emit("agent:step", { - "platform": source.platform.value if source.platform else "", - "user_id": source.user_id, - "session_id": session_id, - "iteration": iteration, - "tool_names": _names, - "tools": prev_tools, - }), - _loop_for_step, - ) - except Exception as _e: - logger.debug("agent:step hook error: %s", _e) + # prev_tools may be list[str] or list[dict] with "name"/"result" + # keys. Normalise to keep "tool_names" backward-compatible for + # user-authored hooks that do ', '.join(tool_names)'. + _names: list[str] = [] + for _t in (prev_tools or []): + if isinstance(_t, dict): + _names.append(_t.get("name") or "") + else: + _names.append(str(_t)) + safe_schedule_threadsafe( + _hooks_ref.emit("agent:step", { + "platform": source.platform.value if source.platform else "", + "user_id": source.user_id, + "session_id": session_id, + "iteration": iteration, + "tool_names": _names, + "tools": prev_tools, + }), + _loop_for_step, + logger=logger, + log_message="agent:step hook scheduling error", + ) # Bridge sync status_callback → async adapter.send for context pressure _status_adapter = self.adapters.get(source.platform) @@ -14852,27 +14856,28 @@ class GatewayRunner: def _status_callback_sync(event_type: str, message: str) -> None: if not _status_adapter or not _run_still_current(): return - try: - _fut = asyncio.run_coroutine_threadsafe( - _status_adapter.send( - _status_chat_id, - message, - metadata=_status_thread_metadata, - ), - _loop_for_step, - ) - if _cleanup_progress: - def _track_status_id(fut) -> None: - try: - res = fut.result() - except Exception: - return - mid = getattr(res, "message_id", None) - if getattr(res, "success", False) and mid: - _cleanup_msg_ids.append(str(mid)) - _fut.add_done_callback(_track_status_id) - except Exception as _e: - logger.debug("status_callback error (%s): %s", event_type, _e) + _fut = safe_schedule_threadsafe( + _status_adapter.send( + _status_chat_id, + message, + metadata=_status_thread_metadata, + ), + _loop_for_step, + logger=logger, + log_message=f"status_callback ({event_type}) scheduling error", + ) + if _fut is None: + return + if _cleanup_progress: + def _track_status_id(fut) -> None: + try: + res = fut.result() + except Exception: + return + mid = getattr(res, "message_id", None) + if getattr(res, "success", False) and mid: + _cleanup_msg_ids.append(str(mid)) + _fut.add_done_callback(_track_status_id) def run_sync(): # The conditional re-assignment of `message` further below @@ -15026,17 +15031,16 @@ class GatewayRunner: return if already_streamed or not _status_adapter or not str(text or "").strip(): return - try: - asyncio.run_coroutine_threadsafe( - _status_adapter.send( - _status_chat_id, - text, - metadata=_status_thread_metadata, - ), - _loop_for_step, - ) - except Exception as _e: - logger.debug("interim_assistant_callback error: %s", _e) + safe_schedule_threadsafe( + _status_adapter.send( + _status_chat_id, + text, + metadata=_status_thread_metadata, + ), + _loop_for_step, + logger=logger, + log_message="interim_assistant_callback scheduling error", + ) turn_route = self._resolve_turn_agent_config(message, model, runtime_kwargs) @@ -15125,17 +15129,16 @@ class GatewayRunner: def _deliver_bg_review_message(message: str) -> None: if not _status_adapter or not _run_still_current(): return - try: - asyncio.run_coroutine_threadsafe( - _status_adapter.send( - _status_chat_id, - message, - metadata=_status_thread_metadata, - ), - _loop_for_step, - ) - except Exception as _e: - logger.debug("background_review_callback error: %s", _e) + safe_schedule_threadsafe( + _status_adapter.send( + _status_chat_id, + message, + metadata=_status_thread_metadata, + ), + _loop_for_step, + logger=logger, + log_message="background_review_callback scheduling error", + ) def _release_bg_review_messages() -> None: _bg_review_release.set() @@ -15207,23 +15210,28 @@ class GatewayRunner: pass send_ok = False - try: - fut = asyncio.run_coroutine_threadsafe( - _status_adapter.send_clarify( - chat_id=_status_chat_id, - question=question, - choices=list(choices) if choices else None, - clarify_id=clarify_id, - session_key=session_key or "", - metadata=_status_thread_metadata, - ), - _loop_for_step, - ) - result = fut.result(timeout=15) - send_ok = bool(getattr(result, "success", False)) - except Exception as exc: - logger.warning("Clarify send failed: %s", exc) + fut = safe_schedule_threadsafe( + _status_adapter.send_clarify( + chat_id=_status_chat_id, + question=question, + choices=list(choices) if choices else None, + clarify_id=clarify_id, + session_key=session_key or "", + metadata=_status_thread_metadata, + ), + _loop_for_step, + logger=logger, + log_message="Clarify send failed to schedule", + ) + if fut is None: send_ok = False + else: + try: + result = fut.result(timeout=15) + send_ok = bool(getattr(result, "success", False)) + except Exception as exc: + logger.warning("Clarify send failed: %s", exc) + send_ok = False if not send_ok: # Couldn't deliver the prompt — clean up and return @@ -15343,7 +15351,7 @@ class GatewayRunner: # false positives from MagicMock auto-attribute creation in tests. if getattr(type(_status_adapter), "send_exec_approval", None) is not None: try: - _approval_result = asyncio.run_coroutine_threadsafe( + _approval_fut = safe_schedule_threadsafe( _status_adapter.send_exec_approval( chat_id=_status_chat_id, command=cmd, @@ -15352,7 +15360,12 @@ class GatewayRunner: metadata=_status_thread_metadata, ), _loop_for_step, - ).result(timeout=15) + logger=logger, + log_message="send_exec_approval scheduling error", + ) + if _approval_fut is None: + raise RuntimeError("send_exec_approval: loop unavailable") + _approval_result = _approval_fut.result(timeout=15) if _approval_result.success: return logger.warning( @@ -15374,14 +15387,18 @@ class GatewayRunner: f"for the session, `/approve always` to approve permanently, or `/deny` to cancel." ) try: - asyncio.run_coroutine_threadsafe( + _approval_send_fut = safe_schedule_threadsafe( _status_adapter.send( _status_chat_id, msg, metadata=_status_thread_metadata, ), _loop_for_step, - ).result(timeout=15) + logger=logger, + log_message="Approval text-send scheduling error", + ) + if _approval_send_fut is not None: + _approval_send_fut.result(timeout=15) except Exception as _e: logger.error("Failed to send approval request: %s", _e) @@ -16343,7 +16360,11 @@ class GatewayRunner: except Exception: pass try: - asyncio.run_coroutine_threadsafe(_delete_all(), _loop_snapshot) + safe_schedule_threadsafe( + _delete_all(), _loop_snapshot, + logger=logger, + log_message="Temp bubble cleanup scheduling error", + ) except Exception: pass @@ -16400,10 +16421,13 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in # this ticker runs in a background thread. Schedule onto # the gateway event loop and wait briefly for completion # so refresh failures are still logged via the except. - fut = asyncio.run_coroutine_threadsafe( - build_channel_directory(adapters), loop + fut = safe_schedule_threadsafe( + build_channel_directory(adapters), loop, + logger=logger, + log_message="Channel directory refresh scheduling error", ) - fut.result(timeout=30) + if fut is not None: + fut.result(timeout=30) except Exception as e: logger.debug("Channel directory refresh error: %s", e) diff --git a/plugins/memory/hindsight/__init__.py b/plugins/memory/hindsight/__init__.py index 3a42a320453..52b1ac247f1 100644 --- a/plugins/memory/hindsight/__init__.py +++ b/plugins/memory/hindsight/__init__.py @@ -221,8 +221,11 @@ def _get_loop() -> asyncio.AbstractEventLoop: def _run_sync(coro, timeout: float = _DEFAULT_TIMEOUT): """Schedule *coro* on the shared loop and block until done.""" + from agent.async_utils import safe_schedule_threadsafe loop = _get_loop() - future = asyncio.run_coroutine_threadsafe(coro, loop) + future = safe_schedule_threadsafe(coro, loop) + if future is None: + raise RuntimeError("Hindsight loop unavailable") return future.result(timeout=timeout) diff --git a/plugins/platforms/google_chat/adapter.py b/plugins/platforms/google_chat/adapter.py index 1d58e801f46..d8777bf7101 100644 --- a/plugins/platforms/google_chat/adapter.py +++ b/plugins/platforms/google_chat/adapter.py @@ -670,10 +670,18 @@ class GoogleChatAdapter(BasePlatformAdapter): logger.warning("[GoogleChat] Loop not accepting callbacks; dropping event") return try: - future = asyncio.run_coroutine_threadsafe(coro, loop) + from agent.async_utils import safe_schedule_threadsafe + future = safe_schedule_threadsafe( + coro, loop, + logger=logger, + log_message="[GoogleChat] Failed to schedule background callback", + log_level=logging.WARNING, + ) except RuntimeError: logger.warning("[GoogleChat] Loop closed between check and submit") return + if future is None: + return future.add_done_callback(self._log_background_failure) # ------------------------------------------------------------------ diff --git a/scripts/release.py b/scripts/release.py index 740b79091b1..c9cd9c173c0 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -62,6 +62,7 @@ AUTHOR_MAP = { "nidhi2894@gmail.com": "nidhi-singh02", "30312689+aashizpoudel@users.noreply.github.com": "aashizpoudel", "oleksii.lisikh@gmail.com": "olisikh", + "jithendranaidunara@gmail.com": "JithendraNara", "jeremy@geocaching.com": "outdoorsea", "leone.parise@gmail.com": "leoneparise", "mr@shu.io": "mrshu", diff --git a/tests/acp/test_events.py b/tests/acp/test_events.py index c9f91a181ed..56a2687226c 100644 --- a/tests/acp/test_events.py +++ b/tests/acp/test_events.py @@ -1,6 +1,8 @@ """Tests for acp_adapter.events — callback factories for ACP notifications.""" import asyncio +import gc +import warnings from concurrent.futures import Future from unittest.mock import AsyncMock, MagicMock, patch @@ -10,6 +12,7 @@ import acp from acp.schema import ToolCallStart, ToolCallProgress, AgentThoughtChunk, AgentMessageChunk from acp_adapter.events import ( + _send_update, make_message_cb, make_step_cb, make_thinking_cb, @@ -325,3 +328,46 @@ class TestMessageCallback: cb("") mock_rcts.assert_not_called() + + +# --------------------------------------------------------------------------- +# Scheduler-failure regression +# --------------------------------------------------------------------------- + +class TestSendUpdate: + def test_scheduler_failure_closes_update_coroutine(self, event_loop_fixture): + """If run_coroutine_threadsafe raises, _send_update must close the coro.""" + created = {"coro": None} + + async def _session_update(session_id, update): + return None + + conn = MagicMock() + + def _capture_update(session_id, update): + created["coro"] = _session_update(session_id, update) + return created["coro"] + + conn.session_update = _capture_update + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + with patch( + "agent.async_utils.asyncio.run_coroutine_threadsafe", + side_effect=RuntimeError("scheduler down"), + ): + _send_update(conn, "session-1", event_loop_fixture, {"type": "noop"}) + gc.collect() + + assert created["coro"] is not None + assert created["coro"].cr_frame is None + # Only count warnings about THIS test's coroutine; other tests in the + # same xdist worker (or stdlib mock internals) may emit unrelated + # "coroutine was never awaited" warnings that bleed through. + runtime_warnings = [ + w for w in caught + if issubclass(w.category, RuntimeWarning) + and "was never awaited" in str(w.message) + and "_session_update" in str(w.message) + ] + assert runtime_warnings == [] diff --git a/tests/acp/test_permissions.py b/tests/acp/test_permissions.py index 8bbdeeb392a..b4c121829dc 100644 --- a/tests/acp/test_permissions.py +++ b/tests/acp/test_permissions.py @@ -38,7 +38,7 @@ def _invoke_callback( scheduled["loop"] = passed_loop return future - with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule): + with patch("agent.async_utils.asyncio.run_coroutine_threadsafe", side_effect=_schedule): cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=timeout) if use_prompt_path: result = prompt_dangerous_approval( @@ -135,7 +135,7 @@ class TestApprovalBridge: scheduled["loop"] = passed_loop return future - with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule): + with patch("agent.async_utils.asyncio.run_coroutine_threadsafe", side_effect=_schedule): cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=0.01) result = cb("rm -rf /", "dangerous command") @@ -159,10 +159,53 @@ class TestApprovalBridge: scheduled["loop"] = passed_loop return future - with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", side_effect=_schedule): + with patch("agent.async_utils.asyncio.run_coroutine_threadsafe", side_effect=_schedule): cb = make_approval_callback(request_permission, loop, session_id="s1", timeout=1.0) result = cb("echo hi", "demo") scheduled["coro"].close() assert result == "deny" + + +# --------------------------------------------------------------------------- +# Scheduler-failure regression +# --------------------------------------------------------------------------- + +import gc # noqa: E402 +import warnings # noqa: E402 + + +class TestSchedulerFailure: + def test_scheduler_failure_closes_permission_coroutine(self): + """If run_coroutine_threadsafe raises, the coro is closed and we return 'deny'.""" + loop = MagicMock(spec=asyncio.AbstractEventLoop) + created = {"coro": None} + + async def _response_coro(**kwargs): + return _make_response(AllowedOutcome(option_id="allow_once", outcome="selected")) + + def _request_permission(**kwargs): + created["coro"] = _response_coro(**kwargs) + return created["coro"] + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + with patch( + "agent.async_utils.asyncio.run_coroutine_threadsafe", + side_effect=RuntimeError("scheduler down"), + ): + cb = make_approval_callback(_request_permission, loop, session_id="s1", timeout=0.01) + result = cb("rm -rf /", "dangerous") + gc.collect() + + assert result == "deny" + assert created["coro"] is not None + assert created["coro"].cr_frame is None + runtime_warnings = [ + w for w in caught + if issubclass(w.category, RuntimeWarning) + and "was never awaited" in str(w.message) + and "_response_coro" in str(w.message) + ] + assert runtime_warnings == [] diff --git a/tests/agent/test_async_utils.py b/tests/agent/test_async_utils.py new file mode 100644 index 00000000000..33ce84ee0c6 --- /dev/null +++ b/tests/agent/test_async_utils.py @@ -0,0 +1,157 @@ +"""Tests for agent.async_utils.safe_schedule_threadsafe.""" + +from __future__ import annotations + +import asyncio +import gc +import warnings +from concurrent.futures import Future +from unittest.mock import patch + +import pytest + +from agent.async_utils import safe_schedule_threadsafe + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _no_unawaited_warnings(caught, *, coro_name: str = "") -> bool: + """Return True if no "X was never awaited" warning slipped through. + + When *coro_name* is provided, only warnings naming that coroutine are + counted — xdist workers may emit unrelated unawaited-coroutine warnings + (e.g. ``AsyncMockMixin._execute_mock_call``) from concurrent tests. + """ + bad = [ + w for w in caught + if issubclass(w.category, RuntimeWarning) + and "was never awaited" in str(w.message) + and (not coro_name or coro_name in str(w.message)) + ] + return not bad + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestSafeScheduleThreadsafe: + def test_returns_future_on_success(self): + loop = asyncio.new_event_loop() + try: + import threading + ready = threading.Event() + stop = threading.Event() + + def _runner(): + asyncio.set_event_loop(loop) + ready.set() + loop.run_until_complete(_wait_for_stop(stop)) + + async def _wait_for_stop(ev): + while not ev.is_set(): + await asyncio.sleep(0.005) + + t = threading.Thread(target=_runner, daemon=True) + t.start() + ready.wait(timeout=2) + + async def _sample(): + return 42 + + fut = safe_schedule_threadsafe(_sample(), loop) + assert isinstance(fut, Future) + assert fut.result(timeout=2) == 42 + + stop.set() + t.join(timeout=2) + finally: + if loop.is_running(): + loop.call_soon_threadsafe(loop.stop) + loop.close() + + def test_closed_loop_returns_none_and_closes_coroutine(self): + loop = asyncio.new_event_loop() + loop.close() + + async def _sample(): + return "ok" + + coro = _sample() + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + result = safe_schedule_threadsafe(coro, loop) + del coro + gc.collect() + + assert result is None + assert _no_unawaited_warnings(caught, coro_name='_sample') + + def test_none_loop_returns_none_and_closes_coroutine(self): + async def _sample(): + return "ok" + + coro = _sample() + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + result = safe_schedule_threadsafe(coro, None) + del coro + gc.collect() + + assert result is None + assert _no_unawaited_warnings(caught, coro_name='_sample') + + def test_scheduling_exception_closes_coroutine(self): + """If run_coroutine_threadsafe raises, close the coroutine and return None.""" + # A loop that *looks* open but raises on submission + loop = asyncio.new_event_loop() + try: + async def _sample(): + return "ok" + + coro = _sample() + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + with patch( + "agent.async_utils.asyncio.run_coroutine_threadsafe", + side_effect=RuntimeError("scheduler down"), + ): + result = safe_schedule_threadsafe(coro, loop) + del coro + gc.collect() + + assert result is None + assert _no_unawaited_warnings(caught, coro_name='_sample') + finally: + loop.close() + + def test_logs_at_specified_level(self, caplog): + import logging + loop = asyncio.new_event_loop() + loop.close() + + async def _sample(): + return None + + custom = logging.getLogger("test_async_utils") + with caplog.at_level(logging.WARNING, logger="test_async_utils"): + result = safe_schedule_threadsafe( + _sample(), loop, + logger=custom, + log_message="custom-msg", + log_level=logging.WARNING, + ) + + assert result is None + assert any("custom-msg" in rec.message for rec in caplog.records) + + def test_non_coroutine_arg_does_not_crash(self): + """Defensive: even if the caller hands us something weird, don't blow up.""" + loop = asyncio.new_event_loop() + loop.close() + + # Pass a non-coroutine sentinel + result = safe_schedule_threadsafe("not-a-coroutine", loop) # type: ignore[arg-type] + assert result is None diff --git a/tests/tools/test_mcp_probe.py b/tests/tools/test_mcp_probe.py index 46459e44c87..89d4d1478d1 100644 --- a/tests/tools/test_mcp_probe.py +++ b/tests/tools/test_mcp_probe.py @@ -69,7 +69,8 @@ class TestProbeMcpServerTools: patch("tools.mcp_tool._stop_mcp_loop"): # Simulate running the async probe - def run_coro(coro, timeout=120): + def run_coro(coro_or_factory, timeout=120): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory loop = asyncio.new_event_loop() try: return loop.run_until_complete(coro) @@ -110,7 +111,8 @@ class TestProbeMcpServerTools: patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \ patch("tools.mcp_tool._stop_mcp_loop"): - def run_coro(coro, timeout=120): + def run_coro(coro_or_factory, timeout=120): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory loop = asyncio.new_event_loop() try: return loop.run_until_complete(coro) @@ -144,7 +146,8 @@ class TestProbeMcpServerTools: patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \ patch("tools.mcp_tool._stop_mcp_loop"): - def run_coro(coro, timeout=120): + def run_coro(coro_or_factory, timeout=120): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory loop = asyncio.new_event_loop() try: return loop.run_until_complete(coro) @@ -198,7 +201,8 @@ class TestProbeMcpServerTools: patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \ patch("tools.mcp_tool._stop_mcp_loop"): - def run_coro(coro, timeout=120): + def run_coro(coro_or_factory, timeout=120): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory loop = asyncio.new_event_loop() try: return loop.run_until_complete(coro) diff --git a/tests/tools/test_mcp_structured_content.py b/tests/tools/test_mcp_structured_content.py index 2870ce1e860..f4cda00f9f0 100644 --- a/tests/tools/test_mcp_structured_content.py +++ b/tests/tools/test_mcp_structured_content.py @@ -31,7 +31,8 @@ class _FakeCallToolResult: self.structuredContent = structuredContent -def _fake_run_on_mcp_loop(coro, timeout=30): +def _fake_run_on_mcp_loop(coro_or_factory, timeout=30): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory """Run an MCP coroutine directly in a fresh event loop.""" loop = asyncio.new_event_loop() try: diff --git a/tests/tools/test_mcp_tool.py b/tests/tools/test_mcp_tool.py index 5558a0df48c..7f6c3f6704c 100644 --- a/tests/tools/test_mcp_tool.py +++ b/tests/tools/test_mcp_tool.py @@ -397,6 +397,77 @@ class TestCheckFunction: _servers.pop("test_server", None) +# --------------------------------------------------------------------------- +# MCP loop runner +# --------------------------------------------------------------------------- + +class TestRunOnMcpLoop: + def test_scheduler_failure_closes_factory_coroutine(self): + """If run_coroutine_threadsafe raises, the factory's coroutine is closed.""" + import gc + import warnings + import tools.mcp_tool as mcp + + created = {"coro": None} + + async def _sample(): + return "ok" + + def factory(): + created["coro"] = _sample() + return created["coro"] + + fake_loop = MagicMock() + fake_loop.is_running.return_value = True + + with patch.object(mcp, "_mcp_loop", fake_loop): + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + with patch( + "agent.async_utils.asyncio.run_coroutine_threadsafe", + side_effect=RuntimeError("scheduler down"), + ): + with pytest.raises(RuntimeError): + mcp._run_on_mcp_loop(factory) + gc.collect() + + assert created["coro"] is not None + assert created["coro"].cr_frame is None + runtime_warnings = [ + w for w in caught + if issubclass(w.category, RuntimeWarning) + and "was never awaited" in str(w.message) + and "_sample" in str(w.message) + ] + assert runtime_warnings == [] + + def test_dead_loop_closes_passed_coroutine(self): + """If loop is None, a passed coroutine (not factory) is closed.""" + import gc + import warnings + import tools.mcp_tool as mcp + + async def _sample(): + return "ok" + + coro = _sample() + with patch.object(mcp, "_mcp_loop", None): + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + with pytest.raises(RuntimeError, match="not running"): + mcp._run_on_mcp_loop(coro) + gc.collect() + + assert coro.cr_frame is None + runtime_warnings = [ + w for w in caught + if issubclass(w.category, RuntimeWarning) + and "was never awaited" in str(w.message) + and "_sample" in str(w.message) + ] + assert runtime_warnings == [] + + # --------------------------------------------------------------------------- # Tool handler # --------------------------------------------------------------------------- @@ -406,7 +477,8 @@ class TestToolHandler: def _patch_mcp_loop(self, coro_side_effect=None): """Return a patch for _run_on_mcp_loop that runs the coroutine directly.""" - def fake_run(coro, timeout=30): + def fake_run(coro_or_factory, timeout=30): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory return asyncio.run(coro) if coro_side_effect: return patch("tools.mcp_tool._run_on_mcp_loop", side_effect=coro_side_effect) @@ -485,7 +557,8 @@ class TestToolHandler: try: handler = _make_tool_handler("test_srv", "greet", 120) - def _interrupting_run(coro, timeout=30): + def _interrupting_run(coro_or_factory, timeout=30): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory coro.close() raise InterruptedError("User sent a new message") with patch( @@ -1792,7 +1865,8 @@ class TestUtilityHandlers: def _patch_mcp_loop(self): """Return a patch for _run_on_mcp_loop that runs the coroutine directly.""" - def fake_run(coro, timeout=30): + def fake_run(coro_or_factory, timeout=30): + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory return asyncio.run(coro) return patch("tools.mcp_tool._run_on_mcp_loop", side_effect=fake_run) diff --git a/tools/browser_cdp_tool.py b/tools/browser_cdp_tool.py index 8e829556a57..f10a1541923 100644 --- a/tools/browser_cdp_tool.py +++ b/tools/browser_cdp_tool.py @@ -274,7 +274,13 @@ def _browser_cdp_via_supervisor( ) try: - fut = _asyncio.run_coroutine_threadsafe(_do_cdp(), loop) + from agent.async_utils import safe_schedule_threadsafe + fut = safe_schedule_threadsafe(_do_cdp(), loop) + if fut is None: + return tool_error( + "CDP call via supervisor failed: loop unavailable", + cdp_docs=CDP_DOCS_URL, + ) result_msg = fut.result(timeout=timeout + 2) except Exception as exc: return tool_error( diff --git a/tools/browser_supervisor.py b/tools/browser_supervisor.py index af8d40ee185..73dd3e51bb5 100644 --- a/tools/browser_supervisor.py +++ b/tools/browser_supervisor.py @@ -368,11 +368,13 @@ class CDPSupervisor: pass try: - fut = asyncio.run_coroutine_threadsafe(_close_ws(), loop) - try: - fut.result(timeout=2.0) - except Exception: - pass + from agent.async_utils import safe_schedule_threadsafe + fut = safe_schedule_threadsafe(_close_ws(), loop) + if fut is not None: + try: + fut.result(timeout=2.0) + except Exception: + pass except RuntimeError: pass # loop already shutting down if self._thread is not None: @@ -451,7 +453,10 @@ class CDPSupervisor: ) try: - fut = asyncio.run_coroutine_threadsafe(_do_respond(), loop) + from agent.async_utils import safe_schedule_threadsafe + fut = safe_schedule_threadsafe(_do_respond(), loop) + if fut is None: + return {"ok": False, "error": "Browser supervisor loop unavailable"} fut.result(timeout=timeout) except Exception as e: return {"ok": False, "error": f"{type(e).__name__}: {e}"} @@ -507,7 +512,10 @@ class CDPSupervisor: ) try: - fut = asyncio.run_coroutine_threadsafe(_do_eval(), loop) + from agent.async_utils import safe_schedule_threadsafe + fut = safe_schedule_threadsafe(_do_eval(), loop) + if fut is None: + return {"ok": False, "error": "Browser supervisor loop unavailable"} response = fut.result(timeout=timeout + 1) except Exception as exc: return {"ok": False, "error": f"{type(exc).__name__}: {exc}"} diff --git a/tools/computer_use/cua_backend.py b/tools/computer_use/cua_backend.py index df1162c5d79..96aab60f8c7 100644 --- a/tools/computer_use/cua_backend.py +++ b/tools/computer_use/cua_backend.py @@ -183,9 +183,14 @@ class _AsyncBridge: raise RuntimeError("cua-driver asyncio bridge failed to start") def run(self, coro, timeout: Optional[float] = 30.0) -> Any: + from agent.async_utils import safe_schedule_threadsafe if not self._loop or not self._thread or not self._thread.is_alive(): + if asyncio.iscoroutine(coro): + coro.close() + raise RuntimeError("cua-driver bridge not started") + fut = safe_schedule_threadsafe(coro, self._loop) + if fut is None: raise RuntimeError("cua-driver bridge not started") - fut: Future = asyncio.run_coroutine_threadsafe(coro, self._loop) return fut.result(timeout=timeout) def stop(self) -> None: diff --git a/tools/environments/modal.py b/tools/environments/modal.py index 1a230d85603..3137b322113 100644 --- a/tools/environments/modal.py +++ b/tools/environments/modal.py @@ -144,9 +144,14 @@ class _AsyncWorker: self._loop.run_forever() def run_coroutine(self, coro, timeout=600): + from agent.async_utils import safe_schedule_threadsafe if self._loop is None or self._loop.is_closed(): + if asyncio.iscoroutine(coro): + coro.close() + raise RuntimeError("AsyncWorker loop is not running") + future = safe_schedule_threadsafe(coro, self._loop) + if future is None: raise RuntimeError("AsyncWorker loop is not running") - future = asyncio.run_coroutine_threadsafe(coro, self._loop) return future.result(timeout=timeout) def stop(self): diff --git a/tools/mcp_tool.py b/tools/mcp_tool.py index c2668395e5d..ba104cc4273 100644 --- a/tools/mcp_tool.py +++ b/tools/mcp_tool.py @@ -1781,7 +1781,7 @@ def _handle_auth_error_and_retry( return await manager.handle_401(server_name, None) try: - recovered = _run_on_mcp_loop(_recover(), timeout=10) + recovered = _run_on_mcp_loop(_recover, timeout=10) except Exception as rec_exc: logger.warning( "MCP OAuth '%s': recovery attempt failed: %s", @@ -2054,19 +2054,35 @@ def _ensure_mcp_loop(): _mcp_thread.start() -def _run_on_mcp_loop(coro, timeout: float = 30): +def _run_on_mcp_loop(coro_or_factory, timeout: float = 30): """Schedule a coroutine on the MCP event loop and block until done. + Accepts either a coroutine object or a zero-arg callable that returns one. + Callers can pass a factory to avoid constructing coroutine objects when + the MCP loop is unavailable (which would otherwise leak the coroutine + frame and emit ``"coroutine was never awaited"`` warnings). + Poll in short intervals so the calling agent thread can honor user interrupts while the MCP work is still running on the background loop. """ from tools.interrupt import is_interrupted + from agent.async_utils import safe_schedule_threadsafe with _lock: loop = _mcp_loop if loop is None or not loop.is_running(): + if asyncio.iscoroutine(coro_or_factory): + coro_or_factory.close() raise RuntimeError("MCP event loop is not running") - future = asyncio.run_coroutine_threadsafe(coro, loop) + + coro = coro_or_factory() if callable(coro_or_factory) else coro_or_factory + future = safe_schedule_threadsafe( + coro, loop, + logger=logger, + log_message="MCP scheduling failed", + ) + if future is None: + raise RuntimeError("MCP event loop unavailable (failed to schedule)") start_time = time.monotonic() deadline = None if timeout is None else start_time + timeout @@ -2263,7 +2279,7 @@ def _make_tool_handler(server_name: str, tool_name: str, tool_timeout: float): return json.dumps({"result": text_result}, ensure_ascii=False) def _call_once(): - return _run_on_mcp_loop(_call(), timeout=tool_timeout) + return _run_on_mcp_loop(_call, timeout=tool_timeout) try: result = _call_once() @@ -2343,7 +2359,7 @@ def _make_list_resources_handler(server_name: str, tool_timeout: float): return json.dumps({"resources": resources}, ensure_ascii=False) def _call_once(): - return _run_on_mcp_loop(_call(), timeout=tool_timeout) + return _run_on_mcp_loop(_call, timeout=tool_timeout) try: return _call_once() @@ -2403,7 +2419,7 @@ def _make_read_resource_handler(server_name: str, tool_timeout: float): return json.dumps({"result": "\n".join(parts) if parts else ""}, ensure_ascii=False) def _call_once(): - return _run_on_mcp_loop(_call(), timeout=tool_timeout) + return _run_on_mcp_loop(_call, timeout=tool_timeout) try: return _call_once() @@ -2466,7 +2482,7 @@ def _make_list_prompts_handler(server_name: str, tool_timeout: float): return json.dumps({"prompts": prompts}, ensure_ascii=False) def _call_once(): - return _run_on_mcp_loop(_call(), timeout=tool_timeout) + return _run_on_mcp_loop(_call, timeout=tool_timeout) try: return _call_once() @@ -2537,7 +2553,7 @@ def _make_get_prompt_handler(server_name: str, tool_timeout: float): return json.dumps(resp, ensure_ascii=False) def _call_once(): - return _run_on_mcp_loop(_call(), timeout=tool_timeout) + return _run_on_mcp_loop(_call, timeout=tool_timeout) try: return _call_once() @@ -3121,7 +3137,7 @@ def register_mcp_servers(servers: Dict[str, dict]) -> List[str]: if _was_interrupted: _set_interrupt(False) try: - _run_on_mcp_loop(_discover_all(), timeout=120) + _run_on_mcp_loop(_discover_all, timeout=120) finally: if _was_interrupted: _set_interrupt(True) @@ -3289,7 +3305,7 @@ def probe_mcp_server_tools() -> Dict[str, List[tuple]]: ) try: - _run_on_mcp_loop(_probe_all(), timeout=120) + _run_on_mcp_loop(_probe_all, timeout=120) except Exception as exc: logger.debug("MCP probe failed: %s", exc) finally: @@ -3329,11 +3345,17 @@ def shutdown_mcp_servers(): with _lock: loop = _mcp_loop if loop is not None and loop.is_running(): - try: - future = asyncio.run_coroutine_threadsafe(_shutdown(), loop) - future.result(timeout=15) - except Exception as exc: - logger.debug("Error during MCP shutdown: %s", exc) + from agent.async_utils import safe_schedule_threadsafe + future = safe_schedule_threadsafe( + _shutdown(), loop, + logger=logger, + log_message="MCP shutdown: failed to schedule", + ) + if future is not None: + try: + future.result(timeout=15) + except Exception as exc: + logger.debug("Error during MCP shutdown: %s", exc) _stop_mcp_loop() diff --git a/tools/slash_confirm.py b/tools/slash_confirm.py index 81c15263527..21db18fe319 100644 --- a/tools/slash_confirm.py +++ b/tools/slash_confirm.py @@ -153,9 +153,14 @@ def resolve_sync_compat( Prefer the async ``resolve()`` from an async context. """ try: - fut = asyncio.run_coroutine_threadsafe( + from agent.async_utils import safe_schedule_threadsafe + fut = safe_schedule_threadsafe( resolve(session_key, confirm_id, choice), loop, + logger=logger, + log_message="resolve_sync_compat scheduling failed", ) + if fut is None: + return None return fut.result(timeout=30) except Exception as exc: logger.error("resolve_sync_compat failed: %s", exc) diff --git a/tui_gateway/ws.py b/tui_gateway/ws.py index 1661811dbd6..a5879ef3a1c 100644 --- a/tui_gateway/ws.py +++ b/tui_gateway/ws.py @@ -83,7 +83,11 @@ class WSTransport: return True try: - fut = asyncio.run_coroutine_threadsafe(self._safe_send(line), self._loop) + from agent.async_utils import safe_schedule_threadsafe + fut = safe_schedule_threadsafe(self._safe_send(line), self._loop) + if fut is None: + self._closed = True + return False fut.result(timeout=_WS_WRITE_TIMEOUT_S) return not self._closed except Exception as exc: