import asyncio import os import pytest from gateway.config import Platform from gateway.run import GatewayRunner from gateway.session import SessionContext, SessionSource from gateway.session_context import ( get_session_env, set_session_vars, clear_session_vars, _VAR_MAP, _UNSET, ) @pytest.fixture(autouse=True) def _reset_contextvars(): """Reset all session contextvars to _UNSET between tests. In production each asyncio.Task gets a fresh context copy where the defaults are _UNSET. In tests all functions share the same thread context, so a clear_session_vars() from test A (which sets vars to "") would leak into test B. This fixture ensures each test starts clean. """ yield for var in _VAR_MAP.values(): # Can't use var.reset() without a token; just set back to sentinel. var.set(_UNSET) def test_set_session_env_sets_contextvars(monkeypatch): """_set_session_env should populate contextvars, not os.environ.""" runner = object.__new__(GatewayRunner) source = SessionSource( platform=Platform.TELEGRAM, chat_id="-1001", chat_name="Group", chat_type="group", user_id="123456", user_name="alice", thread_id="17585", ) context = SessionContext(source=source, connected_platforms=[], home_channels={}) monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False) monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False) monkeypatch.delenv("HERMES_SESSION_CHAT_NAME", raising=False) monkeypatch.delenv("HERMES_SESSION_USER_ID", raising=False) monkeypatch.delenv("HERMES_SESSION_USER_NAME", raising=False) monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False) tokens = runner._set_session_env(context) # Values should be readable via get_session_env (contextvar path) assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" assert get_session_env("HERMES_SESSION_CHAT_ID") == "-1001" assert get_session_env("HERMES_SESSION_CHAT_NAME") == "Group" assert get_session_env("HERMES_SESSION_USER_ID") == "123456" assert get_session_env("HERMES_SESSION_USER_NAME") == "alice" assert get_session_env("HERMES_SESSION_THREAD_ID") == "17585" # os.environ should NOT be touched assert os.getenv("HERMES_SESSION_PLATFORM") is None assert os.getenv("HERMES_SESSION_THREAD_ID") is None # Clean up runner._clear_session_env(tokens) def test_clear_session_env_restores_previous_state(monkeypatch): """_clear_session_env should restore contextvars to their pre-handler values.""" runner = object.__new__(GatewayRunner) monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False) monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False) monkeypatch.delenv("HERMES_SESSION_CHAT_NAME", raising=False) monkeypatch.delenv("HERMES_SESSION_USER_ID", raising=False) monkeypatch.delenv("HERMES_SESSION_USER_NAME", raising=False) monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False) source = SessionSource( platform=Platform.TELEGRAM, chat_id="-1001", chat_name="Group", chat_type="group", user_id="123456", user_name="alice", thread_id="17585", ) context = SessionContext(source=source, connected_platforms=[], home_channels={}) tokens = runner._set_session_env(context) assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" assert get_session_env("HERMES_SESSION_USER_ID") == "123456" runner._clear_session_env(tokens) # After clear, contextvars should return to defaults (empty) assert get_session_env("HERMES_SESSION_PLATFORM") == "" assert get_session_env("HERMES_SESSION_CHAT_ID") == "" assert get_session_env("HERMES_SESSION_CHAT_NAME") == "" assert get_session_env("HERMES_SESSION_USER_ID") == "" assert get_session_env("HERMES_SESSION_USER_NAME") == "" assert get_session_env("HERMES_SESSION_THREAD_ID") == "" def test_get_session_env_falls_back_to_os_environ(monkeypatch): """get_session_env should fall back to os.environ when contextvar is unset.""" monkeypatch.setenv("HERMES_SESSION_PLATFORM", "discord") # No contextvar set — should read from os.environ assert get_session_env("HERMES_SESSION_PLATFORM") == "discord" # Now set a contextvar — should prefer it tokens = set_session_vars(platform="telegram") assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" # After clear — should return "" (explicitly cleared), NOT fall back # to os.environ. This is the fix for #10304: stale os.environ values # must not leak through after a gateway session is cleaned up. clear_session_vars(tokens) assert get_session_env("HERMES_SESSION_PLATFORM") == "" def test_get_session_env_default_when_nothing_set(monkeypatch): """get_session_env returns default when neither contextvar nor env is set.""" monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False) assert get_session_env("HERMES_SESSION_PLATFORM") == "" assert get_session_env("HERMES_SESSION_PLATFORM", "fallback") == "fallback" def test_set_session_env_handles_missing_optional_fields(): """_set_session_env should handle None chat_name and thread_id gracefully.""" runner = object.__new__(GatewayRunner) source = SessionSource( platform=Platform.TELEGRAM, chat_id="-1001", chat_name=None, chat_type="private", thread_id=None, ) context = SessionContext(source=source, connected_platforms=[], home_channels={}) tokens = runner._set_session_env(context) assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" assert get_session_env("HERMES_SESSION_CHAT_ID") == "-1001" assert get_session_env("HERMES_SESSION_CHAT_NAME") == "" assert get_session_env("HERMES_SESSION_THREAD_ID") == "" runner._clear_session_env(tokens) # --------------------------------------------------------------------------- # SESSION_KEY contextvars tests # --------------------------------------------------------------------------- def test_session_key_set_via_contextvars(monkeypatch): """set_session_vars should set HERMES_SESSION_KEY via contextvars.""" monkeypatch.delenv("HERMES_SESSION_KEY", raising=False) tokens = set_session_vars( platform="telegram", chat_id="-1001", session_key="tg:-1001:17585", ) assert get_session_env("HERMES_SESSION_KEY") == "tg:-1001:17585" clear_session_vars(tokens) assert get_session_env("HERMES_SESSION_KEY") == "" def test_session_key_falls_back_to_os_environ(monkeypatch): """get_session_env for SESSION_KEY should fall back to os.environ.""" monkeypatch.setenv("HERMES_SESSION_KEY", "env-session-123") # No contextvar set — should read from os.environ assert get_session_env("HERMES_SESSION_KEY") == "env-session-123" # Set contextvar — should prefer it tokens = set_session_vars(session_key="ctx-session-456") assert get_session_env("HERMES_SESSION_KEY") == "ctx-session-456" # After clear — should return "" (explicitly cleared), not os.environ (#10304) clear_session_vars(tokens) assert get_session_env("HERMES_SESSION_KEY") == "" def test_set_session_env_includes_session_key(): """_set_session_env should propagate session_key from SessionContext.""" runner = object.__new__(GatewayRunner) source = SessionSource( platform=Platform.TELEGRAM, chat_id="-1001", chat_name="Group", chat_type="group", thread_id="17585", ) context = SessionContext( source=source, connected_platforms=[], home_channels={}, session_key="tg:-1001:17585", ) # Capture baseline value before setting (may be non-empty from another # test in the same pytest-xdist worker sharing the context). tokens = runner._set_session_env(context) assert get_session_env("HERMES_SESSION_KEY") == "tg:-1001:17585" runner._clear_session_env(tokens) # After clearing, the session key must not retain the value we just set. # The exact post-clear value depends on context propagation from other # tests, so only check that our value was removed, not what replaced it. assert get_session_env("HERMES_SESSION_KEY") != "tg:-1001:17585" def test_session_key_no_race_condition_with_contextvars(monkeypatch): """Prove contextvars isolates SESSION_KEY across concurrent async tasks. Two tasks set different session keys. With contextvars each task reads back its own value. With os.environ the second task would overwrite the first (the old bug). """ monkeypatch.delenv("HERMES_SESSION_KEY", raising=False) results = {} async def handler(key: str, delay: float): tokens = set_session_vars(session_key=key) try: await asyncio.sleep(delay) read_back = get_session_env("HERMES_SESSION_KEY") results[key] = read_back finally: clear_session_vars(tokens) async def run(): task_a = asyncio.create_task(handler("session-A", 0.15)) await asyncio.sleep(0.05) task_b = asyncio.create_task(handler("session-B", 0.05)) await asyncio.gather(task_a, task_b) asyncio.run(run()) # Both tasks must read back their own session key assert results["session-A"] == "session-A", ( f"Session A got '{results['session-A']}' instead of 'session-A' — race condition!" ) assert results["session-B"] == "session-B", ( f"Session B got '{results['session-B']}' instead of 'session-B' — race condition!" ) @pytest.mark.asyncio async def test_run_in_executor_with_context_preserves_session_env(monkeypatch): """Gateway executor work should inherit session contextvars for tool routing.""" runner = object.__new__(GatewayRunner) monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False) monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False) monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False) monkeypatch.delenv("HERMES_SESSION_USER_ID", raising=False) source = SessionSource( platform=Platform.TELEGRAM, chat_id="2144471399", chat_type="dm", user_id="123456", user_name="alice", thread_id=None, ) context = SessionContext( source=source, connected_platforms=[], home_channels={}, session_key="agent:main:telegram:dm:2144471399", ) tokens = runner._set_session_env(context) try: result = await runner._run_in_executor_with_context( lambda: { "platform": get_session_env("HERMES_SESSION_PLATFORM"), "chat_id": get_session_env("HERMES_SESSION_CHAT_ID"), "user_id": get_session_env("HERMES_SESSION_USER_ID"), "session_key": get_session_env("HERMES_SESSION_KEY"), } ) finally: runner._clear_session_env(tokens) assert result == { "platform": "telegram", "chat_id": "2144471399", "user_id": "123456", "session_key": "agent:main:telegram:dm:2144471399", } @pytest.mark.asyncio async def test_run_in_executor_with_context_forwards_args(): """_run_in_executor_with_context should forward *args to the callable.""" runner = object.__new__(GatewayRunner) def add(a, b): return a + b result = await runner._run_in_executor_with_context(add, 3, 7) assert result == 10 @pytest.mark.asyncio async def test_run_in_executor_with_context_propagates_exceptions(): """Exceptions inside the executor should propagate to the caller.""" runner = object.__new__(GatewayRunner) def blow_up(): raise ValueError("boom") with pytest.raises(ValueError, match="boom"): await runner._run_in_executor_with_context(blow_up)