From 8021a735c283b1b9a062ba6e64dae0090214482b Mon Sep 17 00:00:00 2001 From: helix4u <4317663+helix4u@users.noreply.github.com> Date: Thu, 16 Apr 2026 14:27:54 +0530 Subject: [PATCH] fix(gateway): preserve notify context in executor threads Gateway executor work now inherits the active session contextvars via copy_context() so background process watchers retain the correct platform/chat/user/session metadata for routing completion events back to the originating chat. Cherry-picked from #10647 by @helix4u with: - Use asyncio.get_running_loop() instead of deprecated get_event_loop() - Strip trailing whitespace - Add *args forwarding test - Add exception propagation test --- gateway/run.py | 18 ++++---- tests/gateway/test_session_env.py | 69 +++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 67ec4d420..28a350a39 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -24,6 +24,7 @@ import signal import tempfile import threading import time +from contextvars import copy_context from pathlib import Path from datetime import datetime from typing import Dict, Optional, Any, List @@ -5715,8 +5716,7 @@ class GatewayRunner: task_id=task_id, ) - loop = asyncio.get_event_loop() - result = await loop.run_in_executor(None, run_sync) + result = await self._run_in_executor_with_context(run_sync) response = result.get("final_response", "") if result else "" if not response and result and result.get("error"): @@ -5898,8 +5898,7 @@ class GatewayRunner: task_id=task_id, ) - loop = asyncio.get_event_loop() - result = await loop.run_in_executor(None, run_sync) + result = await self._run_in_executor_with_context(run_sync) response = (result.get("final_response") or "") if result else "" if not response and result and result.get("error"): @@ -7318,7 +7317,13 @@ class GatewayRunner: """Restore session context variables to their pre-handler values.""" from gateway.session_context import clear_session_vars clear_session_vars(tokens) - + + async def _run_in_executor_with_context(self, func, *args): + """Run blocking work in the thread pool while preserving session contextvars.""" + loop = asyncio.get_running_loop() + ctx = copy_context() + return await loop.run_in_executor(None, ctx.run, func, *args) + async def _enrich_message_with_vision( self, user_text: str, @@ -9094,9 +9099,8 @@ class GatewayRunner: _agent_warning_raw = float(os.getenv("HERMES_AGENT_TIMEOUT_WARNING", 900)) _agent_warning = _agent_warning_raw if _agent_warning_raw > 0 else None _warning_fired = False - loop = asyncio.get_event_loop() _executor_task = asyncio.ensure_future( - loop.run_in_executor(None, run_sync) + self._run_in_executor_with_context(run_sync) ) _inactivity_timeout = False diff --git a/tests/gateway/test_session_env.py b/tests/gateway/test_session_env.py index 85899e2fd..c4765c144 100644 --- a/tests/gateway/test_session_env.py +++ b/tests/gateway/test_session_env.py @@ -251,3 +251,72 @@ def test_session_key_no_race_condition_with_contextvars(monkeypatch): assert results["session-B"] == "session-B", ( f"Session B got '{results['session-B']}' instead of 'session-B' — race condition!" ) + + +@pytest.mark.asyncio +async def test_run_in_executor_with_context_preserves_session_env(monkeypatch): + """Gateway executor work should inherit session contextvars for tool routing.""" + runner = object.__new__(GatewayRunner) + monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False) + monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False) + monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False) + monkeypatch.delenv("HERMES_SESSION_USER_ID", raising=False) + + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="2144471399", + chat_type="dm", + user_id="123456", + user_name="alice", + thread_id=None, + ) + context = SessionContext( + source=source, + connected_platforms=[], + home_channels={}, + session_key="agent:main:telegram:dm:2144471399", + ) + + tokens = runner._set_session_env(context) + try: + result = await runner._run_in_executor_with_context( + lambda: { + "platform": get_session_env("HERMES_SESSION_PLATFORM"), + "chat_id": get_session_env("HERMES_SESSION_CHAT_ID"), + "user_id": get_session_env("HERMES_SESSION_USER_ID"), + "session_key": get_session_env("HERMES_SESSION_KEY"), + } + ) + finally: + runner._clear_session_env(tokens) + + assert result == { + "platform": "telegram", + "chat_id": "2144471399", + "user_id": "123456", + "session_key": "agent:main:telegram:dm:2144471399", + } + + +@pytest.mark.asyncio +async def test_run_in_executor_with_context_forwards_args(): + """_run_in_executor_with_context should forward *args to the callable.""" + runner = object.__new__(GatewayRunner) + + def add(a, b): + return a + b + + result = await runner._run_in_executor_with_context(add, 3, 7) + assert result == 10 + + +@pytest.mark.asyncio +async def test_run_in_executor_with_context_propagates_exceptions(): + """Exceptions inside the executor should propagate to the caller.""" + runner = object.__new__(GatewayRunner) + + def blow_up(): + raise ValueError("boom") + + with pytest.raises(ValueError, match="boom"): + await runner._run_in_executor_with_context(blow_up)