diff --git a/agent/prompt_builder.py b/agent/prompt_builder.py index 321d46a8b5..08b8fe0a6a 100644 --- a/agent/prompt_builder.py +++ b/agent/prompt_builder.py @@ -487,7 +487,7 @@ def _parse_skill_file(skill_file: Path) -> tuple[bool, dict, str]: (True, {}, "") to err on the side of showing the skill. """ try: - raw = skill_file.read_text(encoding="utf-8")[:2000] + raw = skill_file.read_text(encoding="utf-8") frontmatter, _ = parse_frontmatter(raw) if not skill_matches_platform(frontmatter): @@ -495,7 +495,7 @@ def _parse_skill_file(skill_file: Path) -> tuple[bool, dict, str]: return True, frontmatter, extract_skill_description(frontmatter) except Exception as e: - logger.debug("Failed to parse skill file %s: %s", skill_file, e) + logger.warning("Failed to parse skill file %s: %s", skill_file, e) return True, {}, "" @@ -558,9 +558,10 @@ def build_skills_system_prompt( # ── Layer 1: in-process LRU cache ───────────────────────────────── # Include the resolved platform so per-platform disabled-skill lists # produce distinct cache entries (gateway serves multiple platforms). 
+ from gateway.session_context import get_session_env _platform_hint = ( os.environ.get("HERMES_PLATFORM") - or os.environ.get("HERMES_SESSION_PLATFORM") + or get_session_env("HERMES_SESSION_PLATFORM") or "" ) cache_key = ( diff --git a/agent/skill_utils.py b/agent/skill_utils.py index 6b06a19e36..ba606b358d 100644 --- a/agent/skill_utils.py +++ b/agent/skill_utils.py @@ -145,10 +145,11 @@ def get_disabled_skill_names(platform: str | None = None) -> Set[str]: if not isinstance(skills_cfg, dict): return set() + from gateway.session_context import get_session_env resolved_platform = ( platform or os.getenv("HERMES_PLATFORM") - or os.getenv("HERMES_SESSION_PLATFORM") + or get_session_env("HERMES_SESSION_PLATFORM") ) if resolved_platform: platform_disabled = (skills_cfg.get("platform_disabled") or {}).get( diff --git a/gateway/run.py b/gateway/run.py index c617e6fa4d..741b84628c 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2442,8 +2442,8 @@ class GatewayRunner: # Build session context context = build_session_context(source, self.config, session_entry) - # Set environment variables for tools - self._set_session_env(context) + # Set session context variables for tools (task-local, concurrency-safe) + _session_env_tokens = self._set_session_env(context) # Read privacy.redact_pii from config (re-read per message) _redact_pii = False @@ -3276,8 +3276,8 @@ class GatewayRunner: "Try again or use /reset to start a fresh session." ) finally: - # Clear session env - self._clear_session_env() + # Restore session context variables to their pre-handler state + self._clear_session_env(_session_env_tokens) def _format_session_info(self) -> str: """Resolve current model config and return a formatted info block. 
@@ -6176,20 +6176,27 @@ class GatewayRunner: return True - def _set_session_env(self, context: SessionContext) -> None: - """Set environment variables for the current session.""" - os.environ["HERMES_SESSION_PLATFORM"] = context.source.platform.value - os.environ["HERMES_SESSION_CHAT_ID"] = context.source.chat_id - if context.source.chat_name: - os.environ["HERMES_SESSION_CHAT_NAME"] = context.source.chat_name - if context.source.thread_id: - os.environ["HERMES_SESSION_THREAD_ID"] = str(context.source.thread_id) - - def _clear_session_env(self) -> None: - """Clear session environment variables.""" - for var in ["HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME", "HERMES_SESSION_THREAD_ID"]: - if var in os.environ: - del os.environ[var] + def _set_session_env(self, context: SessionContext) -> list: + """Set session context variables for the current async task. + + Uses ``contextvars`` instead of ``os.environ`` so that concurrent + gateway messages cannot overwrite each other's session state. + + Returns a list of reset tokens; pass them to ``_clear_session_env`` + in a ``finally`` block. + """ + from gateway.session_context import set_session_vars + return set_session_vars( + platform=context.source.platform.value, + chat_id=context.source.chat_id, + chat_name=context.source.chat_name or "", + thread_id=str(context.source.thread_id) if context.source.thread_id else "", + ) + + def _clear_session_env(self, tokens: list) -> None: + """Restore session context variables to their pre-handler values.""" + from gateway.session_context import clear_session_vars + clear_session_vars(tokens) async def _enrich_message_with_vision( self, diff --git a/gateway/session_context.py b/gateway/session_context.py new file mode 100644 index 0000000000..775cd8698b --- /dev/null +++ b/gateway/session_context.py @@ -0,0 +1,113 @@ +""" +Session-scoped context variables for the Hermes gateway. 
+ +Replaces the previous ``os.environ``-based session state +(``HERMES_SESSION_PLATFORM``, ``HERMES_SESSION_CHAT_ID``, etc.) with +Python's ``contextvars.ContextVar``. + +**Why this matters** + +The gateway processes messages concurrently via ``asyncio``. When two +messages arrive at the same time the old code did: + + os.environ["HERMES_SESSION_THREAD_ID"] = str(context.source.thread_id) + +Because ``os.environ`` is *process-global*, Message A's value was +silently overwritten by Message B before Message A's agent finished +running. Background-task notifications and tool calls therefore routed +to the wrong thread. + +``contextvars.ContextVar`` values are *task-local*: each ``asyncio`` +task runs in its own copy of the context, so concurrent messages never +interfere. Threads inherit them only via ``asyncio.to_thread`` or an explicit ``contextvars.copy_context().run`` (plain ``run_in_executor`` does not). + +**Backward compatibility** + +The public helper ``get_session_env(name, default="")`` mirrors the old +``os.getenv("HERMES_SESSION_*", ...)`` calls. Existing tool code only +needs to replace the import + call site: + + # before + import os + platform = os.getenv("HERMES_SESSION_PLATFORM", "") + + # after + from gateway.session_context import get_session_env + platform = get_session_env("HERMES_SESSION_PLATFORM", "") +""" + +from contextvars import ContextVar + +# --------------------------------------------------------------------------- +# Per-task session variables +# --------------------------------------------------------------------------- + +_SESSION_PLATFORM: ContextVar[str] = ContextVar("HERMES_SESSION_PLATFORM", default="") +_SESSION_CHAT_ID: ContextVar[str] = ContextVar("HERMES_SESSION_CHAT_ID", default="") +_SESSION_CHAT_NAME: ContextVar[str] = ContextVar("HERMES_SESSION_CHAT_NAME", default="") +_SESSION_THREAD_ID: ContextVar[str] = ContextVar("HERMES_SESSION_THREAD_ID", default="") + +_VAR_MAP = { + "HERMES_SESSION_PLATFORM": _SESSION_PLATFORM, + "HERMES_SESSION_CHAT_ID": _SESSION_CHAT_ID, + "HERMES_SESSION_CHAT_NAME": _SESSION_CHAT_NAME, + 
"HERMES_SESSION_THREAD_ID": _SESSION_THREAD_ID, +} + + +def set_session_vars( + platform: str = "", + chat_id: str = "", + chat_name: str = "", + thread_id: str = "", +) -> list: + """Set all session context variables and return reset tokens. + + Call ``clear_session_vars(tokens)`` in a ``finally`` block to restore + the previous values when the handler exits. + + Returns a list of ``Token`` objects (one per variable) that can be + passed to ``clear_session_vars``. + """ + tokens = [ + _SESSION_PLATFORM.set(platform), + _SESSION_CHAT_ID.set(chat_id), + _SESSION_CHAT_NAME.set(chat_name), + _SESSION_THREAD_ID.set(thread_id), + ] + return tokens + + +def clear_session_vars(tokens: list) -> None: + """Restore session context variables to their pre-handler values.""" + if not tokens: + return + vars_in_order = [ + _SESSION_PLATFORM, + _SESSION_CHAT_ID, + _SESSION_CHAT_NAME, + _SESSION_THREAD_ID, + ] + for var, token in zip(vars_in_order, tokens): + var.reset(token) + + +def get_session_env(name: str, default: str = "") -> str: + """Read a session context variable by its legacy ``HERMES_SESSION_*`` name. + + Replacement for ``os.getenv("HERMES_SESSION_*", default)``; unlike ``os.getenv``, *default* is ``""`` rather than ``None``. + + Resolution order: + 1. Context variable, if non-empty (set by the gateway for concurrency-safe access; an empty value falls through) + 2. ``os.environ`` (used by CLI, cron scheduler, and tests) + 3. 
*default* + """ + import os + + var = _VAR_MAP.get(name) + if var is not None: + value = var.get() + if value: + return value + # Fall back to os.environ for CLI, cron, and test compatibility + return os.getenv(name, default) diff --git a/tests/gateway/test_session_env.py b/tests/gateway/test_session_env.py index 596df89ecf..a7f1345b77 100644 --- a/tests/gateway/test_session_env.py +++ b/tests/gateway/test_session_env.py @@ -3,9 +3,15 @@ import os from gateway.config import Platform from gateway.run import GatewayRunner from gateway.session import SessionContext, SessionSource +from gateway.session_context import ( + get_session_env, + set_session_vars, + clear_session_vars, +) -def test_set_session_env_includes_thread_id(monkeypatch): +def test_set_session_env_sets_contextvars(monkeypatch): + """_set_session_env should populate contextvars, not os.environ.""" runner = object.__new__(GatewayRunner) source = SessionSource( platform=Platform.TELEGRAM, @@ -21,25 +27,93 @@ def test_set_session_env_includes_thread_id(monkeypatch): monkeypatch.delenv("HERMES_SESSION_CHAT_NAME", raising=False) monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False) - runner._set_session_env(context) + tokens = runner._set_session_env(context) - assert os.getenv("HERMES_SESSION_PLATFORM") == "telegram" - assert os.getenv("HERMES_SESSION_CHAT_ID") == "-1001" - assert os.getenv("HERMES_SESSION_CHAT_NAME") == "Group" - assert os.getenv("HERMES_SESSION_THREAD_ID") == "17585" + # Values should be readable via get_session_env (contextvar path) + assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" + assert get_session_env("HERMES_SESSION_CHAT_ID") == "-1001" + assert get_session_env("HERMES_SESSION_CHAT_NAME") == "Group" + assert get_session_env("HERMES_SESSION_THREAD_ID") == "17585" + + # os.environ should NOT be touched + assert os.getenv("HERMES_SESSION_PLATFORM") is None + assert os.getenv("HERMES_SESSION_THREAD_ID") is None + + # Clean up + runner._clear_session_env(tokens) 
-def test_clear_session_env_removes_thread_id(monkeypatch): +def test_clear_session_env_restores_previous_state(monkeypatch): + """_clear_session_env should restore contextvars to their pre-handler values.""" runner = object.__new__(GatewayRunner) - monkeypatch.setenv("HERMES_SESSION_PLATFORM", "telegram") - monkeypatch.setenv("HERMES_SESSION_CHAT_ID", "-1001") - monkeypatch.setenv("HERMES_SESSION_CHAT_NAME", "Group") - monkeypatch.setenv("HERMES_SESSION_THREAD_ID", "17585") + monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False) + monkeypatch.delenv("HERMES_SESSION_CHAT_ID", raising=False) + monkeypatch.delenv("HERMES_SESSION_CHAT_NAME", raising=False) + monkeypatch.delenv("HERMES_SESSION_THREAD_ID", raising=False) - runner._clear_session_env() + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_name="Group", + chat_type="group", + thread_id="17585", + ) + context = SessionContext(source=source, connected_platforms=[], home_channels={}) - assert os.getenv("HERMES_SESSION_PLATFORM") is None - assert os.getenv("HERMES_SESSION_CHAT_ID") is None - assert os.getenv("HERMES_SESSION_CHAT_NAME") is None - assert os.getenv("HERMES_SESSION_THREAD_ID") is None + tokens = runner._set_session_env(context) + assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" + + runner._clear_session_env(tokens) + + # After clear, contextvars should return to defaults (empty) + assert get_session_env("HERMES_SESSION_PLATFORM") == "" + assert get_session_env("HERMES_SESSION_CHAT_ID") == "" + assert get_session_env("HERMES_SESSION_CHAT_NAME") == "" + assert get_session_env("HERMES_SESSION_THREAD_ID") == "" + + +def test_get_session_env_falls_back_to_os_environ(monkeypatch): + """get_session_env should fall back to os.environ when contextvar is unset.""" + monkeypatch.setenv("HERMES_SESSION_PLATFORM", "discord") + + # No contextvar set — should read from os.environ + assert get_session_env("HERMES_SESSION_PLATFORM") == "discord" + + # Now set a 
contextvar — should prefer it + tokens = set_session_vars(platform="telegram") + assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" + + # Restore — should fall back to os.environ again + clear_session_vars(tokens) + assert get_session_env("HERMES_SESSION_PLATFORM") == "discord" + + +def test_get_session_env_default_when_nothing_set(monkeypatch): + """get_session_env returns default when neither contextvar nor env is set.""" + monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False) + + assert get_session_env("HERMES_SESSION_PLATFORM") == "" + assert get_session_env("HERMES_SESSION_PLATFORM", "fallback") == "fallback" + + +def test_set_session_env_handles_missing_optional_fields(): + """_set_session_env should handle None chat_name and thread_id gracefully.""" + runner = object.__new__(GatewayRunner) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_name=None, + chat_type="private", + thread_id=None, + ) + context = SessionContext(source=source, connected_platforms=[], home_channels={}) + + tokens = runner._set_session_env(context) + + assert get_session_env("HERMES_SESSION_PLATFORM") == "telegram" + assert get_session_env("HERMES_SESSION_CHAT_ID") == "-1001" + assert get_session_env("HERMES_SESSION_CHAT_NAME") == "" + assert get_session_env("HERMES_SESSION_THREAD_ID") == "" + + runner._clear_session_env(tokens) diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 8f746d1be9..3018b8731f 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -64,14 +64,15 @@ def _scan_cron_prompt(prompt: str) -> str: def _origin_from_env() -> Optional[Dict[str, str]]: - origin_platform = os.getenv("HERMES_SESSION_PLATFORM") - origin_chat_id = os.getenv("HERMES_SESSION_CHAT_ID") + from gateway.session_context import get_session_env + origin_platform = get_session_env("HERMES_SESSION_PLATFORM") + origin_chat_id = get_session_env("HERMES_SESSION_CHAT_ID") if origin_platform and origin_chat_id: return { 
"platform": origin_platform, "chat_id": origin_chat_id, - "chat_name": os.getenv("HERMES_SESSION_CHAT_NAME"), - "thread_id": os.getenv("HERMES_SESSION_THREAD_ID"), + "chat_name": get_session_env("HERMES_SESSION_CHAT_NAME") or None, + "thread_id": get_session_env("HERMES_SESSION_THREAD_ID") or None, } return None diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 91f752b413..0287b5e040 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -212,7 +212,8 @@ def _handle_send(args): if isinstance(result, dict) and result.get("success") and mirror_text: try: from gateway.mirror import mirror_to_session - source_label = os.getenv("HERMES_SESSION_PLATFORM", "cli") + from gateway.session_context import get_session_env + source_label = get_session_env("HERMES_SESSION_PLATFORM", "cli") if mirror_to_session(platform_name, chat_id, mirror_text, source_label=source_label, thread_id=thread_id): result["mirrored"] = True except Exception: @@ -1023,7 +1024,8 @@ async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=No def _check_send_message(): """Gate send_message on gateway running (always available on messaging platforms).""" - platform = os.getenv("HERMES_SESSION_PLATFORM", "") + from gateway.session_context import get_session_env + platform = get_session_env("HERMES_SESSION_PLATFORM", "") if platform and platform != "local": return True try: diff --git a/tools/skills_tool.py b/tools/skills_tool.py index 1c7182e838..085ed00550 100644 --- a/tools/skills_tool.py +++ b/tools/skills_tool.py @@ -347,7 +347,8 @@ def _capture_required_environment_variables( def _is_gateway_surface() -> bool: if os.getenv("HERMES_GATEWAY_SESSION"): return True - return bool(os.getenv("HERMES_SESSION_PLATFORM")) + from gateway.session_context import get_session_env + return bool(get_session_env("HERMES_SESSION_PLATFORM")) def _get_terminal_backend_name() -> str: diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 
d57078f528..42415a5f14 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1420,10 +1420,11 @@ def terminal_tool( # In gateway mode, auto-register a fast watcher so the # gateway can detect completion and trigger a new agent # turn. CLI mode uses the completion_queue directly. - _gw_platform = os.getenv("HERMES_SESSION_PLATFORM", "") + from gateway.session_context import get_session_env as _gse + _gw_platform = _gse("HERMES_SESSION_PLATFORM", "") if _gw_platform and not check_interval: - _gw_chat_id = os.getenv("HERMES_SESSION_CHAT_ID", "") - _gw_thread_id = os.getenv("HERMES_SESSION_THREAD_ID", "") + _gw_chat_id = _gse("HERMES_SESSION_CHAT_ID", "") + _gw_thread_id = _gse("HERMES_SESSION_THREAD_ID", "") proc_session.watcher_platform = _gw_platform proc_session.watcher_chat_id = _gw_chat_id proc_session.watcher_thread_id = _gw_thread_id @@ -1445,9 +1446,10 @@ def terminal_tool( result_data["check_interval_note"] = ( f"Requested {check_interval}s raised to minimum 30s" ) - watcher_platform = os.getenv("HERMES_SESSION_PLATFORM", "") - watcher_chat_id = os.getenv("HERMES_SESSION_CHAT_ID", "") - watcher_thread_id = os.getenv("HERMES_SESSION_THREAD_ID", "") + from gateway.session_context import get_session_env as _gse2 + watcher_platform = _gse2("HERMES_SESSION_PLATFORM", "") + watcher_chat_id = _gse2("HERMES_SESSION_CHAT_ID", "") + watcher_thread_id = _gse2("HERMES_SESSION_THREAD_ID", "") # Store on session for checkpoint persistence proc_session.watcher_platform = watcher_platform diff --git a/tools/tts_tool.py b/tools/tts_tool.py index 85fa4974db..be8bc11e37 100644 --- a/tools/tts_tool.py +++ b/tools/tts_tool.py @@ -480,7 +480,8 @@ def text_to_speech_tool( # Telegram voice bubbles require Opus (.ogg); OpenAI and ElevenLabs can # produce Opus natively (no ffmpeg needed). Edge TTS always outputs MP3 # and needs ffmpeg for conversion. 
- platform = os.getenv("HERMES_SESSION_PLATFORM", "").lower() + from gateway.session_context import get_session_env + platform = get_session_env("HERMES_SESSION_PLATFORM", "").lower() want_opus = (platform == "telegram") # Determine output path