diff --git a/agent/credential_pool.py b/agent/credential_pool.py index 04b22c76a68..b791ac4f82c 100644 --- a/agent/credential_pool.py +++ b/agent/credential_pool.py @@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple from hermes_constants import OPENROUTER_BASE_URL from hermes_cli.config import load_env +from agent.secret_scope import get_secret as _get_secret from agent.credential_persistence import ( is_borrowed_credential_source, sanitize_borrowed_credential_payload, @@ -1666,7 +1667,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup _env_file = load_env() def _env_val(key: str) -> str: - return (_env_file.get(key) or os.environ.get(key) or "").strip() + return (_env_file.get(key) or _get_secret(key, "") or "").strip() anthropic_api_key = _env_val("ANTHROPIC_API_KEY") anthropic_oauth_env = ( @@ -1952,7 +1953,7 @@ def _seed_from_env(provider: str, entries: List[PooledCredential]) -> Tuple[bool # changes to the .env file. def _get_env_prefer_dotenv(key: str) -> str: env_file = load_env() - val = env_file.get(key) or os.environ.get(key) or "" + val = env_file.get(key) or _get_secret(key, "") or "" return val.strip() # Honour user suppression — `hermes auth remove ` for an diff --git a/agent/secret_scope.py b/agent/secret_scope.py new file mode 100644 index 00000000000..26022ca9b0e --- /dev/null +++ b/agent/secret_scope.py @@ -0,0 +1,205 @@ +"""Profile-scoped credential resolution for multi-profile gateway multiplexing. + +The multiplexing gateway serves many profiles from one process. Each profile +has its own ``.env`` with its own provider keys and platform tokens, so we +**cannot** union them into the process-global ``os.environ`` (that would leak +profile A's keys to profile B's turns, and to every subprocess spawned with +``env=dict(os.environ)``). + +This module provides a fail-closed, context-local secret scope: + +- ``set_secret_scope(mapping)`` installs the active profile's secrets for the + current task (a contextvar, so it propagates into the agent's worker thread + via ``copy_context()`` exactly like the HERMES_HOME override). +- ``get_secret(name)`` reads from that scope. When multiplexing is **active** + and no scope is set, it RAISES rather than silently falling back to + ``os.environ`` — an un-migrated or newly-added call site fails loud at that + exact line instead of leaking another profile's value. When multiplexing is + **off** (the default), it transparently reads ``os.environ`` so the + single-profile gateway and every non-gateway caller behave exactly as before. + +Design rationale lives in ``docs/design/multiplexing-gateway.md`` (Workstream A). +""" +from __future__ import annotations + +import os +from contextvars import ContextVar, Token +from pathlib import Path +from typing import Dict, Mapping, Optional + + +# ── multiplex-active flag ──────────────────────────────────────────────── +# Process-global: set once at gateway startup when gateway.multiplex_profiles +# is true. Governs whether get_secret() fails closed on an unscoped read. +# A plain module global (not a contextvar): it describes the deployment mode, +# not a per-task value. +_MULTIPLEX_ACTIVE: bool = False + + +def set_multiplex_active(active: bool) -> None: + """Mark whether the process is running as a profile multiplexer. + + Called once at gateway startup. When True, ``get_secret`` fails closed on + an unscoped read instead of falling back to ``os.environ``. + """ + global _MULTIPLEX_ACTIVE + _MULTIPLEX_ACTIVE = bool(active) + + +def is_multiplex_active() -> bool: + """Return whether the process is running as a profile multiplexer.""" + return _MULTIPLEX_ACTIVE + + +# ── the secret scope contextvar ────────────────────────────────────────── +_SECRET_SCOPE: ContextVar[Optional[Mapping[str, str]]] = ContextVar( + "_SECRET_SCOPE", default=None +) + + +class UnscopedSecretError(RuntimeError): + """Raised when a secret is read in multiplex mode with no scope installed. + + This is the fail-closed signal: it means a credential read reached + ``get_secret`` without a profile scope active, which in a multiplexer would + otherwise leak whichever profile's value happened to be in ``os.environ``. + The fix is to wrap the call path in ``set_secret_scope(...)`` (the per-turn + / per-adapter profile scope), not to widen the allowlist. + """ + + +def set_secret_scope(secrets: Optional[Mapping[str, str]]) -> Token: + """Install the active profile's secret mapping for the current context. + + Returns a token for ``reset_secret_scope``. Pass ``None`` to clear. + """ + return _SECRET_SCOPE.set(secrets) + + +def reset_secret_scope(token: Token) -> None: + """Restore the previous secret scope.""" + _SECRET_SCOPE.reset(token) + + +def current_secret_scope() -> Optional[Mapping[str, str]]: + """Return the active secret mapping, or None when no scope is installed.""" + return _SECRET_SCOPE.get() + + +# ── genuinely-global env vars (NOT per-profile secrets) ────────────────── +# These are process/deployment-level settings, not profile credentials. They +# legitimately live in os.environ and must keep reading from it even in +# multiplex mode — routing them through the fail-closed path would wrongly +# crash. Anything matching is read from os.environ regardless of scope. +# +# Membership test is by exact name OR prefix (see _is_global_env). Keep this +# list tight: when in doubt a value is a profile secret, not a global. +_GLOBAL_ENV_EXACT = frozenset({ + # Hermes runtime / deployment + "HERMES_HOME", "HERMES_PROFILE", "HERMES_GATEWAY_LOCK_DIR", + "HERMES_MAX_ITERATIONS", "HERMES_MAX_TOKENS", "HERMES_API_TIMEOUT", + "HERMES_REDACT_SECRETS", "HERMES_NOUS_TIMEOUT_SECONDS", + "_HERMES_GATEWAY", + # OS / interpreter + "PATH", "HOME", "USER", "LANG", "LC_ALL", "TZ", "PWD", "SHELL", "TMPDIR", + "VIRTUAL_ENV", "PYTHONPATH", "SSL_CERT_FILE", + # Kanban paths (per-board, not per-profile-secret) + "HERMES_KANBAN_DB", "HERMES_KANBAN_WORKSPACES_ROOT", "HERMES_KANBAN_BOARD", +}) +_GLOBAL_ENV_PREFIXES = ( + "HERMES_KANBAN_", + "HERMES_TELEGRAM_", # tuning knobs (batch delays, fallback toggles) — NOT the token + "TERMINAL_", # terminal/sandbox backend settings +) + + +def _is_global_env(name: str) -> bool: + """Return True for genuinely process-global (non-profile-secret) env vars.""" + if name in _GLOBAL_ENV_EXACT: + return True + return any(name.startswith(p) for p in _GLOBAL_ENV_PREFIXES) + + +def get_secret(name: str, default: Optional[str] = None) -> Optional[str]: + """Resolve a credential by env-var name, honoring the active profile scope. + + Resolution order: + + 1. Genuinely-global vars (``_is_global_env``) always read ``os.environ`` — + they are deployment settings, not profile secrets. + 2. When a secret scope is installed (multiplexed turn), read from it; an + absent key returns ``default``. The scope is authoritative — we do NOT + fall through to ``os.environ``, because in a multiplexer ``os.environ`` + may hold another profile's value. + 3. No scope installed: + - multiplex INACTIVE (default deployment): read ``os.environ`` — + identical to the legacy ``os.getenv`` behavior every caller had before. + - multiplex ACTIVE: FAIL CLOSED. Raise ``UnscopedSecretError`` so the + missing scope is caught loudly instead of leaking a cross-profile value. + """ + if _is_global_env(name): + val = os.environ.get(name) + return val if val is not None else default + + scope = _SECRET_SCOPE.get() + if scope is not None: + val = scope.get(name) + return val if val is not None else default + + if _MULTIPLEX_ACTIVE: + raise UnscopedSecretError( + f"get_secret({name!r}) called with no profile secret scope active " + f"while multiplexing is on. This credential read must run inside a " + f"set_secret_scope(...) block (the per-turn / per-adapter profile " + f"scope). Reading os.environ here would risk leaking another " + f"profile's value. See docs/design/multiplexing-gateway.md " + f"(Workstream A)." + ) + + val = os.environ.get(name) + return val if val is not None else default + + +def load_env_file(env_path: Path) -> Dict[str, str]: + """Parse a ``.env`` file into a plain dict WITHOUT touching ``os.environ``. + + Used to load a profile's secrets into an isolated mapping for + ``set_secret_scope``. Mirrors python-dotenv's basic parsing (KEY=VALUE, + ``export`` prefix, ``#`` comments, optional matching quotes) but never + mutates the process environment — that isolation is the whole point. + """ + secrets: Dict[str, str] = {} + try: + text = env_path.read_text(encoding="utf-8") + except (FileNotFoundError, OSError, UnicodeDecodeError): + return secrets + + for raw in text.splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + if line.startswith("export "): + line = line[len("export "):].lstrip() + if "=" not in line: + continue + key, _, value = line.partition("=") + key = key.strip() + if not key: + continue + value = value.strip() + if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'): + value = value[1:-1] + secrets[key] = value + + return secrets + + +def build_profile_secret_scope(hermes_home: Path) -> Dict[str, str]: + """Build a profile's secret mapping from its ``/.env``. + + Returns a fresh dict (safe to install via ``set_secret_scope``). Genuinely + global vars are intentionally NOT copied in — ``get_secret`` reads those + from ``os.environ`` directly, so the scope holds only profile secrets. + """ + return load_env_file(Path(hermes_home) / ".env") + diff --git a/gateway/run.py b/gateway/run.py index c7037ec6b25..d0b27680ae9 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1186,13 +1186,31 @@ def _reload_runtime_env_preserving_config_authority() -> None: pick up rotated API keys. config.yaml remains authoritative for agent budget settings such as agent.max_turns; otherwise a stale HERMES_MAX_ITERATIONS in .env can replace the startup bridge on later turns. + + In multiplex mode this is a NO-OP for the credential reload: secrets come + from the per-turn ``set_secret_scope`` (installed by ``_profile_runtime_scope``) + which loads the routed profile's ``.env`` into an isolated mapping. Mutating + the process-global ``os.environ`` here would defeat that isolation and leak + the default profile's keys to every profile's turns and subprocesses. """ + from agent.secret_scope import is_multiplex_active + if is_multiplex_active(): + # Credentials are resolved from the active profile's secret scope, not + # os.environ. Still honor config.yaml's agent.max_turns bridge below + # using the scoped home, but never reload .env into global env. + _bridge_max_turns_from_config(_hermes_home) + return + load_hermes_dotenv( hermes_home=_hermes_home, project_env=Path(__file__).resolve().parents[1] / '.env', ) + _bridge_max_turns_from_config(_hermes_home) - config_path = _hermes_home / 'config.yaml' + +def _bridge_max_turns_from_config(home: "Path") -> None: + """Bridge config.yaml agent.max_turns into HERMES_MAX_ITERATIONS (a global).""" + config_path = home / 'config.yaml' if not config_path.exists(): return try: @@ -1218,6 +1236,44 @@ def _current_max_iterations() -> int: return 90 +from contextlib import contextmanager as _contextmanager + + +@_contextmanager +def _profile_runtime_scope(profile_home: "Path"): + """Scope config/skills/memory AND credentials to a profile for one turn. + + Combines the two seams the multiplexer needs: + 1. ``set_hermes_home_override`` — redirects ``get_hermes_home()`` (config, + skills, memory, SOUL, sessions) to the profile's home. Contextvar, so + it propagates into the agent worker thread via ``copy_context()``. + 2. ``set_secret_scope`` — installs the profile's ``.env`` secrets as the + authoritative credential source, so ``get_secret`` reads this profile's + keys and never the process-global ``os.environ`` (which in a + multiplexer may hold another profile's values). + + Only used on the multiplexed inbound path. Single-profile gateways never + enter this scope, so their behavior is unchanged. Loading the profile's + ``.env`` here does NOT mutate ``os.environ`` — ``build_profile_secret_scope`` + returns an isolated dict — which is what keeps subprocesses (MCP, kanban) + from inheriting cross-profile secrets. + """ + from hermes_constants import set_hermes_home_override, reset_hermes_home_override + from agent.secret_scope import ( + build_profile_secret_scope, + set_secret_scope, + reset_secret_scope, + ) + + home_token = set_hermes_home_override(str(profile_home)) + secret_token = set_secret_scope(build_profile_secret_scope(Path(profile_home))) + try: + yield + finally: + reset_secret_scope(secret_token) + reset_hermes_home_override(home_token) + + _DOCKER_VOLUME_SPEC_RE = re.compile(r"^(?P.+):(?P/[^:]+?)(?::(?P[^:]+))?$") _DOCKER_MEDIA_OUTPUT_CONTAINER_PATHS = {"/output", "/outputs"} @@ -2262,6 +2318,15 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew def __init__(self, config: Optional[GatewayConfig] = None): global _gateway_runner_ref self.config = config or load_gateway_config() + # Mark the process as a profile multiplexer when configured. This flips + # agent.secret_scope.get_secret() to fail-closed on any unscoped + # credential read, so a missed migration crashes loudly instead of + # leaking a cross-profile value (Workstream A). Inert when off. + try: + from agent.secret_scope import set_multiplex_active + set_multiplex_active(bool(getattr(self.config, "multiplex_profiles", False))) + except Exception: + logger.debug("could not set multiplex-active flag", exc_info=True) self.adapters: Dict[Platform, BasePlatformAdapter] = {} self._warn_if_docker_media_delivery_is_risky() _gateway_runner_ref = _weakref.ref(self) @@ -13805,6 +13870,65 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew channel_prompt: Optional[str] = None, persist_user_message: Optional[str] = None, persist_user_timestamp: Optional[float] = None, + ) -> Dict[str, Any]: + """Profile-scoping wrapper around the agent run. + + When multiplexing is active, resolve the inbound source's profile and + run the whole turn inside ``_profile_runtime_scope`` so config/skills/ + memory resolve to that profile's home AND credentials resolve from that + profile's secret scope (never the process-global ``os.environ``). When + multiplexing is off this is a transparent pass-through — zero behavior + change for single-profile gateways. + """ + if not getattr(self.config, "multiplex_profiles", False): + return await self._run_agent_inner( + message, context_prompt, history, source, session_id, + session_key=session_key, run_generation=run_generation, + _interrupt_depth=_interrupt_depth, event_message_id=event_message_id, + channel_prompt=channel_prompt, persist_user_message=persist_user_message, + persist_user_timestamp=persist_user_timestamp, + ) + + profile_home = self._resolve_profile_home_for_source(source) + with _profile_runtime_scope(profile_home): + return await self._run_agent_inner( + message, context_prompt, history, source, session_id, + session_key=session_key, run_generation=run_generation, + _interrupt_depth=_interrupt_depth, event_message_id=event_message_id, + channel_prompt=channel_prompt, persist_user_message=persist_user_message, + persist_user_timestamp=persist_user_timestamp, + ) + + def _resolve_profile_home_for_source(self, source: SessionSource) -> "Path": + """Resolve which profile's HERMES_HOME should serve this inbound source. + + Phase 2 baseline: the active profile (the multiplexer's own home). Phase + 1/3 wire real per-source attribution (URL prefix, per-credential adapter + ownership) by overriding the resolved profile on the source/adapter; this + method is the single point they hook. + """ + from hermes_cli.profiles import get_active_profile_name, get_profile_dir + try: + name = get_active_profile_name() or "default" + return get_profile_dir(name) + except Exception: + from hermes_constants import get_hermes_home + return get_hermes_home() + + async def _run_agent_inner( + self, + message: str, + context_prompt: str, + history: List[Dict[str, Any]], + source: SessionSource, + session_id: str, + session_key: str = None, + run_generation: Optional[int] = None, + _interrupt_depth: int = 0, + event_message_id: Optional[str] = None, + channel_prompt: Optional[str] = None, + persist_user_message: Optional[str] = None, + persist_user_timestamp: Optional[float] = None, ) -> Dict[str, Any]: """ Run the agent with the given message and context. diff --git a/hermes_cli/runtime_provider.py b/hermes_cli/runtime_provider.py index 78b92dcbad9..68919eaac62 100644 --- a/hermes_cli/runtime_provider.py +++ b/hermes_cli/runtime_provider.py @@ -12,6 +12,7 @@ logger = logging.getLogger(__name__) from hermes_cli import auth as auth_mod from agent.credential_pool import CredentialPool, PooledCredential, get_custom_provider_pool_key, load_pool +from agent.secret_scope import get_secret as _get_secret from hermes_cli.auth import ( AuthError, DEFAULT_CODEX_BASE_URL, @@ -35,6 +36,19 @@ from hermes_constants import OPENROUTER_BASE_URL from utils import base_url_host_matches, base_url_hostname, env_int +def _getenv(name: str, default: str = "") -> str: + """Profile-scoped replacement for ``os.getenv`` on credential/provider reads. + + Routes through the secret scope (Workstream A): identical to ``os.getenv`` + when multiplexing is off, scope-aware (and fail-closed on an unscoped read) + when on. Genuinely-global vars are handled inside ``get_secret`` and still + read ``os.environ``. Keeps the ``(name, default) -> str`` contract every + call site here already relies on. + """ + val = _get_secret(name, default) + return val if val is not None else default + + def _normalize_custom_provider_name(value: str) -> str: return value.strip().lower().replace(" ", "-") @@ -156,7 +170,7 @@ def _host_derived_api_key(base_url: str) -> str: if sanitized in ("OPENAI", "OPENROUTER", "OLLAMA"): return "" env_name = f"{sanitized}_API_KEY" - return (os.getenv(env_name, "") or "").strip() + return (_getenv(env_name, "") or "").strip() def _auto_detect_local_model(base_url: str) -> str: @@ -437,7 +451,7 @@ def resolve_requested_provider(requested: Optional[str] = None) -> str: # Prefer the persisted config selection over any stale shell/.env # provider override so chat uses the endpoint the user last saved. - env_provider = os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower() + env_provider = _getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower() if env_provider: return env_provider @@ -542,7 +556,7 @@ def _get_named_custom_provider(requested_provider: str) -> Optional[Dict[str, An name_norm = _normalize_custom_provider_name(ep_name) # Resolve the API key from the env var name stored in key_env key_env = str(entry.get("key_env", "") or "").strip() - resolved_api_key = os.getenv(key_env, "").strip() if key_env else "" + resolved_api_key = _getenv(key_env, "").strip() if key_env else "" # Fall back to inline api_key when key_env is absent or unresolvable if not resolved_api_key: resolved_api_key = str(entry.get("api_key", "") or "").strip() @@ -824,8 +838,8 @@ def _resolve_named_custom_runtime( api_key_candidates = [ (explicit_api_key or "").strip(), # Gate env key fallbacks on authoritative hosts (#28660) - (os.getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""), - (os.getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""), + (_getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""), + (_getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""), # Bonus (#28660): derive `_API_KEY` from the host so users # who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the # intuitive match without configuring `custom_providers` first. @@ -878,11 +892,11 @@ def _resolve_named_custom_runtime( api_key_candidates = [ (explicit_api_key or "").strip(), str(custom_provider.get("api_key", "") or "").strip(), - os.getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(), + _getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(), # Gate provider env keys on their authoritative hosts — sending # OPENAI_API_KEY to a local-llm endpoint leaks credentials (#28660). - (os.getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""), - (os.getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""), + (_getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""), + (_getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""), # Bonus (#28660): derive `_API_KEY` from the host as a final # fallback when key_env wasn't set explicitly. _host_derived_api_key(base_url), @@ -941,8 +955,8 @@ def _resolve_openrouter_runtime( except Exception: pass - env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip() - env_custom_base_url = os.getenv("CUSTOM_BASE_URL", "").strip() + env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip() + env_custom_base_url = _getenv("CUSTOM_BASE_URL", "").strip() # Use config base_url when available and the provider context matches. # OPENAI_BASE_URL env var is no longer consulted — config.yaml is @@ -982,8 +996,8 @@ def _resolve_openrouter_runtime( if _is_openrouter_context: api_key_candidates = [ explicit_api_key, - os.getenv("OPENROUTER_API_KEY"), - os.getenv("OPENAI_API_KEY"), + _getenv("OPENROUTER_API_KEY"), + _getenv("OPENAI_API_KEY"), ] else: # Custom endpoint: use api_key from config when using config base_url (#1760). @@ -1003,9 +1017,9 @@ def _resolve_openrouter_runtime( api_key_candidates = [ explicit_api_key, (cfg_api_key if use_config_base_url else ""), - (os.getenv("OLLAMA_API_KEY") if _is_ollama_url else ""), - (os.getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""), - (os.getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""), + (_getenv("OLLAMA_API_KEY") if _is_ollama_url else ""), + (_getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""), + (_getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""), # Bonus (#28660): derive `_API_KEY` from the host so users # who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the # intuitive match. Helper returns "" for IPs/loopback and for env @@ -1108,7 +1122,7 @@ def _resolve_azure_foundry_runtime( if inferred: cfg_api_mode = inferred - env_base_url = os.getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/") + env_base_url = _getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/") base_url = explicit_base_url_clean or cfg_base_url or env_base_url if not base_url: raise AuthError( @@ -1197,7 +1211,7 @@ def _resolve_azure_foundry_runtime( except Exception: api_key = "" if not api_key: - api_key = os.getenv("AZURE_FOUNDRY_API_KEY", "").strip() + api_key = _getenv("AZURE_FOUNDRY_API_KEY", "").strip() if not api_key: raise AuthError( "Azure Foundry requires an API key. Set AZURE_FOUNDRY_API_KEY in " @@ -1297,7 +1311,7 @@ def _resolve_explicit_runtime( expires_at = state.get("agent_key_expires_at") or state.get("expires_at") if not api_key: creds = resolve_nous_runtime_credentials( - timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")), + timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")), ) api_key = creds.get("api_key", "") expires_at = creds.get("expires_at") @@ -1326,7 +1340,7 @@ def _resolve_explicit_runtime( if pconfig and pconfig.auth_type == "api_key": env_url = "" if pconfig.base_url_env_var: - env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/") + env_url = _getenv(pconfig.base_url_env_var, "").strip().rstrip("/") base_url = explicit_base_url if not base_url: @@ -1398,8 +1412,8 @@ def resolve_runtime_provider( if requested_provider == "anthropic" and "azure.com" in _eff_base: _azure_key = ( (explicit_api_key or "").strip() - or os.getenv("AZURE_ANTHROPIC_KEY", "").strip() - or os.getenv("ANTHROPIC_API_KEY", "").strip() + or _getenv("AZURE_ANTHROPIC_KEY", "").strip() + or _getenv("ANTHROPIC_API_KEY", "").strip() ) return { "provider": "anthropic", @@ -1454,8 +1468,8 @@ def resolve_runtime_provider( if provider == "openrouter": cfg_provider = str(model_cfg.get("provider") or "").strip().lower() cfg_base_url = str(model_cfg.get("base_url") or "").strip() - env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip() - env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip() + env_openai_base_url = _getenv("OPENAI_BASE_URL", "").strip() + env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip() has_custom_endpoint = bool( explicit_base_url or env_openai_base_url @@ -1511,7 +1525,7 @@ def resolve_runtime_provider( if provider == "nous": try: creds = resolve_nous_runtime_credentials( - timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")), + timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")), ) return { "provider": "nous", @@ -1664,7 +1678,7 @@ def resolve_runtime_provider( for hint_key in ("key_env", "api_key_env"): env_var = str(model_cfg.get(hint_key) or "").strip() if env_var: - token = os.getenv(env_var, "").strip() + token = _getenv(env_var, "").strip() if token: break # Next: an inline api_key on the model config (useful in multi-profile @@ -1674,8 +1688,8 @@ def resolve_runtime_provider( # Finally fall back to the historical fixed names. if not token: token = ( - os.getenv("AZURE_ANTHROPIC_KEY", "").strip() - or os.getenv("ANTHROPIC_API_KEY", "").strip() + _getenv("AZURE_ANTHROPIC_KEY", "").strip() + or _getenv("ANTHROPIC_API_KEY", "").strip() ) if not token: raise AuthError( diff --git a/tests/agent/test_secret_scope.py b/tests/agent/test_secret_scope.py new file mode 100644 index 00000000000..1b8a1cace40 --- /dev/null +++ b/tests/agent/test_secret_scope.py @@ -0,0 +1,130 @@ +"""Tests for the profile-scoped credential primitive (Workstream A / Phase 2).""" +import pytest + +from agent import secret_scope as ss + + +@pytest.fixture(autouse=True) +def _reset_multiplex(): + """Ensure each test starts and ends with multiplexing off (it's a global).""" + ss.set_multiplex_active(False) + yield + ss.set_multiplex_active(False) + + +class TestMultiplexInactiveBackwardCompat: + """Default deployment: get_secret transparently reads os.environ.""" + + def test_reads_environ(self, monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-test") + assert ss.get_secret("ANTHROPIC_API_KEY") == "sk-test" + + def test_missing_returns_default(self, monkeypatch): + monkeypatch.delenv("NOPE_KEY", raising=False) + assert ss.get_secret("NOPE_KEY") is None + assert ss.get_secret("NOPE_KEY", "fallback") == "fallback" + + def test_no_raise_without_scope(self, monkeypatch): + monkeypatch.delenv("SOME_KEY", raising=False) + # multiplex off => unscoped read is fine, returns default + assert ss.get_secret("SOME_KEY") is None + + +class TestMultiplexActiveFailClosed: + """Multiplex on: an unscoped secret read raises instead of leaking.""" + + def test_unscoped_read_raises(self, monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-leaky") + ss.set_multiplex_active(True) + with pytest.raises(ss.UnscopedSecretError): + ss.get_secret("ANTHROPIC_API_KEY") + + def test_scoped_read_uses_scope_not_environ(self, monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-from-environ") + ss.set_multiplex_active(True) + token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-from-scope"}) + try: + assert ss.get_secret("ANTHROPIC_API_KEY") == "sk-from-scope" + finally: + ss.reset_secret_scope(token) + + def test_scoped_missing_key_returns_default_not_environ(self, monkeypatch): + # Even though the value exists in os.environ, a scope is authoritative: + # an absent scope key must NOT fall through to the (cross-profile) env. + monkeypatch.setenv("OPENAI_API_KEY", "sk-other-profile") + ss.set_multiplex_active(True) + token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-mine"}) + try: + assert ss.get_secret("OPENAI_API_KEY") is None + assert ss.get_secret("OPENAI_API_KEY", "d") == "d" + finally: + ss.reset_secret_scope(token) + + def test_global_env_still_reads_environ_under_multiplex(self, monkeypatch): + monkeypatch.setenv("HERMES_HOME", "/opt/data") + ss.set_multiplex_active(True) + # No scope, multiplex on — but HERMES_HOME is global, so no raise. + assert ss.get_secret("HERMES_HOME") == "/opt/data" + + def test_kanban_prefix_is_global(self, monkeypatch): + monkeypatch.setenv("HERMES_KANBAN_DB", "/x/kanban.db") + ss.set_multiplex_active(True) + assert ss.get_secret("HERMES_KANBAN_DB") == "/x/kanban.db" + + +class TestScopeIsolation: + """Two scopes never see each other's secrets.""" + + def test_nested_scopes_restore(self): + ss.set_multiplex_active(True) + t1 = ss.set_secret_scope({"K": "a"}) + try: + assert ss.get_secret("K") == "a" + t2 = ss.set_secret_scope({"K": "b"}) + try: + assert ss.get_secret("K") == "b" + finally: + ss.reset_secret_scope(t2) + assert ss.get_secret("K") == "a" + finally: + ss.reset_secret_scope(t1) + + +class TestEnvFileParsing: + """load_env_file parses without mutating os.environ.""" + + def test_parses_basic(self, tmp_path): + env = tmp_path / ".env" + env.write_text( + "# comment\n" + "ANTHROPIC_API_KEY=sk-abc\n" + "export OPENAI_API_KEY=sk-def\n" + 'QUOTED="quoted-value"\n' + "SINGLE='single'\n" + "\n" + "BAD_LINE_NO_EQUALS\n" + ) + out = ss.load_env_file(env) + assert out == { + "ANTHROPIC_API_KEY": "sk-abc", + "OPENAI_API_KEY": "sk-def", + "QUOTED": "quoted-value", + "SINGLE": "single", + } + + def test_does_not_mutate_environ(self, tmp_path, monkeypatch): + monkeypatch.delenv("ZZZ_KEY", raising=False) + env = tmp_path / ".env" + env.write_text("ZZZ_KEY=secret\n") + ss.load_env_file(env) + import os + assert "ZZZ_KEY" not in os.environ + + def test_missing_file_returns_empty(self, tmp_path): + assert ss.load_env_file(tmp_path / "nope.env") == {} + + def test_build_profile_secret_scope(self, tmp_path): + (tmp_path / ".env").write_text("ANTHROPIC_API_KEY=sk-profile\n") + assert ss.build_profile_secret_scope(tmp_path) == { + "ANTHROPIC_API_KEY": "sk-profile" + } diff --git a/tests/gateway/test_multiplex_credential_isolation.py b/tests/gateway/test_multiplex_credential_isolation.py new file mode 100644 index 00000000000..748580197c7 --- /dev/null +++ b/tests/gateway/test_multiplex_credential_isolation.py @@ -0,0 +1,88 @@ +"""End-to-end credential isolation proof for multiplex mode (Workstream A). + +These exercise the REAL resolution path (runtime_provider, secret scope, MCP +interpolation) rather than mocking it, proving the property that matters: two +profiles with different keys never see each other's, and an unscoped read in +multiplex mode fails closed instead of leaking. +""" +import pytest + +from agent import secret_scope as ss + + +@pytest.fixture(autouse=True) +def _reset(monkeypatch): + ss.set_multiplex_active(False) + yield + ss.set_multiplex_active(False) + + +class TestRuntimeProviderUsesScope: + """hermes_cli.runtime_provider._getenv resolves through the secret scope.""" + + def test_getenv_reads_scope_under_multiplex(self, monkeypatch): + from hermes_cli.runtime_provider import _getenv + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-global-leak") + ss.set_multiplex_active(True) + tok = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-profileA"}) + try: + assert _getenv("ANTHROPIC_API_KEY") == "sk-profileA" + finally: + ss.reset_secret_scope(tok) + + def test_getenv_two_profiles_isolated(self, monkeypatch): + from hermes_cli.runtime_provider import _getenv + ss.set_multiplex_active(True) + + tok_a = ss.set_secret_scope({"OPENAI_API_KEY": "sk-A"}) + try: + assert _getenv("OPENAI_API_KEY") == "sk-A" + finally: + ss.reset_secret_scope(tok_a) + + tok_b = ss.set_secret_scope({"OPENAI_API_KEY": "sk-B"}) + try: + assert _getenv("OPENAI_API_KEY") == "sk-B" + finally: + ss.reset_secret_scope(tok_b) + + def test_getenv_fails_closed_unscoped(self, monkeypatch): + from hermes_cli.runtime_provider import _getenv + monkeypatch.setenv("OPENROUTER_API_KEY", "sk-leak") + ss.set_multiplex_active(True) + with pytest.raises(ss.UnscopedSecretError): + _getenv("OPENROUTER_API_KEY") + + def test_getenv_global_var_still_reads_environ(self, monkeypatch): + from hermes_cli.runtime_provider import _getenv + monkeypatch.setenv("HERMES_MAX_ITERATIONS", "42") + ss.set_multiplex_active(True) + # global var: no scope needed, no raise + assert _getenv("HERMES_MAX_ITERATIONS") == "42" + + +class TestMcpInterpolationUsesScope: + """MCP config ${VAR} interpolation resolves through the secret scope.""" + + def test_interpolation_reads_scope(self, monkeypatch): + from tools.mcp_tool import _interpolate_env_vars + monkeypatch.setenv("MY_MCP_TOKEN", "global-token") + ss.set_multiplex_active(True) + tok = ss.set_secret_scope({"MY_MCP_TOKEN": "profile-token"}) + try: + cfg = {"env": {"TOKEN": "${MY_MCP_TOKEN}"}} + assert _interpolate_env_vars(cfg) == {"env": {"TOKEN": "profile-token"}} + finally: + ss.reset_secret_scope(tok) + + def test_interpolation_unset_keeps_placeholder(self, monkeypatch): + from tools.mcp_tool import _interpolate_env_vars + monkeypatch.delenv("UNSET_MCP_VAR", raising=False) + # multiplex off: unset var keeps literal placeholder (legacy behavior) + assert _interpolate_env_vars("${UNSET_MCP_VAR}") == "${UNSET_MCP_VAR}" + + def test_interpolation_off_reads_environ(self, monkeypatch): + from tools.mcp_tool import _interpolate_env_vars + monkeypatch.setenv("MY_MCP_TOKEN", "env-token") + # multiplex off: legacy os.environ resolution + assert _interpolate_env_vars("${MY_MCP_TOKEN}") == "env-token" diff --git a/tools/mcp_tool.py b/tools/mcp_tool.py index db419196a47..2c5a1be5975 100644 --- a/tools/mcp_tool.py +++ b/tools/mcp_tool.py @@ -2662,10 +2662,19 @@ def _interrupted_call_result() -> str: # --------------------------------------------------------------------------- def _interpolate_env_vars(value): - """Recursively resolve ``${VAR}`` placeholders from ``os.environ``.""" + """Recursively resolve ``${VAR}`` placeholders. + + Resolves from the active profile's secret scope when multiplexing is on + (so an MCP server config's ``${API_KEY}`` picks up the routed profile's + value, not the process-global ``os.environ`` which may hold another + profile's), falling back to ``os.environ`` otherwise. Unset vars keep the + literal ``${VAR}`` placeholder, as before. + """ + from agent.secret_scope import get_secret as _get_secret + if isinstance(value, str): def _replace(m): - return os.environ.get(m.group(1), m.group(0)) + return _get_secret(m.group(1), m.group(0)) or m.group(0) return _ENV_VAR_PATTERN.sub(_replace, value) if isinstance(value, dict): return {k: _interpolate_env_vars(v) for k, v in value.items()}