From b0435cc1648bf6a89e81206db0512e897af0ad4e Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Wed, 29 Apr 2026 04:56:33 -0700 Subject: [PATCH] fix(model_tools): cancel coroutine on timeout so worker thread exits + log full traceback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _run_async() bridges sync tool handlers to async code. When the handler is invoked from inside a running event loop (gateway / nested async), it spawns a worker thread and blocks on future.result(timeout=300). Before this change, a coroutine that ran past 300s leaked its worker thread: - future.cancel() is a no-op on a running ThreadPoolExecutor future (cancel only works on not-yet-started work). - pool.shutdown(wait=False, cancel_futures=True) let the caller proceed but the worker kept running the coroutine until it returned on its own. Every tool timeout leaked one thread. In long-lived gateway / RL sessions this is cumulative. The fix replaces bare asyncio.run() with a worker wrapper that creates its own event loop. On timeout, _run_async schedules task.cancel() on that loop via call_soon_threadsafe, then shuts the pool down with wait=False so the caller returns immediately. The coroutine observes CancelledError at its next await and the worker thread exits cleanly. Also switches logger.error() to logger.exception() in the top-level handle_function_call() except block so tool failures produce full stack traces in errors.log instead of just the message. Related: #17420 (contributor flagged the leak; the original fix used pool.shutdown(wait=True) which would have converted the leak into a hang — caller blocks forever on the same stuck coroutine). Credit for identifying the leak goes to the contributor. Co-authored-by: 0z! <162235745+0z1-ghb@users.noreply.github.com> --- model_tools.py | 51 +++++++++++-- scripts/release.py | 1 + tests/test_model_tools_async_bridge.py | 99 +++++++++++++++++++++++--- 3 files changed, 135 insertions(+), 16 deletions(-) diff --git a/model_tools.py b/model_tools.py index d85a8b8efd..66aaaf8f79 100644 --- a/model_tools.py +++ b/model_tools.py @@ -107,17 +107,58 @@ def _run_async(coro): loop = None if loop and loop.is_running(): - # Inside an async context (gateway, RL env) — run in a fresh thread. + # Inside an async context (gateway, RL env) — run in a fresh thread + # with its own event loop we own a reference to, so on timeout we + # can cancel the task inside that loop (ThreadPoolExecutor.cancel() + # only works on not-yet-started futures — it's a no-op on a running + # worker, which previously leaked the thread on every 300 s timeout). import concurrent.futures + + worker_loop: Optional[asyncio.AbstractEventLoop] = None + loop_ready = threading.Event() + + def _run_in_worker(): + nonlocal worker_loop + worker_loop = asyncio.new_event_loop() + loop_ready.set() + try: + asyncio.set_event_loop(worker_loop) + return worker_loop.run_until_complete(coro) + finally: + try: + # Cancel anything still pending (e.g. task cancelled + # externally via call_soon_threadsafe on timeout). + pending = asyncio.all_tasks(worker_loop) + for t in pending: + t.cancel() + if pending: + worker_loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True) + ) + except Exception: + pass + worker_loop.close() + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) - future = pool.submit(asyncio.run, coro) + future = pool.submit(_run_in_worker) try: return future.result(timeout=300) except concurrent.futures.TimeoutError: - future.cancel() + # Cancel the coroutine inside its own loop so the worker thread + # can wind down instead of running forever. + if loop_ready.wait(timeout=1.0) and worker_loop is not None: + try: + for t in asyncio.all_tasks(worker_loop): + worker_loop.call_soon_threadsafe(t.cancel) + except RuntimeError: + # Loop already closed — nothing to cancel. + pass raise finally: - pool.shutdown(wait=False, cancel_futures=True) + # wait=False: don't block the caller on a stuck coroutine. We've + # already requested cancellation above; the worker will exit + # once the coroutine observes it (usually at the next await). + pool.shutdown(wait=False) # If we're on a worker thread (e.g., parallel tool execution in # delegate_task), use a per-thread persistent loop. This avoids @@ -737,7 +778,7 @@ def handle_function_call( except Exception as e: error_msg = f"Error executing {function_name}: {str(e)}" - logger.error(error_msg) + logger.exception(error_msg) return json.dumps({"error": error_msg}, ensure_ascii=False) diff --git a/scripts/release.py b/scripts/release.py index bee51a498a..aa4bccb909 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -60,6 +60,7 @@ AUTHOR_MAP = { "johnnncenaaa77@gmail.com": "johnncenae", "thomasjhon6666@gmail.com": "ThomassJonax", "focusflow.app.help@gmail.com": "yes999zc", + "162235745+0z1-ghb@users.noreply.github.com": "0z1-ghb", "yes999zc@163.com": "yes999zc", "343873859@qq.com": "DrStrangerUJN", "uzmpsk.dilekakbas@gmail.com": "dlkakbs", diff --git a/tests/test_model_tools_async_bridge.py b/tests/test_model_tools_async_bridge.py index d6266d7c36..ed0a85cd35 100644 --- a/tests/test_model_tools_async_bridge.py +++ b/tests/test_model_tools_async_bridge.py @@ -199,20 +199,22 @@ class TestRunAsyncWithRunningLoop: @pytest.mark.asyncio async def test_timeout_uses_nonblocking_executor_shutdown(self, monkeypatch): - """A timeout in the running-loop branch must not wait for the worker. + """A timeout in the running-loop branch must not block the caller. - ThreadPoolExecutor's context manager performs shutdown(wait=True). - If _run_async relies on that path after future.result(timeout=...) - times out, the timeout does not bound wall-clock time because the - caller still waits for the stuck coroutine's thread to finish. + If shutdown ever waits for a stuck worker, a tool coroutine that + ignores (or can't observe) cancellation would hang the whole agent. + Guard: the caller must raise TimeoutError and pool.shutdown must be + called with wait=False. The worker's own event loop handles cleanup + (cancellation is scheduled via call_soon_threadsafe before the + caller returns). """ import concurrent.futures from model_tools import _run_async events = { - "cancelled": False, "result_timeout": None, "shutdown_calls": [], + "submitted_fn": None, } class TimeoutFuture: @@ -221,7 +223,6 @@ class TestRunAsyncWithRunningLoop: raise concurrent.futures.TimeoutError() def cancel(self): - events["cancelled"] = True return True class FakeExecutor: @@ -236,8 +237,10 @@ class TestRunAsyncWithRunningLoop: return False def submit(self, fn, *args, **kwargs): - if args and hasattr(args[0], "close"): - args[0].close() + # Record which function got submitted -- should be the + # in-function worker wrapper, not bare asyncio.run, so we + # know _run_async is using a loop it owns and can cancel. + events["submitted_fn"] = getattr(fn, "__name__", repr(fn)) return TimeoutFuture() def shutdown(self, wait=True, cancel_futures=False): @@ -256,8 +259,82 @@ class TestRunAsyncWithRunningLoop: _run_async(_never_finishes()) assert events["result_timeout"] == 300 - assert events["cancelled"] is True - assert events["shutdown_calls"] == [(False, True)] + # The worker wrapper creates its own event loop so _run_async can + # cancel the task on timeout — this must NOT be bare asyncio.run. + assert events["submitted_fn"] != "run", ( + "_run_async submitted asyncio.run directly — it must submit a " + "worker wrapper that owns the event loop so timeouts can cancel " + "the task" + ) + # Critical: shutdown must NOT wait. If wait=True, a stuck coroutine + # would freeze the caller (converts a thread leak into a hang). + assert events["shutdown_calls"], "shutdown was never called" + for wait, _cancel in events["shutdown_calls"]: + assert wait is False, ( + f"shutdown called with wait={wait} — a stuck tool coroutine " + f"would hang the caller indefinitely" + ) + + @pytest.mark.asyncio + async def test_timeout_cancels_coroutine_in_worker_loop(self, monkeypatch): + """On timeout, the worker's event loop must receive a cancel request + so the coroutine stops and the thread exits — not leaked. + + Before the fix, future.cancel() on a running ThreadPoolExecutor + future is a no-op, so the worker thread kept running the coroutine + to completion (leaking one thread per tool-timeout). + """ + from model_tools import _run_async + + # Shrink the 300s internal timeout by patching future.result. + # We do this surgically: let everything else run for real so the + # worker loop actually exists and can observe cancellation. + import concurrent.futures as _cf + + real_pool_cls = _cf.ThreadPoolExecutor + + class FastTimeoutPool(real_pool_cls): + def __init__(self, *a, **kw): + super().__init__(*a, **kw) + + # Patch future.result to time out after 1s instead of 300s. + real_result = _cf.Future.result + + def fast_result(self, timeout=None): + return real_result(self, timeout=1.0 if timeout == 300 else timeout) + + monkeypatch.setattr(_cf.Future, "result", fast_result) + + cancel_observed = threading.Event() + + async def _slow_cancellable(): + try: + await asyncio.sleep(60) + except asyncio.CancelledError: + cancel_observed.set() + raise + + import time as _time + t0 = _time.time() + with pytest.raises(_cf.TimeoutError): + _run_async(_slow_cancellable()) + elapsed = _time.time() - t0 + + # Caller must return fast (no hang waiting for the coro). + assert elapsed < 3.0, ( + f"_run_async blocked caller for {elapsed:.1f}s — should return " + f"on timeout regardless of whether the coroutine has finished" + ) + + # Worker thread must cancel the task (not leak). + deadline = _time.time() + 5 + while not cancel_observed.is_set() and _time.time() < deadline: + _time.sleep(0.05) + assert cancel_observed.is_set(), ( + "Coroutine never received CancelledError — worker thread leaked " + "(ThreadPoolExecutor.cancel() is a no-op on a running future; " + "_run_async must cancel the task inside its worker loop)" + ) # ---------------------------------------------------------------------------