fix(gateway): use owned executor for agent work

This commit is contained in:
konsisumer 2026-06-04 17:22:21 +02:00 committed by Teknium
parent 52a09d8faf
commit 1011c07966
2 changed files with 92 additions and 4 deletions

View file

@ -25,6 +25,7 @@ except ModuleNotFoundError:
pass
import asyncio
import concurrent.futures
import dataclasses
import inspect
import json
@ -2641,6 +2642,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
self._restart_command_source: Optional[SessionSource] = None
self._stop_task: Optional[asyncio.Task] = None
self._restart_task: Optional[asyncio.Task] = None
self._executor_lock = threading.Lock()
self._executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
# Track running agents per session for interrupt support
# Key: session_key, Value: AIAgent instance
@ -7303,6 +7306,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_db.close()
except Exception as _e:
logger.debug("SessionDB close error: %s", _e)
GatewayRunner._shutdown_executor(self)
logger.info(
"Shutdown phase: SessionDB close done at +%.2fs",
_phase_elapsed(),
@ -13396,7 +13400,47 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"""Run blocking work in the thread pool while preserving session contextvars."""
loop = asyncio.get_running_loop()
ctx = copy_context()
return await loop.run_in_executor(None, ctx.run, func, *args)
return await loop.run_in_executor(
self._get_executor(),
ctx.run,
func,
*args,
)
def _get_executor(self) -> concurrent.futures.ThreadPoolExecutor:
"""Return the gateway-owned executor for blocking agent work."""
lock = getattr(self, "_executor_lock", None)
if lock is None:
lock = threading.Lock()
self._executor_lock = lock
with lock:
executor = getattr(self, "_executor", None)
if executor is None or getattr(executor, "_shutdown", False):
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=10,
thread_name_prefix="hermes-gateway",
)
self._executor = executor
return executor
def _shutdown_executor(self) -> None:
"""Stop the gateway-owned executor without touching the loop default."""
lock = getattr(self, "_executor_lock", None)
if lock is None:
return
with lock:
executor = getattr(self, "_executor", None)
self._executor = None
if executor is None:
return
try:
executor.shutdown(wait=False, cancel_futures=True)
except TypeError:
executor.shutdown(wait=False)
def _decide_image_input_mode(self) -> str:
"""Resolve the image-input routing for the currently active model.

View file

@ -317,6 +317,7 @@ async def test_run_in_executor_with_context_preserves_session_env(monkeypatch):
)
finally:
runner._clear_session_env(tokens)
runner._shutdown_executor()
assert result == {
"platform": "telegram",
@ -334,7 +335,10 @@ async def test_run_in_executor_with_context_forwards_args():
def add(a, b):
return a + b
result = await runner._run_in_executor_with_context(add, 3, 7)
try:
result = await runner._run_in_executor_with_context(add, 3, 7)
finally:
runner._shutdown_executor()
assert result == 10
@ -346,5 +350,45 @@ async def test_run_in_executor_with_context_propagates_exceptions():
def blow_up():
raise ValueError("boom")
with pytest.raises(ValueError, match="boom"):
await runner._run_in_executor_with_context(blow_up)
try:
with pytest.raises(ValueError, match="boom"):
await runner._run_in_executor_with_context(blow_up)
finally:
runner._shutdown_executor()
@pytest.mark.asyncio
async def test_run_in_executor_with_context_survives_default_executor_shutdown():
"""Gateway agent work should not depend on asyncio's default executor."""
runner = object.__new__(GatewayRunner)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, lambda: None)
await loop.shutdown_default_executor()
try:
result = await runner._run_in_executor_with_context(lambda: "ok")
finally:
runner._shutdown_executor()
assert result == "ok"
@pytest.mark.asyncio
async def test_run_in_executor_with_context_recreates_shutdown_gateway_executor():
"""A stopped gateway-owned executor should be replaced on the next use."""
runner = object.__new__(GatewayRunner)
try:
first = await runner._run_in_executor_with_context(lambda: "first")
first_executor = runner._executor
runner._shutdown_executor()
second = await runner._run_in_executor_with_context(lambda: "second")
second_executor = runner._executor
finally:
runner._shutdown_executor()
assert first == "first"
assert second == "second"
assert second_executor is not first_executor