From 76c454914a7c341bcff4b92da9b7d048d4e3b9ae Mon Sep 17 00:00:00 2001 From: Aslaaen Date: Wed, 22 Apr 2026 08:02:42 +0300 Subject: [PATCH] fix(core): ensure non-blocking executor shutdown on async timeout --- model_tools.py | 10 ++++- tests/test_model_tools_async_bridge.py | 62 ++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/model_tools.py b/model_tools.py index db4b46326..bee80f49b 100644 --- a/model_tools.py +++ b/model_tools.py @@ -108,9 +108,15 @@ def _run_async(coro): if loop and loop.is_running(): # Inside an async context (gateway, RL env) — run in a fresh thread. import concurrent.futures - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - future = pool.submit(asyncio.run, coro) + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(asyncio.run, coro) + try: return future.result(timeout=300) + except concurrent.futures.TimeoutError: + future.cancel() + raise + finally: + pool.shutdown(wait=False, cancel_futures=True) # If we're on a worker thread (e.g., parallel tool execution in # delegate_task), use a per-thread persistent loop. This avoids diff --git a/tests/test_model_tools_async_bridge.py b/tests/test_model_tools_async_bridge.py index d7acb46ac..d6266d7c3 100644 --- a/tests/test_model_tools_async_bridge.py +++ b/tests/test_model_tools_async_bridge.py @@ -197,6 +197,68 @@ class TestRunAsyncWithRunningLoop: ) assert result == 42 + @pytest.mark.asyncio + async def test_timeout_uses_nonblocking_executor_shutdown(self, monkeypatch): + """A timeout in the running-loop branch must not wait for the worker. + + ThreadPoolExecutor's context manager performs shutdown(wait=True). + If _run_async relies on that path after future.result(timeout=...) + times out, the timeout does not bound wall-clock time because the + caller still waits for the stuck coroutine's thread to finish. + """ + import concurrent.futures + from model_tools import _run_async + + events = { + "cancelled": False, + "result_timeout": None, + "shutdown_calls": [], + } + + class TimeoutFuture: + def result(self, timeout=None): + events["result_timeout"] = timeout + raise concurrent.futures.TimeoutError() + + def cancel(self): + events["cancelled"] = True + return True + + class FakeExecutor: + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.shutdown(wait=True) + return False + + def submit(self, fn, *args, **kwargs): + if args and hasattr(args[0], "close"): + args[0].close() + return TimeoutFuture() + + def shutdown(self, wait=True, cancel_futures=False): + events["shutdown_calls"].append((wait, cancel_futures)) + + async def _never_finishes(): + await asyncio.sleep(999) + + monkeypatch.setattr( + concurrent.futures, + "ThreadPoolExecutor", + FakeExecutor, + ) + + with pytest.raises(concurrent.futures.TimeoutError): + _run_async(_never_finishes()) + + assert events["result_timeout"] == 300 + assert events["cancelled"] is True + assert events["shutdown_calls"] == [(False, True)] + # --------------------------------------------------------------------------- # Integration: full vision_analyze dispatch chain