feat(gateway): multiplex phase 2 — fail-closed profile credential isolation (Workstream A)

The credential gate. When multiplexing is active, a profile's secrets resolve
from a context-local scope, never the process-global os.environ (which in a
multiplexer may hold another profile's keys, and is inherited by every
subprocess spawned with env=dict(os.environ)).

- agent/secret_scope.py: get_secret() backed by a secret-scope contextvar.
  FAIL-CLOSED: when multiplex is active and no scope is installed, an unscoped
  read RAISES UnscopedSecretError instead of falling back to os.environ — a
  missed/new call site crashes loudly at that line rather than leaking a
  cross-profile value. Genuinely-global vars (HERMES_*, PATH, kanban paths,
  …) keep reading os.environ via an allowlist. load_env_file/build_profile_
  secret_scope parse a profile .env into an isolated dict WITHOUT mutating
  os.environ. Off by default => transparent os.getenv behavior.
- hermes_cli/runtime_provider.py: all credential/provider/base-url reads go
  through _getenv -> get_secret.
- agent/credential_pool.py: env fallbacks route through get_secret (the
  ~/.hermes/.env-first preference is preserved and already profile-correct via
  the home override).
- tools/mcp_tool.py: MCP config  interpolation resolves through
  get_secret, so a server's  picks up the routed profile's value.
- gateway/run.py: set_multiplex_active() at GatewayRunner init; per-turn .env
  reload is a no-op for credentials in multiplex mode (secrets come from the
  scope, not global env); _profile_runtime_scope context manager combines the
  HERMES_HOME override + secret scope; _run_agent wraps _run_agent_inner in
  that scope (resolved via _resolve_profile_home_for_source) when multiplexing.

Propagates into the agent worker thread for free via the existing
copy_context() in _run_in_executor_with_context.

Tests: 13 unit (fail-closed, scope isolation, global allowlist, .env parsing
without environ mutation) + 7 E2E (runtime_provider + MCP interpolation prove
two profiles isolated, unscoped read raises, globals still read environ).
This commit is contained in:
Ben Barclay 2026-06-18 15:51:33 +10:00 committed by Teknium
parent d82f9fa7f7
commit f538470cf4
7 changed files with 603 additions and 32 deletions

View file

@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple
from hermes_constants import OPENROUTER_BASE_URL
from hermes_cli.config import load_env
from agent.secret_scope import get_secret as _get_secret
from agent.credential_persistence import (
is_borrowed_credential_source,
sanitize_borrowed_credential_payload,
@ -1666,7 +1667,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
_env_file = load_env()
def _env_val(key: str) -> str:
return (_env_file.get(key) or os.environ.get(key) or "").strip()
return (_env_file.get(key) or _get_secret(key, "") or "").strip()
anthropic_api_key = _env_val("ANTHROPIC_API_KEY")
anthropic_oauth_env = (
@ -1952,7 +1953,7 @@ def _seed_from_env(provider: str, entries: List[PooledCredential]) -> Tuple[bool
# changes to the .env file.
def _get_env_prefer_dotenv(key: str) -> str:
env_file = load_env()
val = env_file.get(key) or os.environ.get(key) or ""
val = env_file.get(key) or _get_secret(key, "") or ""
return val.strip()
# Honour user suppression — `hermes auth remove <provider> <N>` for an

205
agent/secret_scope.py Normal file
View file

@ -0,0 +1,205 @@
"""Profile-scoped credential resolution for multi-profile gateway multiplexing.
The multiplexing gateway serves many profiles from one process. Each profile
has its own ``.env`` with its own provider keys and platform tokens, so we
**cannot** union them into the process-global ``os.environ`` (that would leak
profile A's keys to profile B's turns, and to every subprocess spawned with
``env=dict(os.environ)``).
This module provides a fail-closed, context-local secret scope:
- ``set_secret_scope(mapping)`` installs the active profile's secrets for the
current task (a contextvar, so it propagates into the agent's worker thread
via ``copy_context()`` exactly like the HERMES_HOME override).
- ``get_secret(name)`` reads from that scope. When multiplexing is **active**
and no scope is set, it RAISES rather than silently falling back to
``os.environ`` an un-migrated or newly-added call site fails loud at that
exact line instead of leaking another profile's value. When multiplexing is
**off** (the default), it transparently reads ``os.environ`` so the
single-profile gateway and every non-gateway caller behave exactly as before.
Design rationale lives in ``docs/design/multiplexing-gateway.md`` (Workstream A).
"""
from __future__ import annotations
import os
from contextvars import ContextVar, Token
from pathlib import Path
from typing import Dict, Mapping, Optional
# ── multiplex-active flag ────────────────────────────────────────────────
# Process-global: set once at gateway startup when gateway.multiplex_profiles
# is true. Governs whether get_secret() fails closed on an unscoped read.
# A plain module global (not a contextvar): it describes the deployment mode,
# not a per-task value.
_MULTIPLEX_ACTIVE: bool = False
def set_multiplex_active(active: bool) -> None:
"""Mark whether the process is running as a profile multiplexer.
Called once at gateway startup. When True, ``get_secret`` fails closed on
an unscoped read instead of falling back to ``os.environ``.
"""
global _MULTIPLEX_ACTIVE
_MULTIPLEX_ACTIVE = bool(active)
def is_multiplex_active() -> bool:
"""Return whether the process is running as a profile multiplexer."""
return _MULTIPLEX_ACTIVE
# ── the secret scope contextvar ──────────────────────────────────────────
_SECRET_SCOPE: ContextVar[Optional[Mapping[str, str]]] = ContextVar(
"_SECRET_SCOPE", default=None
)
class UnscopedSecretError(RuntimeError):
"""Raised when a secret is read in multiplex mode with no scope installed.
This is the fail-closed signal: it means a credential read reached
``get_secret`` without a profile scope active, which in a multiplexer would
otherwise leak whichever profile's value happened to be in ``os.environ``.
The fix is to wrap the call path in ``set_secret_scope(...)`` (the per-turn
/ per-adapter profile scope), not to widen the allowlist.
"""
def set_secret_scope(secrets: Optional[Mapping[str, str]]) -> Token:
"""Install the active profile's secret mapping for the current context.
Returns a token for ``reset_secret_scope``. Pass ``None`` to clear.
"""
return _SECRET_SCOPE.set(secrets)
def reset_secret_scope(token: Token) -> None:
"""Restore the previous secret scope."""
_SECRET_SCOPE.reset(token)
def current_secret_scope() -> Optional[Mapping[str, str]]:
"""Return the active secret mapping, or None when no scope is installed."""
return _SECRET_SCOPE.get()
# ── genuinely-global env vars (NOT per-profile secrets) ──────────────────
# These are process/deployment-level settings, not profile credentials. They
# legitimately live in os.environ and must keep reading from it even in
# multiplex mode — routing them through the fail-closed path would wrongly
# crash. Anything matching is read from os.environ regardless of scope.
#
# Membership test is by exact name OR prefix (see _is_global_env). Keep this
# list tight: when in doubt a value is a profile secret, not a global.
_GLOBAL_ENV_EXACT = frozenset({
# Hermes runtime / deployment
"HERMES_HOME", "HERMES_PROFILE", "HERMES_GATEWAY_LOCK_DIR",
"HERMES_MAX_ITERATIONS", "HERMES_MAX_TOKENS", "HERMES_API_TIMEOUT",
"HERMES_REDACT_SECRETS", "HERMES_NOUS_TIMEOUT_SECONDS",
"_HERMES_GATEWAY",
# OS / interpreter
"PATH", "HOME", "USER", "LANG", "LC_ALL", "TZ", "PWD", "SHELL", "TMPDIR",
"VIRTUAL_ENV", "PYTHONPATH", "SSL_CERT_FILE",
# Kanban paths (per-board, not per-profile-secret)
"HERMES_KANBAN_DB", "HERMES_KANBAN_WORKSPACES_ROOT", "HERMES_KANBAN_BOARD",
})
_GLOBAL_ENV_PREFIXES = (
"HERMES_KANBAN_",
"HERMES_TELEGRAM_", # tuning knobs (batch delays, fallback toggles) — NOT the token
"TERMINAL_", # terminal/sandbox backend settings
)
def _is_global_env(name: str) -> bool:
"""Return True for genuinely process-global (non-profile-secret) env vars."""
if name in _GLOBAL_ENV_EXACT:
return True
return any(name.startswith(p) for p in _GLOBAL_ENV_PREFIXES)
def get_secret(name: str, default: Optional[str] = None) -> Optional[str]:
"""Resolve a credential by env-var name, honoring the active profile scope.
Resolution order:
1. Genuinely-global vars (``_is_global_env``) always read ``os.environ``
they are deployment settings, not profile secrets.
2. When a secret scope is installed (multiplexed turn), read from it; an
absent key returns ``default``. The scope is authoritative we do NOT
fall through to ``os.environ``, because in a multiplexer ``os.environ``
may hold another profile's value.
3. No scope installed:
- multiplex INACTIVE (default deployment): read ``os.environ``
identical to the legacy ``os.getenv`` behavior every caller had before.
- multiplex ACTIVE: FAIL CLOSED. Raise ``UnscopedSecretError`` so the
missing scope is caught loudly instead of leaking a cross-profile value.
"""
if _is_global_env(name):
val = os.environ.get(name)
return val if val is not None else default
scope = _SECRET_SCOPE.get()
if scope is not None:
val = scope.get(name)
return val if val is not None else default
if _MULTIPLEX_ACTIVE:
raise UnscopedSecretError(
f"get_secret({name!r}) called with no profile secret scope active "
f"while multiplexing is on. This credential read must run inside a "
f"set_secret_scope(...) block (the per-turn / per-adapter profile "
f"scope). Reading os.environ here would risk leaking another "
f"profile's value. See docs/design/multiplexing-gateway.md "
f"(Workstream A)."
)
val = os.environ.get(name)
return val if val is not None else default
def load_env_file(env_path: Path) -> Dict[str, str]:
"""Parse a ``.env`` file into a plain dict WITHOUT touching ``os.environ``.
Used to load a profile's secrets into an isolated mapping for
``set_secret_scope``. Mirrors python-dotenv's basic parsing (KEY=VALUE,
``export`` prefix, ``#`` comments, optional matching quotes) but never
mutates the process environment that isolation is the whole point.
"""
secrets: Dict[str, str] = {}
try:
text = env_path.read_text(encoding="utf-8")
except (FileNotFoundError, OSError, UnicodeDecodeError):
return secrets
for raw in text.splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith("export "):
line = line[len("export "):].lstrip()
if "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
if not key:
continue
value = value.strip()
if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
value = value[1:-1]
secrets[key] = value
return secrets
def build_profile_secret_scope(hermes_home: Path) -> Dict[str, str]:
"""Build a profile's secret mapping from its ``<home>/.env``.
Returns a fresh dict (safe to install via ``set_secret_scope``). Genuinely
global vars are intentionally NOT copied in ``get_secret`` reads those
from ``os.environ`` directly, so the scope holds only profile secrets.
"""
return load_env_file(Path(hermes_home) / ".env")

View file

@ -1186,13 +1186,31 @@ def _reload_runtime_env_preserving_config_authority() -> None:
pick up rotated API keys. config.yaml remains authoritative for agent budget
settings such as agent.max_turns; otherwise a stale HERMES_MAX_ITERATIONS in
.env can replace the startup bridge on later turns.
In multiplex mode this is a NO-OP for the credential reload: secrets come
from the per-turn ``set_secret_scope`` (installed by ``_profile_runtime_scope``)
which loads the routed profile's ``.env`` into an isolated mapping. Mutating
the process-global ``os.environ`` here would defeat that isolation and leak
the default profile's keys to every profile's turns and subprocesses.
"""
from agent.secret_scope import is_multiplex_active
if is_multiplex_active():
# Credentials are resolved from the active profile's secret scope, not
# os.environ. Still honor config.yaml's agent.max_turns bridge below
# using the scoped home, but never reload .env into global env.
_bridge_max_turns_from_config(_hermes_home)
return
load_hermes_dotenv(
hermes_home=_hermes_home,
project_env=Path(__file__).resolve().parents[1] / '.env',
)
_bridge_max_turns_from_config(_hermes_home)
config_path = _hermes_home / 'config.yaml'
def _bridge_max_turns_from_config(home: "Path") -> None:
"""Bridge config.yaml agent.max_turns into HERMES_MAX_ITERATIONS (a global)."""
config_path = home / 'config.yaml'
if not config_path.exists():
return
try:
@ -1218,6 +1236,44 @@ def _current_max_iterations() -> int:
return 90
from contextlib import contextmanager as _contextmanager
@_contextmanager
def _profile_runtime_scope(profile_home: "Path"):
"""Scope config/skills/memory AND credentials to a profile for one turn.
Combines the two seams the multiplexer needs:
1. ``set_hermes_home_override`` redirects ``get_hermes_home()`` (config,
skills, memory, SOUL, sessions) to the profile's home. Contextvar, so
it propagates into the agent worker thread via ``copy_context()``.
2. ``set_secret_scope`` installs the profile's ``.env`` secrets as the
authoritative credential source, so ``get_secret`` reads this profile's
keys and never the process-global ``os.environ`` (which in a
multiplexer may hold another profile's values).
Only used on the multiplexed inbound path. Single-profile gateways never
enter this scope, so their behavior is unchanged. Loading the profile's
``.env`` here does NOT mutate ``os.environ`` ``build_profile_secret_scope``
returns an isolated dict which is what keeps subprocesses (MCP, kanban)
from inheriting cross-profile secrets.
"""
from hermes_constants import set_hermes_home_override, reset_hermes_home_override
from agent.secret_scope import (
build_profile_secret_scope,
set_secret_scope,
reset_secret_scope,
)
home_token = set_hermes_home_override(str(profile_home))
secret_token = set_secret_scope(build_profile_secret_scope(Path(profile_home)))
try:
yield
finally:
reset_secret_scope(secret_token)
reset_hermes_home_override(home_token)
_DOCKER_VOLUME_SPEC_RE = re.compile(r"^(?P<host>.+):(?P<container>/[^:]+?)(?::(?P<options>[^:]+))?$")
_DOCKER_MEDIA_OUTPUT_CONTAINER_PATHS = {"/output", "/outputs"}
@ -2262,6 +2318,15 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
def __init__(self, config: Optional[GatewayConfig] = None):
global _gateway_runner_ref
self.config = config or load_gateway_config()
# Mark the process as a profile multiplexer when configured. This flips
# agent.secret_scope.get_secret() to fail-closed on any unscoped
# credential read, so a missed migration crashes loudly instead of
# leaking a cross-profile value (Workstream A). Inert when off.
try:
from agent.secret_scope import set_multiplex_active
set_multiplex_active(bool(getattr(self.config, "multiplex_profiles", False)))
except Exception:
logger.debug("could not set multiplex-active flag", exc_info=True)
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
self._warn_if_docker_media_delivery_is_risky()
_gateway_runner_ref = _weakref.ref(self)
@ -13805,6 +13870,65 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
channel_prompt: Optional[str] = None,
persist_user_message: Optional[str] = None,
persist_user_timestamp: Optional[float] = None,
) -> Dict[str, Any]:
"""Profile-scoping wrapper around the agent run.
When multiplexing is active, resolve the inbound source's profile and
run the whole turn inside ``_profile_runtime_scope`` so config/skills/
memory resolve to that profile's home AND credentials resolve from that
profile's secret scope (never the process-global ``os.environ``). When
multiplexing is off this is a transparent pass-through zero behavior
change for single-profile gateways.
"""
if not getattr(self.config, "multiplex_profiles", False):
return await self._run_agent_inner(
message, context_prompt, history, source, session_id,
session_key=session_key, run_generation=run_generation,
_interrupt_depth=_interrupt_depth, event_message_id=event_message_id,
channel_prompt=channel_prompt, persist_user_message=persist_user_message,
persist_user_timestamp=persist_user_timestamp,
)
profile_home = self._resolve_profile_home_for_source(source)
with _profile_runtime_scope(profile_home):
return await self._run_agent_inner(
message, context_prompt, history, source, session_id,
session_key=session_key, run_generation=run_generation,
_interrupt_depth=_interrupt_depth, event_message_id=event_message_id,
channel_prompt=channel_prompt, persist_user_message=persist_user_message,
persist_user_timestamp=persist_user_timestamp,
)
def _resolve_profile_home_for_source(self, source: SessionSource) -> "Path":
"""Resolve which profile's HERMES_HOME should serve this inbound source.
Phase 2 baseline: the active profile (the multiplexer's own home). Phase
1/3 wire real per-source attribution (URL prefix, per-credential adapter
ownership) by overriding the resolved profile on the source/adapter; this
method is the single point they hook.
"""
from hermes_cli.profiles import get_active_profile_name, get_profile_dir
try:
name = get_active_profile_name() or "default"
return get_profile_dir(name)
except Exception:
from hermes_constants import get_hermes_home
return get_hermes_home()
async def _run_agent_inner(
self,
message: str,
context_prompt: str,
history: List[Dict[str, Any]],
source: SessionSource,
session_id: str,
session_key: str = None,
run_generation: Optional[int] = None,
_interrupt_depth: int = 0,
event_message_id: Optional[str] = None,
channel_prompt: Optional[str] = None,
persist_user_message: Optional[str] = None,
persist_user_timestamp: Optional[float] = None,
) -> Dict[str, Any]:
"""
Run the agent with the given message and context.

View file

@ -12,6 +12,7 @@ logger = logging.getLogger(__name__)
from hermes_cli import auth as auth_mod
from agent.credential_pool import CredentialPool, PooledCredential, get_custom_provider_pool_key, load_pool
from agent.secret_scope import get_secret as _get_secret
from hermes_cli.auth import (
AuthError,
DEFAULT_CODEX_BASE_URL,
@ -35,6 +36,19 @@ from hermes_constants import OPENROUTER_BASE_URL
from utils import base_url_host_matches, base_url_hostname, env_int
def _getenv(name: str, default: str = "") -> str:
"""Profile-scoped replacement for ``os.getenv`` on credential/provider reads.
Routes through the secret scope (Workstream A): identical to ``os.getenv``
when multiplexing is off, scope-aware (and fail-closed on an unscoped read)
when on. Genuinely-global vars are handled inside ``get_secret`` and still
read ``os.environ``. Keeps the ``(name, default) -> str`` contract every
call site here already relies on.
"""
val = _get_secret(name, default)
return val if val is not None else default
def _normalize_custom_provider_name(value: str) -> str:
return value.strip().lower().replace(" ", "-")
@ -156,7 +170,7 @@ def _host_derived_api_key(base_url: str) -> str:
if sanitized in ("OPENAI", "OPENROUTER", "OLLAMA"):
return ""
env_name = f"{sanitized}_API_KEY"
return (os.getenv(env_name, "") or "").strip()
return (_getenv(env_name, "") or "").strip()
def _auto_detect_local_model(base_url: str) -> str:
@ -437,7 +451,7 @@ def resolve_requested_provider(requested: Optional[str] = None) -> str:
# Prefer the persisted config selection over any stale shell/.env
# provider override so chat uses the endpoint the user last saved.
env_provider = os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
env_provider = _getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
if env_provider:
return env_provider
@ -542,7 +556,7 @@ def _get_named_custom_provider(requested_provider: str) -> Optional[Dict[str, An
name_norm = _normalize_custom_provider_name(ep_name)
# Resolve the API key from the env var name stored in key_env
key_env = str(entry.get("key_env", "") or "").strip()
resolved_api_key = os.getenv(key_env, "").strip() if key_env else ""
resolved_api_key = _getenv(key_env, "").strip() if key_env else ""
# Fall back to inline api_key when key_env is absent or unresolvable
if not resolved_api_key:
resolved_api_key = str(entry.get("api_key", "") or "").strip()
@ -824,8 +838,8 @@ def _resolve_named_custom_runtime(
api_key_candidates = [
(explicit_api_key or "").strip(),
# Gate env key fallbacks on authoritative hosts (#28660)
(os.getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""),
(os.getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""),
(_getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""),
(_getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""),
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host so users
# who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the
# intuitive match without configuring `custom_providers` first.
@ -878,11 +892,11 @@ def _resolve_named_custom_runtime(
api_key_candidates = [
(explicit_api_key or "").strip(),
str(custom_provider.get("api_key", "") or "").strip(),
os.getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(),
_getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(),
# Gate provider env keys on their authoritative hosts — sending
# OPENAI_API_KEY to a local-llm endpoint leaks credentials (#28660).
(os.getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""),
(os.getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""),
(_getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""),
(_getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""),
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host as a final
# fallback when key_env wasn't set explicitly.
_host_derived_api_key(base_url),
@ -941,8 +955,8 @@ def _resolve_openrouter_runtime(
except Exception:
pass
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
env_custom_base_url = os.getenv("CUSTOM_BASE_URL", "").strip()
env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip()
env_custom_base_url = _getenv("CUSTOM_BASE_URL", "").strip()
# Use config base_url when available and the provider context matches.
# OPENAI_BASE_URL env var is no longer consulted — config.yaml is
@ -982,8 +996,8 @@ def _resolve_openrouter_runtime(
if _is_openrouter_context:
api_key_candidates = [
explicit_api_key,
os.getenv("OPENROUTER_API_KEY"),
os.getenv("OPENAI_API_KEY"),
_getenv("OPENROUTER_API_KEY"),
_getenv("OPENAI_API_KEY"),
]
else:
# Custom endpoint: use api_key from config when using config base_url (#1760).
@ -1003,9 +1017,9 @@ def _resolve_openrouter_runtime(
api_key_candidates = [
explicit_api_key,
(cfg_api_key if use_config_base_url else ""),
(os.getenv("OLLAMA_API_KEY") if _is_ollama_url else ""),
(os.getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""),
(os.getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""),
(_getenv("OLLAMA_API_KEY") if _is_ollama_url else ""),
(_getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""),
(_getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""),
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host so users
# who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the
# intuitive match. Helper returns "" for IPs/loopback and for env
@ -1108,7 +1122,7 @@ def _resolve_azure_foundry_runtime(
if inferred:
cfg_api_mode = inferred
env_base_url = os.getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/")
env_base_url = _getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/")
base_url = explicit_base_url_clean or cfg_base_url or env_base_url
if not base_url:
raise AuthError(
@ -1197,7 +1211,7 @@ def _resolve_azure_foundry_runtime(
except Exception:
api_key = ""
if not api_key:
api_key = os.getenv("AZURE_FOUNDRY_API_KEY", "").strip()
api_key = _getenv("AZURE_FOUNDRY_API_KEY", "").strip()
if not api_key:
raise AuthError(
"Azure Foundry requires an API key. Set AZURE_FOUNDRY_API_KEY in "
@ -1297,7 +1311,7 @@ def _resolve_explicit_runtime(
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
if not api_key:
creds = resolve_nous_runtime_credentials(
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
api_key = creds.get("api_key", "")
expires_at = creds.get("expires_at")
@ -1326,7 +1340,7 @@ def _resolve_explicit_runtime(
if pconfig and pconfig.auth_type == "api_key":
env_url = ""
if pconfig.base_url_env_var:
env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
env_url = _getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
base_url = explicit_base_url
if not base_url:
@ -1398,8 +1412,8 @@ def resolve_runtime_provider(
if requested_provider == "anthropic" and "azure.com" in _eff_base:
_azure_key = (
(explicit_api_key or "").strip()
or os.getenv("AZURE_ANTHROPIC_KEY", "").strip()
or os.getenv("ANTHROPIC_API_KEY", "").strip()
or _getenv("AZURE_ANTHROPIC_KEY", "").strip()
or _getenv("ANTHROPIC_API_KEY", "").strip()
)
return {
"provider": "anthropic",
@ -1454,8 +1468,8 @@ def resolve_runtime_provider(
if provider == "openrouter":
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = str(model_cfg.get("base_url") or "").strip()
env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip()
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
env_openai_base_url = _getenv("OPENAI_BASE_URL", "").strip()
env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip()
has_custom_endpoint = bool(
explicit_base_url
or env_openai_base_url
@ -1511,7 +1525,7 @@ def resolve_runtime_provider(
if provider == "nous":
try:
creds = resolve_nous_runtime_credentials(
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
return {
"provider": "nous",
@ -1664,7 +1678,7 @@ def resolve_runtime_provider(
for hint_key in ("key_env", "api_key_env"):
env_var = str(model_cfg.get(hint_key) or "").strip()
if env_var:
token = os.getenv(env_var, "").strip()
token = _getenv(env_var, "").strip()
if token:
break
# Next: an inline api_key on the model config (useful in multi-profile
@ -1674,8 +1688,8 @@ def resolve_runtime_provider(
# Finally fall back to the historical fixed names.
if not token:
token = (
os.getenv("AZURE_ANTHROPIC_KEY", "").strip()
or os.getenv("ANTHROPIC_API_KEY", "").strip()
_getenv("AZURE_ANTHROPIC_KEY", "").strip()
or _getenv("ANTHROPIC_API_KEY", "").strip()
)
if not token:
raise AuthError(

View file

@ -0,0 +1,130 @@
"""Tests for the profile-scoped credential primitive (Workstream A / Phase 2)."""
import pytest
from agent import secret_scope as ss
@pytest.fixture(autouse=True)
def _reset_multiplex():
"""Ensure each test starts and ends with multiplexing off (it's a global)."""
ss.set_multiplex_active(False)
yield
ss.set_multiplex_active(False)
class TestMultiplexInactiveBackwardCompat:
"""Default deployment: get_secret transparently reads os.environ."""
def test_reads_environ(self, monkeypatch):
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-test")
assert ss.get_secret("ANTHROPIC_API_KEY") == "sk-test"
def test_missing_returns_default(self, monkeypatch):
monkeypatch.delenv("NOPE_KEY", raising=False)
assert ss.get_secret("NOPE_KEY") is None
assert ss.get_secret("NOPE_KEY", "fallback") == "fallback"
def test_no_raise_without_scope(self, monkeypatch):
monkeypatch.delenv("SOME_KEY", raising=False)
# multiplex off => unscoped read is fine, returns default
assert ss.get_secret("SOME_KEY") is None
class TestMultiplexActiveFailClosed:
"""Multiplex on: an unscoped secret read raises instead of leaking."""
def test_unscoped_read_raises(self, monkeypatch):
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-leaky")
ss.set_multiplex_active(True)
with pytest.raises(ss.UnscopedSecretError):
ss.get_secret("ANTHROPIC_API_KEY")
def test_scoped_read_uses_scope_not_environ(self, monkeypatch):
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-from-environ")
ss.set_multiplex_active(True)
token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-from-scope"})
try:
assert ss.get_secret("ANTHROPIC_API_KEY") == "sk-from-scope"
finally:
ss.reset_secret_scope(token)
def test_scoped_missing_key_returns_default_not_environ(self, monkeypatch):
# Even though the value exists in os.environ, a scope is authoritative:
# an absent scope key must NOT fall through to the (cross-profile) env.
monkeypatch.setenv("OPENAI_API_KEY", "sk-other-profile")
ss.set_multiplex_active(True)
token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-mine"})
try:
assert ss.get_secret("OPENAI_API_KEY") is None
assert ss.get_secret("OPENAI_API_KEY", "d") == "d"
finally:
ss.reset_secret_scope(token)
def test_global_env_still_reads_environ_under_multiplex(self, monkeypatch):
monkeypatch.setenv("HERMES_HOME", "/opt/data")
ss.set_multiplex_active(True)
# No scope, multiplex on — but HERMES_HOME is global, so no raise.
assert ss.get_secret("HERMES_HOME") == "/opt/data"
def test_kanban_prefix_is_global(self, monkeypatch):
monkeypatch.setenv("HERMES_KANBAN_DB", "/x/kanban.db")
ss.set_multiplex_active(True)
assert ss.get_secret("HERMES_KANBAN_DB") == "/x/kanban.db"
class TestScopeIsolation:
"""Two scopes never see each other's secrets."""
def test_nested_scopes_restore(self):
ss.set_multiplex_active(True)
t1 = ss.set_secret_scope({"K": "a"})
try:
assert ss.get_secret("K") == "a"
t2 = ss.set_secret_scope({"K": "b"})
try:
assert ss.get_secret("K") == "b"
finally:
ss.reset_secret_scope(t2)
assert ss.get_secret("K") == "a"
finally:
ss.reset_secret_scope(t1)
class TestEnvFileParsing:
"""load_env_file parses without mutating os.environ."""
def test_parses_basic(self, tmp_path):
env = tmp_path / ".env"
env.write_text(
"# comment\n"
"ANTHROPIC_API_KEY=sk-abc\n"
"export OPENAI_API_KEY=sk-def\n"
'QUOTED="quoted-value"\n'
"SINGLE='single'\n"
"\n"
"BAD_LINE_NO_EQUALS\n"
)
out = ss.load_env_file(env)
assert out == {
"ANTHROPIC_API_KEY": "sk-abc",
"OPENAI_API_KEY": "sk-def",
"QUOTED": "quoted-value",
"SINGLE": "single",
}
def test_does_not_mutate_environ(self, tmp_path, monkeypatch):
monkeypatch.delenv("ZZZ_KEY", raising=False)
env = tmp_path / ".env"
env.write_text("ZZZ_KEY=secret\n")
ss.load_env_file(env)
import os
assert "ZZZ_KEY" not in os.environ
def test_missing_file_returns_empty(self, tmp_path):
assert ss.load_env_file(tmp_path / "nope.env") == {}
def test_build_profile_secret_scope(self, tmp_path):
(tmp_path / ".env").write_text("ANTHROPIC_API_KEY=sk-profile\n")
assert ss.build_profile_secret_scope(tmp_path) == {
"ANTHROPIC_API_KEY": "sk-profile"
}

View file

@ -0,0 +1,88 @@
"""End-to-end credential isolation proof for multiplex mode (Workstream A).
These exercise the REAL resolution path (runtime_provider, secret scope, MCP
interpolation) rather than mocking it, proving the property that matters: two
profiles with different keys never see each other's, and an unscoped read in
multiplex mode fails closed instead of leaking.
"""
import pytest
from agent import secret_scope as ss
@pytest.fixture(autouse=True)
def _reset(monkeypatch):
ss.set_multiplex_active(False)
yield
ss.set_multiplex_active(False)
class TestRuntimeProviderUsesScope:
"""hermes_cli.runtime_provider._getenv resolves through the secret scope."""
def test_getenv_reads_scope_under_multiplex(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-global-leak")
ss.set_multiplex_active(True)
tok = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-profileA"})
try:
assert _getenv("ANTHROPIC_API_KEY") == "sk-profileA"
finally:
ss.reset_secret_scope(tok)
def test_getenv_two_profiles_isolated(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
ss.set_multiplex_active(True)
tok_a = ss.set_secret_scope({"OPENAI_API_KEY": "sk-A"})
try:
assert _getenv("OPENAI_API_KEY") == "sk-A"
finally:
ss.reset_secret_scope(tok_a)
tok_b = ss.set_secret_scope({"OPENAI_API_KEY": "sk-B"})
try:
assert _getenv("OPENAI_API_KEY") == "sk-B"
finally:
ss.reset_secret_scope(tok_b)
def test_getenv_fails_closed_unscoped(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-leak")
ss.set_multiplex_active(True)
with pytest.raises(ss.UnscopedSecretError):
_getenv("OPENROUTER_API_KEY")
def test_getenv_global_var_still_reads_environ(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
monkeypatch.setenv("HERMES_MAX_ITERATIONS", "42")
ss.set_multiplex_active(True)
# global var: no scope needed, no raise
assert _getenv("HERMES_MAX_ITERATIONS") == "42"
class TestMcpInterpolationUsesScope:
"""MCP config ${VAR} interpolation resolves through the secret scope."""
def test_interpolation_reads_scope(self, monkeypatch):
from tools.mcp_tool import _interpolate_env_vars
monkeypatch.setenv("MY_MCP_TOKEN", "global-token")
ss.set_multiplex_active(True)
tok = ss.set_secret_scope({"MY_MCP_TOKEN": "profile-token"})
try:
cfg = {"env": {"TOKEN": "${MY_MCP_TOKEN}"}}
assert _interpolate_env_vars(cfg) == {"env": {"TOKEN": "profile-token"}}
finally:
ss.reset_secret_scope(tok)
def test_interpolation_unset_keeps_placeholder(self, monkeypatch):
from tools.mcp_tool import _interpolate_env_vars
monkeypatch.delenv("UNSET_MCP_VAR", raising=False)
# multiplex off: unset var keeps literal placeholder (legacy behavior)
assert _interpolate_env_vars("${UNSET_MCP_VAR}") == "${UNSET_MCP_VAR}"
def test_interpolation_off_reads_environ(self, monkeypatch):
from tools.mcp_tool import _interpolate_env_vars
monkeypatch.setenv("MY_MCP_TOKEN", "env-token")
# multiplex off: legacy os.environ resolution
assert _interpolate_env_vars("${MY_MCP_TOKEN}") == "env-token"

View file

@ -2662,10 +2662,19 @@ def _interrupted_call_result() -> str:
# ---------------------------------------------------------------------------
def _interpolate_env_vars(value):
"""Recursively resolve ``${VAR}`` placeholders from ``os.environ``."""
"""Recursively resolve ``${VAR}`` placeholders.
Resolves from the active profile's secret scope when multiplexing is on
(so an MCP server config's ``${API_KEY}`` picks up the routed profile's
value, not the process-global ``os.environ`` which may hold another
profile's), falling back to ``os.environ`` otherwise. Unset vars keep the
literal ``${VAR}`` placeholder, as before.
"""
from agent.secret_scope import get_secret as _get_secret
if isinstance(value, str):
def _replace(m):
return os.environ.get(m.group(1), m.group(0))
return _get_secret(m.group(1), m.group(0)) or m.group(0)
return _ENV_VAR_PATTERN.sub(_replace, value)
if isinstance(value, dict):
return {k: _interpolate_env_vars(v) for k, v in value.items()}