diff --git a/gateway/run.py b/gateway/run.py index 84a5c61acdb..cfbabf1a717 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -25,6 +25,7 @@ except ModuleNotFoundError: pass import asyncio +import concurrent.futures import dataclasses import inspect import json @@ -2641,6 +2642,8 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew self._restart_command_source: Optional[SessionSource] = None self._stop_task: Optional[asyncio.Task] = None self._restart_task: Optional[asyncio.Task] = None + self._executor_lock = threading.Lock() + self._executor: Optional[concurrent.futures.ThreadPoolExecutor] = None # Track running agents per session for interrupt support # Key: session_key, Value: AIAgent instance @@ -7303,6 +7306,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _db.close() except Exception as _e: logger.debug("SessionDB close error: %s", _e) + GatewayRunner._shutdown_executor(self) logger.info( "Shutdown phase: SessionDB close done at +%.2fs", _phase_elapsed(), @@ -13396,7 +13400,47 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew """Run blocking work in the thread pool while preserving session contextvars.""" loop = asyncio.get_running_loop() ctx = copy_context() - return await loop.run_in_executor(None, ctx.run, func, *args) + return await loop.run_in_executor( + self._get_executor(), + ctx.run, + func, + *args, + ) + + def _get_executor(self) -> concurrent.futures.ThreadPoolExecutor: + """Return the gateway-owned executor for blocking agent work.""" + lock = getattr(self, "_executor_lock", None) + if lock is None: + lock = threading.Lock() + self._executor_lock = lock + + with lock: + executor = getattr(self, "_executor", None) + if executor is None or getattr(executor, "_shutdown", False): + executor = concurrent.futures.ThreadPoolExecutor( + max_workers=10, + thread_name_prefix="hermes-gateway", + ) + self._executor = executor + return executor + + def _shutdown_executor(self) -> None: + """Stop the gateway-owned executor without touching the loop default.""" + lock = getattr(self, "_executor_lock", None) + if lock is None: + return + + with lock: + executor = getattr(self, "_executor", None) + self._executor = None + + if executor is None: + return + + try: + executor.shutdown(wait=False, cancel_futures=True) + except TypeError: + executor.shutdown(wait=False) def _decide_image_input_mode(self) -> str: """Resolve the image-input routing for the currently active model. diff --git a/tests/gateway/test_session_env.py b/tests/gateway/test_session_env.py index b0797467d45..a47653db988 100644 --- a/tests/gateway/test_session_env.py +++ b/tests/gateway/test_session_env.py @@ -317,6 +317,7 @@ async def test_run_in_executor_with_context_preserves_session_env(monkeypatch): ) finally: runner._clear_session_env(tokens) + runner._shutdown_executor() assert result == { "platform": "telegram", @@ -334,7 +335,10 @@ async def test_run_in_executor_with_context_forwards_args(): def add(a, b): return a + b - result = await runner._run_in_executor_with_context(add, 3, 7) + try: + result = await runner._run_in_executor_with_context(add, 3, 7) + finally: + runner._shutdown_executor() assert result == 10 @@ -346,5 +350,45 @@ async def test_run_in_executor_with_context_propagates_exceptions(): def blow_up(): raise ValueError("boom") - with pytest.raises(ValueError, match="boom"): - await runner._run_in_executor_with_context(blow_up) + try: + with pytest.raises(ValueError, match="boom"): + await runner._run_in_executor_with_context(blow_up) + finally: + runner._shutdown_executor() + + +@pytest.mark.asyncio +async def test_run_in_executor_with_context_survives_default_executor_shutdown(): + """Gateway agent work should not depend on asyncio's default executor.""" + runner = object.__new__(GatewayRunner) + loop = asyncio.get_running_loop() + + await loop.run_in_executor(None, lambda: None) + await loop.shutdown_default_executor() + + try: + result = await runner._run_in_executor_with_context(lambda: "ok") + finally: + runner._shutdown_executor() + + assert result == "ok" + + +@pytest.mark.asyncio +async def test_run_in_executor_with_context_recreates_shutdown_gateway_executor(): + """A stopped gateway-owned executor should be replaced on the next use.""" + runner = object.__new__(GatewayRunner) + + try: + first = await runner._run_in_executor_with_context(lambda: "first") + first_executor = runner._executor + runner._shutdown_executor() + + second = await runner._run_in_executor_with_context(lambda: "second") + second_executor = runner._executor + finally: + runner._shutdown_executor() + + assert first == "first" + assert second == "second" + assert second_executor is not first_executor