fix: use per-thread persistent event loops in worker threads

Replace asyncio.run() with thread-local persistent event loops for
worker threads (e.g., delegate_task's ThreadPoolExecutor). asyncio.run()
creates and closes a fresh loop on every call, leaving cached
httpx/AsyncOpenAI clients bound to a dead loop — causing 'Event loop is
closed' errors during GC when parallel subagents clean up connections.

The fix mirrors the main thread's _get_tool_loop() pattern but uses
threading.local() so each worker thread gets its own long-lived loop,
avoiding both cross-thread contention and the create-destroy lifecycle.

Added 4 regression tests covering worker loop persistence, reuse,
per-thread isolation, and separation from the main thread's loop.
This commit is contained in:
emozilla 2026-03-20 15:41:06 -04:00
parent aafe86d81a
commit ab6abc2c13
2 changed files with 130 additions and 7 deletions

View file

@ -39,6 +39,7 @@ logger = logging.getLogger(__name__)
_tool_loop = None # persistent loop for the main (CLI) thread
_tool_loop_lock = threading.Lock()
_worker_thread_local = threading.local() # per-worker-thread persistent loops
def _get_tool_loop():
@ -56,6 +57,28 @@ def _get_tool_loop():
return _tool_loop
def _get_worker_loop():
    """Return a persistent event loop for the current worker thread.

    Each worker thread (e.g., delegate_task's ThreadPoolExecutor threads)
    owns one long-lived loop, stashed in thread-local storage. The prior
    approach — asyncio.run() per call — created a loop, ran the coroutine,
    then *closed* the loop; cached httpx/AsyncOpenAI clients remained
    bound to that now-dead loop and raised RuntimeError ("Event loop is
    closed") during garbage collection or subsequent use.

    Keeping the loop alive for the thread's lifetime means cached clients
    stay valid and their cleanup executes on a live loop.
    """
    cached = getattr(_worker_thread_local, 'loop', None)
    if cached is not None and not cached.is_closed():
        return cached
    # First call on this thread (or a previous loop was closed):
    # create a fresh loop and remember it for subsequent calls.
    fresh = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh)
    _worker_thread_local.loop = fresh
    return fresh
def _run_async(coro):
"""Run an async coroutine from a sync context.
@ -68,9 +91,10 @@ def _run_async(coro):
loop so that cached async clients (httpx / AsyncOpenAI) remain bound
to a live loop and don't trigger "Event loop is closed" on GC.

When called from a worker thread (parallel tool execution), we use a
per-thread persistent loop to avoid both contention with the main
thread's shared loop AND the "Event loop is closed" errors caused by
asyncio.run()'s create-and-destroy lifecycle.
This is the single source of truth for sync->async bridging in tool
handlers. The RL paths (agent_loop.py, tool_context.py) also provide
@ -89,11 +113,14 @@ def _run_async(coro):
future = pool.submit(asyncio.run, coro)
return future.result(timeout=300)

# If we're on a worker thread (e.g., parallel tool execution in
# delegate_task), use a per-thread persistent loop. This avoids
# contention with the main thread's shared loop while keeping cached
# httpx/AsyncOpenAI clients bound to a live loop for the thread's
# lifetime — preventing "Event loop is closed" on GC cleanup.
if threading.current_thread() is not threading.main_thread():
worker_loop = _get_worker_loop()
return worker_loop.run_until_complete(coro)

tool_loop = _get_tool_loop()
return tool_loop.run_until_complete(coro)

View file

@ -84,6 +84,102 @@ class TestRunAsyncLoopLifecycle:
assert not loop.is_closed(), "Loop closed before second call"
class TestRunAsyncWorkerThread:
    """Verify worker threads get persistent per-thread loops (delegate_task fix).

    These are regression tests for the "Event loop is closed" failure:
    _run_async on a worker thread must reuse one live per-thread loop
    rather than creating and closing a loop on every call.
    """

    def test_worker_thread_loop_not_closed(self):
        """A worker thread's loop must stay open after _run_async returns,
        so cached httpx/AsyncOpenAI clients don't crash on GC."""
        from concurrent.futures import ThreadPoolExecutor

        from model_tools import _run_async

        def _run_on_worker():
            # Grab the loop _run_async executed on, then check it survived
            # the call (asyncio.run() would have closed it here).
            loop = _run_async(_get_current_loop())
            still_open = not loop.is_closed()
            return loop, still_open

        with ThreadPoolExecutor(max_workers=1) as pool:
            loop, still_open = pool.submit(_run_on_worker).result()

        assert still_open, (
            "Worker thread's event loop was closed after _run_async — "
            "cached async clients will crash with 'Event loop is closed'"
        )

    def test_worker_thread_reuses_loop_across_calls(self):
        """Multiple _run_async calls on the same worker thread should
        reuse the same persistent loop (not create-and-destroy each time)."""
        from concurrent.futures import ThreadPoolExecutor

        from model_tools import _run_async

        def _run_twice_on_worker():
            loop1 = _run_async(_get_current_loop())
            loop2 = _run_async(_get_current_loop())
            return loop1, loop2

        with ThreadPoolExecutor(max_workers=1) as pool:
            loop1, loop2 = pool.submit(_run_twice_on_worker).result()

        assert loop1 is loop2, (
            "Worker thread created different loops for consecutive calls — "
            "cached clients from the first call would be orphaned"
        )
        assert not loop1.is_closed()

    def test_parallel_workers_get_separate_loops(self):
        """Different worker threads must get their own loops to avoid
        contention (the original reason for the worker-thread branch)."""
        from concurrent.futures import ThreadPoolExecutor, as_completed

        from model_tools import _run_async

        barrier = threading.Barrier(3, timeout=5)

        def _get_loop_id():
            # Use a barrier to force all 3 threads to be alive simultaneously,
            # ensuring the ThreadPoolExecutor actually uses 3 distinct threads.
            loop = _run_async(_get_current_loop())
            barrier.wait()
            return id(loop), not loop.is_closed(), threading.current_thread().ident

        with ThreadPoolExecutor(max_workers=3) as pool:
            futures = [pool.submit(_get_loop_id) for _ in range(3)]
            results = [f.result() for f in as_completed(futures)]

        loop_ids = {r[0] for r in results}
        thread_ids = {r[2] for r in results}
        all_open = all(r[1] for r in results)

        assert all_open, "At least one worker thread's loop was closed"
        # The barrier guarantees 3 distinct threads were used
        assert len(thread_ids) == 3, f"Expected 3 threads, got {len(thread_ids)}"
        # Each thread should have its own loop
        assert len(loop_ids) == 3, (
            f"Expected 3 distinct loops for 3 parallel workers, "
            f"got {len(loop_ids)} — workers may be contending on a shared loop"
        )

    def test_worker_loop_separate_from_main_loop(self):
        """Worker thread loops must be different from the main thread's
        persistent loop to avoid cross-thread contention."""
        from concurrent.futures import ThreadPoolExecutor

        from model_tools import _run_async, _get_tool_loop

        main_loop = _get_tool_loop()

        def _get_worker_loop_id():
            loop = _run_async(_get_current_loop())
            return id(loop)

        with ThreadPoolExecutor(max_workers=1) as pool:
            worker_loop_id = pool.submit(_get_worker_loop_id).result()

        assert worker_loop_id != id(main_loop), (
            "Worker thread used the main thread's loop — this would cause "
            "cross-thread contention on the event loop"
        )
class TestRunAsyncWithRunningLoop:
"""When a loop is already running, _run_async falls back to a thread."""