mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-22 10:32:00 +00:00
New hermes_cli/managed_scope.py resolves a system-level managed directory (HERMES_MANAGED_DIR override > /etc/hermes), parses managed config.yaml/.env with fail-open semantics, and exposes is_key_managed/is_env_managed helpers. The system default is ignored under pytest and HERMES_MANAGED_DIR is added to the conftest env scrub so a real managed scope can't leak into the suite. Not wired into the load paths yet (Phases 2-3).
171 lines
5.9 KiB
Python
171 lines
5.9 KiB
Python
"""Managed scope — IT-pushed, user-immutable config & env layer.
|
|
|
|
A system-level directory (default ``/etc/hermes``, root-owned and not
|
|
user-writable) supplies ``config.yaml`` and ``.env`` values that WIN over the
|
|
user's ``~/.hermes/config.yaml`` and ``~/.hermes/.env`` on a per-leaf-key basis.
|
|
|
|
This is DISTINCT from ``hermes_cli.config.is_managed()`` / ``HERMES_MANAGED``,
|
|
which is a coarse package-manager write-lock (declarative-distro / formula
|
|
installs). That lock blocks all mutation; this layer injects specific immutable
|
|
values. The two are independent and may coexist.
|
|
|
|
v1 enforcement is filesystem permissions only — see
|
|
``docs/design/managed-scope.md`` §7. v1 is Linux/POSIX-first; ``get_managed_dir()``
|
|
is the single seam for adding macOS / Windows native locations later.
|
|
|
|
Attribution: do not reference any third-party product by name in this file.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import logging
|
|
import os
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
|
|
import yaml
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# POSIX default. Other-platform locations are a deliberate v2 item; when added,
|
|
# they belong ONLY inside get_managed_dir().
|
|
_DEFAULT_MANAGED_DIR = Path("/etc/hermes")
|
|
|
|
_CACHE_LOCK = threading.Lock()
|
|
# path_key -> (mtime_ns, size, parsed)
|
|
_CONFIG_CACHE: Dict[str, tuple] = {}
|
|
_ENV_CACHE: Dict[str, tuple] = {}
|
|
|
|
|
|
def _under_pytest() -> bool:
|
|
"""True when running inside the test suite.
|
|
|
|
Used to ignore the system default ``/etc/hermes`` during tests so a real
|
|
managed scope on a developer/CI box can't leak policy into the suite. Tests
|
|
that exercise managed scope set ``HERMES_MANAGED_DIR`` explicitly, which is
|
|
still honored (the override path below runs before this guard takes effect).
|
|
"""
|
|
return "PYTEST_CURRENT_TEST" in os.environ
|
|
|
|
|
|
def _is_usable_dir(candidate: Path) -> bool:
    """Return True when *candidate* is an existing directory; never raise.

    ``Path.is_dir()`` swallows "does not exist"-style failures but can let
    other ``OSError``s through (e.g. permission denied on a parent directory,
    or a path component that is too long). Resolution of the managed scope must
    not be able to crash startup, so any such error is reported and the
    directory is treated as absent.
    """
    try:
        return candidate.is_dir()
    except (OSError, ValueError) as exc:
        # Same logger the module-level ``logger`` resolves to; looked up here
        # so this helper has no dependency on module state.
        logging.getLogger(__name__).warning(
            "managed scope: cannot inspect directory %s: %s — treating it as "
            "absent. Admin policy from this directory is NOT being applied.",
            candidate,
            exc,
        )
        return False


def get_managed_dir() -> Optional[Path]:
    """Resolve the managed-scope directory, or None when no scope is present.

    Resolution (highest priority first):

    1. ``$HERMES_MANAGED_DIR`` — deployment/bootstrap path override (IT-only;
       never persisted to any .env). Honored only when set to a non-empty
       (after whitespace stripping) value AND the directory exists. When the
       override is set but unusable, the result is None; the system default is
       NOT consulted as a fallback.
    2. ``/etc/hermes`` — POSIX default, when it exists. Ignored under pytest so
       a real system managed scope can't leak into the test suite.

    A non-existent directory at either tier resolves to None (no managed
    scope), which is the common case and must be cheap. A directory that
    cannot be inspected at all (e.g. permission denied) is likewise resolved
    to None, with a warning, instead of raising.

    Returns:
        The managed directory as a ``Path``, or ``None`` when there is none.
    """
    override = os.environ.get("HERMES_MANAGED_DIR", "").strip()
    if override:
        candidate = Path(override)
        return candidate if _is_usable_dir(candidate) else None

    # The explicit override above is deliberately evaluated first so tests can
    # still opt in to a managed scope.
    if _under_pytest():
        return None

    return _DEFAULT_MANAGED_DIR if _is_usable_dir(_DEFAULT_MANAGED_DIR) else None
|
|
|
|
|
|
def invalidate_managed_cache() -> None:
    """Forget every cached managed config/env parse result.

    Intended for tests and for reloading after the managed files were edited.
    Both caches are emptied while holding the shared cache lock.
    """
    with _CACHE_LOCK:
        for store in (_CONFIG_CACHE, _ENV_CACHE):
            store.clear()
|
|
|
|
|
|
def _cached_read(path: Path, cache: Dict[str, tuple], parse):
|
|
"""Shared (mtime_ns, size)-keyed read. Returns a deepcopy of the parsed value.
|
|
|
|
Returns ``None`` when the file is absent or fails to parse (fail-open). A
|
|
parse failure is logged LOUDLY — the admin needs to know their policy isn't
|
|
being applied — but never raises, so a malformed managed file can't brick
|
|
startup.
|
|
"""
|
|
try:
|
|
st = path.stat()
|
|
except OSError:
|
|
return None # absent
|
|
key = (st.st_mtime_ns, st.st_size)
|
|
path_key = str(path)
|
|
with _CACHE_LOCK:
|
|
hit = cache.get(path_key)
|
|
if hit is not None and hit[:2] == key:
|
|
return copy.deepcopy(hit[2])
|
|
try:
|
|
with open(path, encoding="utf-8") as f:
|
|
parsed = parse(f)
|
|
except Exception as exc: # noqa: BLE001 — fail-open, but LOUD
|
|
logger.warning(
|
|
"managed scope: failed to parse %s: %s — IGNORING this managed file. "
|
|
"Admin policy from this file is NOT being applied. Fix and restart.",
|
|
path,
|
|
exc,
|
|
)
|
|
return None
|
|
with _CACHE_LOCK:
|
|
cache[path_key] = (key[0], key[1], copy.deepcopy(parsed))
|
|
return parsed
|
|
|
|
|
|
def load_managed_config() -> dict:
    """Parsed managed ``config.yaml``, or ``{}`` when absent/malformed (fail-open).

    An empty document yields ``{}``. A document whose top-level value is not a
    mapping (e.g. a list or a bare scalar) cannot pin any key; it is rejected
    inside the parser so that ``_cached_read`` reports it through its loud
    "failed to parse" warning instead of the policy being dropped silently.
    The return value in that case is still ``{}``.

    Returns:
        The managed configuration mapping, or ``{}`` when there is no managed
        scope, no ``config.yaml``, or the file is unusable.
    """
    managed_dir = get_managed_dir()
    if managed_dir is None:
        return {}

    def _parse_mapping(stream) -> dict:
        # Parse with the safe loader and insist on a mapping at the top level.
        document = yaml.safe_load(stream)
        if document is None:
            return {}  # empty file / only comments
        if not isinstance(document, dict):
            raise ValueError(
                "top-level YAML value must be a mapping, got "
                f"{type(document).__name__}"
            )
        return document

    parsed = _cached_read(managed_dir / "config.yaml", _CONFIG_CACHE, _parse_mapping)
    # _cached_read returns None when the file is absent or was rejected.
    return parsed if isinstance(parsed, dict) else {}
|
|
|
|
|
|
def load_managed_env() -> Dict[str, str]:
    """Return the managed ``.env`` as a ``KEY -> VALUE`` mapping (fail-open).

    Yields ``{}`` when no managed directory resolves, or when the cached read
    of ``.env`` does not produce a dict (e.g. the file is absent).
    """
    scope_dir = get_managed_dir()
    if scope_dir is not None:
        result = _cached_read(scope_dir / ".env", _ENV_CACHE, _parse_env)
        if isinstance(result, dict):
            return result
    return {}
|
|
|
|
|
|
def _parse_env(f) -> Dict[str, str]:
|
|
out: Dict[str, str] = {}
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, _, value = line.partition("=")
|
|
out[key.strip()] = value.strip().strip("\"'")
|
|
return out
|
|
|
|
|
|
def _flatten_keys(d: dict, prefix: str = "") -> set:
|
|
keys: set = set()
|
|
for k, v in d.items():
|
|
dotted = f"{prefix}.{k}" if prefix else str(k)
|
|
if isinstance(v, dict) and v:
|
|
keys |= _flatten_keys(v, dotted)
|
|
else:
|
|
keys.add(dotted)
|
|
return keys
|
|
|
|
|
|
def managed_config_keys() -> set:
    """Return the dotted leaf keys present in the managed config.

    Example result: ``{'model.default'}``. Empty when no managed config is
    loaded.
    """
    managed = load_managed_config()
    return _flatten_keys(managed)
|
|
|
|
|
|
def is_key_managed(dotted_key: str) -> bool:
    """Tell whether *dotted_key* is pinned by the managed config layer.

    The comparison is an exact match against the dotted leaf keys reported by
    ``managed_config_keys()``.
    """
    pinned = managed_config_keys()
    return dotted_key in pinned
|
|
|
|
|
|
def is_env_managed(name: str) -> bool:
    """Tell whether the env var *name* is defined by the managed ``.env`` layer."""
    managed_env = load_managed_env()
    return name in managed_env
|