fix(gateway): replace os.environ session state with contextvars for concurrency safety

When two gateway messages arrived concurrently, _set_session_env wrote
HERMES_SESSION_PLATFORM/CHAT_ID/CHAT_NAME/THREAD_ID into the process-global
os.environ. Because asyncio tasks share the same process, Message B would
overwrite Message A's values mid-flight, causing background-task notifications
and tool calls to route to the wrong thread/chat.

Replace os.environ with Python's contextvars.ContextVar. Each asyncio task
(and any run_in_executor thread it spawns) gets its own copy, so concurrent
messages never interfere.

Changes:
- New gateway/session_context.py with ContextVar definitions, set/clear/get
  helpers, and os.environ fallback for CLI/cron/test backward compatibility
- gateway/run.py: _set_session_env returns reset tokens, _clear_session_env
  accepts them for proper cleanup in finally blocks
- All tool consumers updated: cronjob_tools, send_message_tool, skills_tool,
  terminal_tool (both notify_on_complete AND check_interval blocks), tts_tool,
  agent/skill_utils, agent/prompt_builder
- Tests updated for new contextvar-based API

Fixes #7358

Co-authored-by: teknium1 <127238744+teknium1@users.noreply.github.com>
This commit is contained in:
0xFrank-eth 2026-04-10 16:50:56 -07:00 committed by Teknium
parent dab5ec8245
commit e8034e2f6a
10 changed files with 255 additions and 52 deletions

View file

@ -2442,8 +2442,8 @@ class GatewayRunner:
# Build session context
context = build_session_context(source, self.config, session_entry)
# Set environment variables for tools
self._set_session_env(context)
# Set session context variables for tools (task-local, concurrency-safe)
_session_env_tokens = self._set_session_env(context)
# Read privacy.redact_pii from config (re-read per message)
_redact_pii = False
@ -3276,8 +3276,8 @@ class GatewayRunner:
"Try again or use /reset to start a fresh session."
)
finally:
# Clear session env
self._clear_session_env()
# Restore session context variables to their pre-handler state
self._clear_session_env(_session_env_tokens)
def _format_session_info(self) -> str:
"""Resolve current model config and return a formatted info block.
@ -6176,20 +6176,27 @@ class GatewayRunner:
return True
def _set_session_env(self, context: SessionContext) -> None:
"""Set environment variables for the current session."""
os.environ["HERMES_SESSION_PLATFORM"] = context.source.platform.value
os.environ["HERMES_SESSION_CHAT_ID"] = context.source.chat_id
if context.source.chat_name:
os.environ["HERMES_SESSION_CHAT_NAME"] = context.source.chat_name
if context.source.thread_id:
os.environ["HERMES_SESSION_THREAD_ID"] = str(context.source.thread_id)
def _clear_session_env(self) -> None:
"""Clear session environment variables."""
for var in ["HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME", "HERMES_SESSION_THREAD_ID"]:
if var in os.environ:
del os.environ[var]
def _set_session_env(self, context: SessionContext) -> list:
"""Set session context variables for the current async task.
Uses ``contextvars`` instead of ``os.environ`` so that concurrent
gateway messages cannot overwrite each other's session state.
Returns a list of reset tokens; pass them to ``_clear_session_env``
in a ``finally`` block.
"""
from gateway.session_context import set_session_vars
return set_session_vars(
platform=context.source.platform.value,
chat_id=context.source.chat_id,
chat_name=context.source.chat_name or "",
thread_id=str(context.source.thread_id) if context.source.thread_id else "",
)
def _clear_session_env(self, tokens: list) -> None:
"""Restore session context variables to their pre-handler values."""
from gateway.session_context import clear_session_vars
clear_session_vars(tokens)
async def _enrich_message_with_vision(
self,