mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(core): ensure non-blocking executor shutdown on async timeout
This commit is contained in:
parent
d6ed35d047
commit
76c454914a
2 changed files with 70 additions and 2 deletions
|
|
@ -108,9 +108,15 @@ def _run_async(coro):
|
||||||
if loop and loop.is_running():
|
if loop and loop.is_running():
|
||||||
# Inside an async context (gateway, RL env) — run in a fresh thread.
|
# Inside an async context (gateway, RL env) — run in a fresh thread.
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||||
future = pool.submit(asyncio.run, coro)
|
future = pool.submit(asyncio.run, coro)
|
||||||
|
try:
|
||||||
return future.result(timeout=300)
|
return future.result(timeout=300)
|
||||||
|
except concurrent.futures.TimeoutError:
|
||||||
|
future.cancel()
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
pool.shutdown(wait=False, cancel_futures=True)
|
||||||
|
|
||||||
# If we're on a worker thread (e.g., parallel tool execution in
|
# If we're on a worker thread (e.g., parallel tool execution in
|
||||||
# delegate_task), use a per-thread persistent loop. This avoids
|
# delegate_task), use a per-thread persistent loop. This avoids
|
||||||
|
|
|
||||||
|
|
@ -197,6 +197,68 @@ class TestRunAsyncWithRunningLoop:
|
||||||
)
|
)
|
||||||
assert result == 42
|
assert result == 42
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_timeout_uses_nonblocking_executor_shutdown(self, monkeypatch):
|
||||||
|
"""A timeout in the running-loop branch must not wait for the worker.
|
||||||
|
|
||||||
|
ThreadPoolExecutor's context manager performs shutdown(wait=True).
|
||||||
|
If _run_async relies on that path after future.result(timeout=...)
|
||||||
|
times out, the timeout does not bound wall-clock time because the
|
||||||
|
caller still waits for the stuck coroutine's thread to finish.
|
||||||
|
"""
|
||||||
|
import concurrent.futures
|
||||||
|
from model_tools import _run_async
|
||||||
|
|
||||||
|
events = {
|
||||||
|
"cancelled": False,
|
||||||
|
"result_timeout": None,
|
||||||
|
"shutdown_calls": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
class TimeoutFuture:
|
||||||
|
def result(self, timeout=None):
|
||||||
|
events["result_timeout"] = timeout
|
||||||
|
raise concurrent.futures.TimeoutError()
|
||||||
|
|
||||||
|
def cancel(self):
|
||||||
|
events["cancelled"] = True
|
||||||
|
return True
|
||||||
|
|
||||||
|
class FakeExecutor:
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb):
|
||||||
|
self.shutdown(wait=True)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def submit(self, fn, *args, **kwargs):
|
||||||
|
if args and hasattr(args[0], "close"):
|
||||||
|
args[0].close()
|
||||||
|
return TimeoutFuture()
|
||||||
|
|
||||||
|
def shutdown(self, wait=True, cancel_futures=False):
|
||||||
|
events["shutdown_calls"].append((wait, cancel_futures))
|
||||||
|
|
||||||
|
async def _never_finishes():
|
||||||
|
await asyncio.sleep(999)
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
concurrent.futures,
|
||||||
|
"ThreadPoolExecutor",
|
||||||
|
FakeExecutor,
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(concurrent.futures.TimeoutError):
|
||||||
|
_run_async(_never_finishes())
|
||||||
|
|
||||||
|
assert events["result_timeout"] == 300
|
||||||
|
assert events["cancelled"] is True
|
||||||
|
assert events["shutdown_calls"] == [(False, True)]
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Integration: full vision_analyze dispatch chain
|
# Integration: full vision_analyze dispatch chain
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue