hermes-agent/hermes_cli/managed_scope.py
Ben 9cbcc0c9c8 feat(managed-scope): add managed_scope module (resolver, loaders, key helpers)
New hermes_cli/managed_scope.py resolves a system-level managed directory
(HERMES_MANAGED_DIR override > /etc/hermes), parses managed config.yaml/.env
with fail-open semantics, and exposes is_key_managed/is_env_managed helpers.
The system default is ignored under pytest and HERMES_MANAGED_DIR is added to
the conftest env scrub so a real managed scope can't leak into the suite.

Not wired into the load paths yet (Phases 2-3).
2026-06-19 07:46:33 -07:00

171 lines
5.9 KiB
Python

"""Managed scope — IT-pushed, user-immutable config & env layer.
A system-level directory (default ``/etc/hermes``, root-owned and not
user-writable) supplies ``config.yaml`` and ``.env`` values that WIN over the
user's ``~/.hermes/config.yaml`` and ``~/.hermes/.env`` on a per-leaf-key basis.
This is DISTINCT from ``hermes_cli.config.is_managed()`` / ``HERMES_MANAGED``,
which is a coarse package-manager write-lock (declarative-distro / formula
installs). That lock blocks all mutation; this layer injects specific immutable
values. The two are independent and may coexist.
v1 enforcement is filesystem permissions only — see
``docs/design/managed-scope.md`` §7. v1 is Linux/POSIX-first; ``get_managed_dir()``
is the single seam for adding macOS / Windows native locations later.
Attribution: do not reference any third-party product by name in this file.
"""
from __future__ import annotations
import copy
import logging
import os
import threading
from pathlib import Path
from typing import Dict, Optional
import yaml
# Module-level logger; unparseable managed files are reported through it.
logger = logging.getLogger(__name__)
# POSIX default. Other-platform locations are a deliberate v2 item; when added,
# they belong ONLY inside get_managed_dir().
_DEFAULT_MANAGED_DIR = Path("/etc/hermes")
# Held around every lookup, store and clear of the two caches below.
_CACHE_LOCK = threading.Lock()
# path_key -> (mtime_ns, size, parsed)
# path_key is str(path); an entry is reused only while the file's current
# (mtime_ns, size) still matches the stored pair.
_CONFIG_CACHE: Dict[str, tuple] = {}
_ENV_CACHE: Dict[str, tuple] = {}
def _under_pytest() -> bool:
    """Report whether the process is currently executing inside the test suite.

    The signal is the mere presence of ``PYTEST_CURRENT_TEST`` in the
    environment; its value is not inspected.

    ``get_managed_dir()`` uses this to skip the system default ``/etc/hermes``
    during tests, so a real managed scope installed on a developer or CI
    machine cannot leak policy into the suite. Tests that want a managed scope
    set ``HERMES_MANAGED_DIR`` explicitly; that override is evaluated before
    this guard and is therefore still honored.
    """
    marker = os.environ.get("PYTEST_CURRENT_TEST")
    return marker is not None
def get_managed_dir() -> Optional[Path]:
    """Locate the managed-scope directory; ``None`` means no scope is present.

    Tiers, highest priority first:

    1. ``$HERMES_MANAGED_DIR`` — deployment/bootstrap override (IT-only; never
       persisted to any .env). Used when it is non-empty after stripping
       whitespace. If it is set but does not name an existing directory the
       result is ``None``; the default below is NOT tried as a fallback.
    2. ``/etc/hermes`` — the POSIX default, when it exists. Skipped entirely
       under pytest so a real system managed scope can't leak into the suite.

    A missing directory at either tier yields ``None``. That is the common
    case, so this function only reads the environment and probes the
    filesystem — it never creates or modifies anything.
    """
    raw_override = os.environ.get("HERMES_MANAGED_DIR", "").strip()
    if raw_override:
        candidate = Path(raw_override)
        if candidate.is_dir():
            return candidate
        return None

    # No override: fall back to the system default, except during tests.
    if not _under_pytest() and _DEFAULT_MANAGED_DIR.is_dir():
        return _DEFAULT_MANAGED_DIR
    return None
def invalidate_managed_cache() -> None:
    """Forget every cached managed config/env parse.

    Intended for tests and for forcing a reload after the managed files were
    edited. Both caches are emptied in place under the cache lock.
    """
    with _CACHE_LOCK:
        for cache in (_CONFIG_CACHE, _ENV_CACHE):
            cache.clear()
def _cached_read(path: Path, cache: Dict[str, tuple], parse):
    """Read and parse ``path``, reusing ``cache`` while the file is unchanged.

    Args:
        path: File to read.
        cache: Dict mapping ``str(path)`` to ``(mtime_ns, size, parsed)``.
        parse: Callable that receives the open text file (UTF-8) and returns
            the parsed value.

    Returns:
        The parsed value, or ``None`` when the file cannot be stat'ed (treated
        as absent) or when opening/parsing raises (fail-open). The caller never
        receives the object stored in the cache: hits return a deepcopy and
        misses store a deepcopy.

    A read/parse failure is logged LOUDLY — the admin needs to know their
    policy isn't being applied — but is never raised, so a malformed managed
    file can't brick startup. Failures are not cached.
    """
    try:
        info = path.stat()
    except OSError:
        return None  # absent

    stamp = (info.st_mtime_ns, info.st_size)
    cache_key = str(path)

    with _CACHE_LOCK:
        entry = cache.get(cache_key)
        if entry is not None and entry[:2] == stamp:
            return copy.deepcopy(entry[2])

    # Cache miss or stale entry: parse outside the lock.
    try:
        with open(path, encoding="utf-8") as handle:
            fresh = parse(handle)
    except Exception as exc:  # noqa: BLE001 — fail-open, but LOUD
        logger.warning(
            "managed scope: failed to parse %s: %s — IGNORING this managed file. "
            "Admin policy from this file is NOT being applied. Fix and restart.",
            path,
            exc,
        )
        return None

    mtime_ns, size = stamp
    with _CACHE_LOCK:
        cache[cache_key] = (mtime_ns, size, copy.deepcopy(fresh))
    return fresh
def load_managed_config() -> dict:
    """Return the parsed managed ``config.yaml`` as a dict (fail-open).

    Returns:
        The top-level mapping read from ``<managed_dir>/config.yaml``, or
        ``{}`` when there is no managed directory, the file is absent or
        empty, it fails to parse, or its top level is not a mapping.

    A file that parses but whose top level is not a mapping (for example a
    YAML list or a bare scalar) cannot contribute any keys. That case is
    logged as a warning rather than dropped silently, for the same reason
    parse failures are logged: the admin needs to know the policy in that
    file is not being applied. Like every other failure here it never raises.
    """
    managed_dir = get_managed_dir()
    if managed_dir is None:
        return {}

    config_path = managed_dir / "config.yaml"
    parsed = _cached_read(
        config_path,
        _CONFIG_CACHE,
        # An empty file loads as None; normalise falsy results to "no keys".
        lambda f: yaml.safe_load(f) or {},
    )
    if parsed is None:
        # Absent or unparseable; _cached_read has already logged the latter.
        return {}
    if not isinstance(parsed, dict):
        logger.warning(
            "managed scope: %s must be a YAML mapping at the top level, got %s "
            "— IGNORING this managed file. Admin policy from this file is NOT "
            "being applied. Fix and restart.",
            config_path,
            type(parsed).__name__,
        )
        return {}
    return parsed
def load_managed_env() -> Dict[str, str]:
    """Return the managed ``.env`` as a ``{NAME: value}`` dict (fail-open).

    Yields ``{}`` when no managed directory resolves, or when the cached read
    of ``<managed_dir>/.env`` produces anything other than a dict (which is
    what it returns for an absent or unreadable file).
    """
    scope_dir = get_managed_dir()
    if scope_dir is not None:
        env_values = _cached_read(scope_dir / ".env", _ENV_CACHE, _parse_env)
        if isinstance(env_values, dict):
            return env_values
    return {}
def _parse_env(f) -> Dict[str, str]:
    """Parse ``KEY=VALUE`` lines into a dict.

    Args:
        f: Iterable of text lines, typically an open text file.

    Returns:
        Mapping of variable name to value. When a name appears more than once
        the last occurrence wins.

    Parsing rules:
        * Blank lines, lines starting with ``#`` and lines without ``=`` are
          skipped.
        * The line is split on the FIRST ``=``, so values may contain ``=``.
        * Whitespace around the key and the value is trimmed.
        * A leading shell-style ``export`` keyword on the key is dropped, so
          ``export NAME=value`` defines ``NAME``.
        * Lines whose key is empty (``=value``) are skipped.
        * One pair of MATCHING surrounding quotes (``"..."`` or ``'...'``) is
          removed from the value. Quote characters that are not part of such
          a pair belong to the value and are kept.
    """
    out: Dict[str, str] = {}
    for raw_line in f:
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue

        key, _, value = line.partition("=")
        key = key.strip()

        # Tolerate ``export NAME=value`` so the file can also be shell-sourced.
        words = key.split(None, 1)
        if len(words) == 2 and words[0] == "export":
            key = words[1].strip()
        if not key:
            continue  # "=value" names no variable

        value = value.strip()
        # Remove exactly one enclosing pair of identical quotes. A blanket
        # strip of quote characters would also eat quotes that are part of
        # the value itself, e.g. a secret ending in an apostrophe.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]

        out[key] = value
    return out
def _flatten_keys(d: dict, prefix: str = "") -> set:
    """Collect the dotted paths of every leaf in a nested dict.

    Args:
        d: Mapping to walk.
        prefix: Dotted path of ``d`` itself; empty for the top level.

    Returns:
        Set of dotted key paths. A value counts as a leaf unless it is a
        NON-EMPTY dict, so an empty dict is reported as a leaf at its own
        path. Keys are rendered with ``str()``.
    """
    leaves: set = set()
    for name, child in d.items():
        path = f"{prefix}.{name}" if prefix else str(name)
        if not isinstance(child, dict) or not child:
            leaves.add(path)
            continue
        # Non-empty mapping: descend and merge its leaves.
        leaves.update(_flatten_keys(child, path))
    return leaves
def managed_config_keys() -> set:
    """Return the dotted leaf keys pinned by the managed config.

    Example result: ``{'model.default'}``. The set is rebuilt from
    ``load_managed_config()`` on every call.
    """
    managed_config = load_managed_config()
    return _flatten_keys(managed_config)
def is_key_managed(dotted_key: str) -> bool:
    """Tell whether ``dotted_key`` is pinned by the managed config layer.

    The match is exact against the managed leaf keys: a parent path such as
    ``model`` is not reported as managed merely because ``model.default`` is.
    """
    pinned_keys = managed_config_keys()
    return dotted_key in pinned_keys
def is_env_managed(name: str) -> bool:
    """Tell whether the env var ``name`` is pinned by the managed ``.env`` layer.

    The lookup is an exact match on the variable name.
    """
    managed_env = load_managed_env()
    return name in managed_env