diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py
index a39e09be33..b6d6e110a7 100644
--- a/agent/auxiliary_client.py
+++ b/agent/auxiliary_client.py
@@ -1191,8 +1191,18 @@ def _get_cached_client(
     cache_key = (provider, async_mode, base_url or "", api_key or "")
     with _client_cache_lock:
         if cache_key in _client_cache:
-            cached_client, cached_default = _client_cache[cache_key]
-            return cached_client, model or cached_default
+            cached_client, cached_default, cached_loop = _client_cache[cache_key]
+            if async_mode:
+                # Async clients are bound to the event loop that created them.
+                # A cached async client whose loop has been closed will raise
+                # "Event loop is closed" when httpx tries to clean up its
+                # transport. Discard the stale client and create a fresh one.
+                if cached_loop is not None and cached_loop.is_closed():
+                    del _client_cache[cache_key]
+                else:
+                    return cached_client, model or cached_default
+            else:
+                return cached_client, model or cached_default
     # Build outside the lock
     client, default_model = resolve_provider_client(
         provider,
@@ -1202,11 +1212,20 @@ def _get_cached_client(
         explicit_api_key=api_key,
     )
     if client is not None:
+        # For async clients, remember the loop that is actually running right
+        # now (if any) so we can detect stale entries later.
+        bound_loop = None
+        if async_mode:
+            try:
+                import asyncio as _aio
+                bound_loop = _aio.get_running_loop()
+            except RuntimeError:
+                pass  # no running loop here: cache the client unbound
         with _client_cache_lock:
             if cache_key not in _client_cache:
-                _client_cache[cache_key] = (client, default_model)
+                _client_cache[cache_key] = (client, default_model, bound_loop)
             else:
-                client, default_model = _client_cache[cache_key]
+                client, default_model, _ = _client_cache[cache_key]
     return client, model or default_model
 
 
diff --git a/model_tools.py b/model_tools.py
index 23849817a5..0c24aefdaa 100644
--- a/model_tools.py
+++ b/model_tools.py
@@ -24,6 +24,7 @@ import json
 import asyncio
 import os
 import logging
+import threading
 from typing import Dict, Any, List, Optional, Tuple
 
 from tools.registry import registry
@@ -36,6 +37,25 @@ logger = logging.getLogger(__name__)
 # Async Bridging (single source of truth -- used by registry.dispatch too)
 # =============================================================================
 
+_tool_loop = None  # persistent loop for _run_async() when no loop is running
+_tool_loop_lock = threading.Lock()
+
+
+def _get_tool_loop() -> asyncio.AbstractEventLoop:
+    """Return a long-lived event loop for running async tool handlers.
+
+    Using a persistent loop (instead of asyncio.run() which creates and
+    *closes* a fresh loop every time) prevents "Event loop is closed"
+    errors that occur when cached httpx/AsyncOpenAI clients attempt to
+    close their transport on a dead loop during garbage collection.
+    """
+    global _tool_loop
+    with _tool_loop_lock:
+        if _tool_loop is None or _tool_loop.is_closed():
+            _tool_loop = asyncio.new_event_loop()
+        return _tool_loop
+
+
 def _run_async(coro):
     """Run an async coroutine from a sync context.
 
@@ -44,6 +64,10 @@ def _run_async(coro):
     disposable thread so asyncio.run() can create its own loop without
     conflicting.
 
+    For the common CLI path (no running loop), we use a persistent event
+    loop so that cached async clients (httpx / AsyncOpenAI) remain bound
+    to a live loop and don't trigger "Event loop is closed" on GC.
+
     This is the single source of truth for sync->async bridging in tool
     handlers. The RL paths (agent_loop.py, tool_context.py) also provide
     outer thread-pool wrapping as defense-in-depth, but each handler is
@@ -59,7 +83,12 @@ def _run_async(coro):
         with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
             future = pool.submit(asyncio.run, coro)
             return future.result(timeout=300)
-    return asyncio.run(coro)
+
+    # NOTE(review): this loop is process-global, not per-thread. If worker
+    # threads can reach this path concurrently, run_until_complete() raises
+    # "This event loop is already running" -- confirm callers are serialized.
+    tool_loop = _get_tool_loop()
+    return tool_loop.run_until_complete(coro)
 
 
 # =============================================================================
diff --git a/tests/test_model_tools_async_bridge.py b/tests/test_model_tools_async_bridge.py
new file mode 100644
index 0000000000..6597ca8705
--- /dev/null
+++ b/tests/test_model_tools_async_bridge.py
@@ -0,0 +1,214 @@
+"""Regression tests for the _run_async() event-loop lifecycle.
+
+These tests verify the fix for GitHub issue #2104:
+  "Event loop is closed" after vision_analyze used as first call in session.
+
+Root cause: asyncio.run() creates and *closes* a fresh event loop on every
+call. Cached httpx/AsyncOpenAI clients that were bound to the now-dead loop
+would crash with RuntimeError("Event loop is closed") when garbage-collected.
+
+The fix replaces asyncio.run() with a persistent event loop in _run_async().
+"""
+
+import asyncio
+import json
+import threading
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+async def _get_current_loop():
+    """Return the running event loop from inside a coroutine."""
+    return asyncio.get_running_loop()
+
+
+async def _create_and_return_transport():
+    """Simulate an async client creating a transport on the current loop.
+
+    Returns a simple asyncio.Future bound to the running loop so we can
+    later check whether the loop is still alive.
+    """
+    loop = asyncio.get_running_loop()
+    fut = loop.create_future()
+    fut.set_result("ok")
+    return loop, fut
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+class TestRunAsyncLoopLifecycle:
+    """Verify _run_async() keeps the event loop alive after returning."""
+
+    def test_loop_not_closed_after_run_async(self):
+        """The loop used by _run_async must still be open after the call."""
+        from model_tools import _run_async
+
+        loop = _run_async(_get_current_loop())
+
+        assert not loop.is_closed(), (
+            "_run_async() closed the event loop — cached async clients will "
+            "crash with 'Event loop is closed' on GC (issue #2104)"
+        )
+
+    def test_same_loop_reused_across_calls(self):
+        """Consecutive _run_async calls should reuse the same loop."""
+        from model_tools import _run_async
+
+        loop1 = _run_async(_get_current_loop())
+        loop2 = _run_async(_get_current_loop())
+
+        assert loop1 is loop2, (
+            "_run_async() created a new loop on the second call — cached "
+            "async clients from the first call would be orphaned"
+        )
+
+    def test_cached_transport_survives_between_calls(self):
+        """A transport/future created in call 1 must be valid in call 2."""
+        from model_tools import _run_async
+
+        loop, fut = _run_async(_create_and_return_transport())
+
+        assert not loop.is_closed()
+        assert fut.result() == "ok"
+
+        loop2 = _run_async(_get_current_loop())
+        assert loop2 is loop, "Loop changed between calls"
+        assert not loop.is_closed(), "Loop closed before second call"
+
+
+class TestRunAsyncWithRunningLoop:
+    """When a loop is already running, _run_async falls back to a thread."""
+
+    @pytest.mark.asyncio
+    async def test_run_async_from_async_context(self):
+        """_run_async should still work when called from inside an
+        already-running event loop (gateway / Atropos path)."""
+        from model_tools import _run_async
+
+        async def _simple():
+            return 42
+
+        # NOTE(review): the executor thread has no running loop of its own, so
+        # this presumably exercises the persistent-loop path rather than the
+        # thread fallback -- confirm this is the branch the test should cover.
+        result = await asyncio.get_running_loop().run_in_executor(
+            None, _run_async, _simple()
+        )
+        assert result == 42
+
+
+# ---------------------------------------------------------------------------
+# Integration: full vision_analyze dispatch chain
+# ---------------------------------------------------------------------------
+
+def _mock_vision_response():
+    """Build a fake LLM response matching async_call_llm's return shape."""
+    message = SimpleNamespace(content="A cat sitting on a chair.")
+    choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
+    return SimpleNamespace(choices=[choice], model="test/vision", usage=None)
+
+
+class TestVisionDispatchLoopSafety:
+    """Simulate the full registry.dispatch('vision_analyze') chain and
+    verify the event loop stays alive afterwards — the exact scenario
+    from issue #2104."""
+
+    def test_vision_dispatch_keeps_loop_alive(self, tmp_path):
+        """After dispatching vision_analyze via the registry, the event
+        loop must remain open so cached async clients don't crash on GC."""
+        from model_tools import _get_tool_loop
+        from tools.registry import registry
+
+        fake_response = _mock_vision_response()
+
+        with (
+            patch(
+                "tools.vision_tools.async_call_llm",
+                new_callable=AsyncMock,
+                return_value=fake_response,
+            ),
+            patch(
+                "tools.vision_tools._download_image",
+                new_callable=AsyncMock,
+                side_effect=lambda url, dest, **kw: _write_fake_image(dest),
+            ),
+            patch(
+                "tools.vision_tools._validate_image_url",
+                return_value=True,
+            ),
+            patch(
+                "tools.vision_tools._image_to_base64_data_url",
+                return_value="data:image/jpeg;base64,abc",
+            ),
+        ):
+            result_json = registry.dispatch(
+                "vision_analyze",
+                {"image_url": "https://example.com/cat.png", "question": "What is this?"},
+            )
+
+        result = json.loads(result_json)
+        assert result.get("success") is True, f"dispatch failed: {result}"
+        assert "cat" in result.get("analysis", "").lower()
+
+        loop = _get_tool_loop()
+        assert not loop.is_closed(), (
+            "Event loop closed after vision_analyze dispatch — cached async "
+            "clients will crash with 'Event loop is closed' (issue #2104)"
+        )
+
+    def test_two_consecutive_vision_dispatches(self, tmp_path):
+        """Two back-to-back vision_analyze dispatches must both succeed
+        and share the same loop (simulates 'first call fails, second
+        works' from the issue report)."""
+        from model_tools import _get_tool_loop
+        from tools.registry import registry
+
+        fake_response = _mock_vision_response()
+
+        with (
+            patch(
+                "tools.vision_tools.async_call_llm",
+                new_callable=AsyncMock,
+                return_value=fake_response,
+            ),
+            patch(
+                "tools.vision_tools._download_image",
+                new_callable=AsyncMock,
+                side_effect=lambda url, dest, **kw: _write_fake_image(dest),
+            ),
+            patch(
+                "tools.vision_tools._validate_image_url",
+                return_value=True,
+            ),
+            patch(
+                "tools.vision_tools._image_to_base64_data_url",
+                return_value="data:image/jpeg;base64,abc",
+            ),
+        ):
+            args = {"image_url": "https://example.com/cat.png", "question": "Describe"}
+
+            r1 = json.loads(registry.dispatch("vision_analyze", args))
+            loop_after_first = _get_tool_loop()
+
+            r2 = json.loads(registry.dispatch("vision_analyze", args))
+            loop_after_second = _get_tool_loop()
+
+        assert r1.get("success") is True
+        assert r2.get("success") is True
+        assert loop_after_first is loop_after_second, "Loop changed between dispatches"
+        assert not loop_after_second.is_closed()
+
+
+def _write_fake_image(dest):
+    """Write minimal bytes so vision_analyze_tool thinks download succeeded."""
+    dest.parent.mkdir(parents=True, exist_ok=True)
+    dest.write_bytes(b"\xff\xd8\xff" + b"\x00" * 16)
+    return dest