From f45ace9318be7f78dd9250afc67e806908767fa8 Mon Sep 17 00:00:00 2001
From: teknium1 <127238744+teknium1@users.noreply.github.com>
Date: Sun, 21 Jun 2026 18:06:01 -0700
Subject: [PATCH] feat(security): startup security posture audit (warn-on-load)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Surface dangerous host/deployment posture at gateway startup so operators
get the 'you're exposed' signal the June 2026 MCP-config persistence
campaign victims never had. Warn-only — never blocks startup, never raises.

Checks (each independently fail-safe):
- Running as root (POSIX uid 0)
- SSH daemon with PasswordAuthentication enabled (incl. the 'yes' default)
- Running in a container with no persistent volume mount over HERMES_HOME
- Network-accessible API server with no API_SERVER_KEY

New module hermes_cli/security_audit_startup.py; invoked once per process
from start_gateway() right after setup_logging(). Cross-platform
(root/SSH checks no-op on Windows).

Idea: @Cthulhu.
---
 gateway/run.py                                |  18 ++
 hermes_cli/security_audit_startup.py          | 293 ++++++++++++++++++
 .../hermes_cli/test_security_audit_startup.py | 163 ++++++++++
 3 files changed, 474 insertions(+)
 create mode 100644 hermes_cli/security_audit_startup.py
 create mode 100644 tests/hermes_cli/test_security_audit_startup.py

diff --git a/gateway/run.py b/gateway/run.py
index 622881b83f5..3d822c7dcef 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -17414,6 +17414,24 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
     from hermes_logging import setup_logging, _safe_stderr
     setup_logging(hermes_home=_hermes_home, mode="gateway")
 
+    # Startup security posture audit — warn-on-load, never blocks. Surfaces
+    # root / weak-SSH / ephemeral-container / unauthenticated-listener posture
+    # so operators get the "you're exposed" signal the June 2026 MCP-config
+    # persistence campaign victims never had.
+    try:
+        from hermes_cli.security_audit_startup import log_startup_security_warnings
+
+        _audit_cfg = None
+        try:
+            from hermes_cli.config import read_raw_config
+
+            _audit_cfg = read_raw_config()
+        except Exception:
+            _audit_cfg = None
+        log_startup_security_warnings(hermes_home=_hermes_home, config=_audit_cfg)
+    except Exception as _audit_exc:
+        logger.debug("Startup security audit failed (non-fatal): %s", _audit_exc)
+
     # Optional stderr handler — level driven by -v/-q flags on the CLI.
     # verbosity=None (-q/--quiet): no stderr output
     # verbosity=0 (default): WARNING and above
diff --git a/hermes_cli/security_audit_startup.py b/hermes_cli/security_audit_startup.py
new file mode 100644
index 00000000000..a28daa633cd
--- /dev/null
+++ b/hermes_cli/security_audit_startup.py
@@ -0,0 +1,293 @@
+"""Startup security posture audit (warn-on-load, never blocks).
+
+Surfaces dangerous host / deployment posture at process start so operators
+get an at-a-glance "you're exposed" signal. Motivated by the June 2026
+MCP-config persistence campaign, where compromised boxes ran as root with an
+exposed dashboard / API server and no firewall — and nothing ever told the
+operator. These checks are advisory: they emit ``logger.warning`` records
+and return human-readable strings; they never raise or block startup.
+
+Checks (each is independent and fail-safe — any internal error is swallowed
+and simply yields no finding):
+
+1. Running as root (POSIX uid 0).
+2. SSH daemon present with password authentication enabled.
+3. Running inside a container with no persistent volume mount over the
+   HERMES_HOME data dir (state is ephemeral — lost on container restart).
+4. A network-accessible API server listener with no API_SERVER_KEY
+   configured (the dashboard listener is not audited here).
+
+Cross-platform: the root and SSH checks are POSIX-only and no-op on Windows.
+Everything is best-effort and read-only.
+"""
+from __future__ import annotations
+
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Any, Optional
+
+logger = logging.getLogger("hermes.security_audit")
+
+# Sentinel so the audit only runs once per process even if both the CLI and
+# gateway startup paths call it.
+_AUDIT_RAN = False
+
+
+def _is_root() -> bool:
+    """True when the process runs as POSIX uid 0. Always False on Windows."""
+    getuid = getattr(os, "geteuid", None) or getattr(os, "getuid", None)
+    if getuid is None:
+        return False
+    try:
+        return getuid() == 0
+    except Exception:
+        return False
+
+
+def _running_as_root() -> Optional[str]:
+    """Return a warning string when running as root, else None."""
+    if not _is_root():
+        return None
+    return (
+        "Running as ROOT. The agent's terminal/file tools execute with full "
+        "root privileges — a single prompt-injection or exposed endpoint is a "
+        "full host compromise. Run Hermes as an unprivileged user (or in a "
+        "sandboxed terminal backend / container with a non-root user)."
+    )
+
+
+_SSHD_CONFIG_PATHS = (
+    "/etc/ssh/sshd_config",
+)
+_SSHD_CONFIG_DIR = "/etc/ssh/sshd_config.d"
+
+
+def _iter_sshd_config_lines() -> list[str]:
+    """Return non-comment sshd config lines in the order sshd reads them.
+
+    Drop-in files come first: this assumes sshd_config ``Include``s the
+    drop-in directory before its own directives (the stock Debian/Ubuntu
+    layout), which matters because sshd keeps the first value it obtains
+    for each keyword.
+    """
+    lines: list[str] = []
+    paths: list[Path] = []
+    try:
+        d = Path(_SSHD_CONFIG_DIR)
+        if d.is_dir():
+            paths.extend(sorted(d.glob("*.conf")))
+    except Exception:
+        pass
+    paths.extend(Path(p) for p in _SSHD_CONFIG_PATHS)
+    for p in paths:
+        try:
+            for raw in p.read_text(errors="replace").splitlines():
+                stripped = raw.strip()
+                if stripped and not stripped.startswith("#"):
+                    lines.append(stripped)
+        except Exception:
+            continue
+    return lines
+
+
+def _ssh_password_auth_enabled() -> Optional[str]:
+    """Warn when an SSH daemon has password authentication enabled.
+
+    Password auth on a public SSH daemon is the classic brute-force surface
+    and pairs badly with a root-capable agent box. POSIX-only; returns None
+    when there's no sshd config to read (e.g. Windows, or SSH not installed).
+    """
+    lines = _iter_sshd_config_lines()
+    if not lines:
+        return None
+    # sshd keeps the FIRST value it obtains for a keyword and ignores later
+    # duplicates, so stop at the first match. Default (no directive) is "yes".
+    verdict = "yes"
+    saw_directive = False
+    for line in lines:
+        m = re.match(r"(?i)^PasswordAuthentication\s+(\w+)", line)
+        if m:
+            verdict = m.group(1).lower()
+            saw_directive = True
+            break
+    if verdict == "no":
+        return None
+    qualifier = "" if saw_directive else " (default — no explicit directive)"
+    return (
+        f"SSH password authentication is ENABLED{qualifier}. Password auth is "
+        "brute-forceable and dangerous on an internet-facing box. Set "
+        "'PasswordAuthentication no' in sshd_config and use key-based auth."
+    )
+
+
+def _in_container() -> bool:
+    """Best-effort container detection (Docker / Podman / generic OCI)."""
+    if os.path.exists("/.dockerenv"):
+        return True
+    if os.environ.get("HERMES_DESKTOP_CHILD_PID"):
+        return False  # desktop child, not a server container
+    try:
+        cgroup = Path("/proc/1/cgroup").read_text(errors="replace")
+        if any(tok in cgroup for tok in ("docker", "containerd", "kubepods", "libpod")):
+            return True
+    except Exception:
+        pass
+    return False
+
+
+def _path_is_mounted(path: Path) -> bool:
+    """True if *path* sits on (or under) a real mount point per /proc/mounts.
+
+    Container overlay/root filesystems are ephemeral; a bind/volume mount over
+    the data dir shows up as a distinct mount entry. We treat the path as
+    persisted when a mountpoint at or above it is NOT the container root
+    overlay.
+    """
+    try:
+        target = path.resolve()
+    except Exception:
+        target = path
+    try:
+        mounts = Path("/proc/mounts").read_text(errors="replace").splitlines()
+    except Exception:
+        return True  # can't tell — fail safe (no warning)
+    best = None
+    best_fstype = ""
+    for line in mounts:
+        parts = line.split()
+        if len(parts) < 3:
+            continue
+        mountpoint, fstype = parts[1], parts[2]
+        try:
+            mp = Path(mountpoint)
+        except Exception:
+            continue
+        if mp == target or mp in target.parents:
+            # Longest matching mountpoint wins (most specific).
+            if best is None or len(str(mp)) > len(str(best)):
+                best = mp
+                best_fstype = fstype
+    if best is None:
+        return True
+    # overlay / tmpfs over the data dir = ephemeral container storage.
+    return best_fstype not in ("overlay", "tmpfs", "aufs")
+
+
+def _container_no_volume_mount(hermes_home: Optional[Path]) -> Optional[str]:
+    """Warn when in a container whose data dir has no persistent mount."""
+    if not _in_container():
+        return None
+    home = hermes_home or Path(
+        os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))
+    )
+    try:
+        if _path_is_mounted(home):
+            return None
+    except Exception:
+        return None
+    return (
+        f"Running in a container but the data dir ({home}) is NOT on a "
+        "persistent volume mount — sessions, memory, skills, and API keys are "
+        "ephemeral and lost on container restart. Mount a host volume over the "
+        "HERMES_HOME data directory."
+    )
+
+
+def _network_listener_without_auth(config: Optional[dict]) -> list[str]:
+    """Warn about network-accessible gateway listeners with no auth.
+
+    Covers the API server (no API_SERVER_KEY); the dashboard is not checked
+    here. Read-only against config + env; overlaps the hard fail-closed
+    guards but surfaces the posture proactively at startup.
+    """
+    findings: list[str] = []
+    try:
+        from gateway.platforms.base import is_network_accessible
+    except Exception:
+        return findings
+
+    cfg = config or {}
+
+    # API server.
+    try:
+        plats = (cfg.get("platforms") or {})
+        api = plats.get("api_server") if isinstance(plats, dict) else None
+        if isinstance(api, dict) and api.get("enabled"):
+            extra = api.get("extra") or {}
+            host = extra.get("host") or os.environ.get("API_SERVER_HOST", "127.0.0.1")
+            key = extra.get("key") or os.environ.get("API_SERVER_KEY", "")
+            if is_network_accessible(str(host)) and not str(key).strip():
+                findings.append(
+                    f"OpenAI-compatible API server is network-accessible ({host}) "
+                    "with NO API_SERVER_KEY. It dispatches terminal-capable agent "
+                    "work — an unauthenticated network endpoint is remote code "
+                    "execution. Set a strong API_SERVER_KEY."
+                )
+    except Exception:
+        pass
+
+    return findings
+
+
+def run_security_audit(
+    *, hermes_home: Optional[Path] = None, config: Optional[dict] = None
+) -> list[str]:
+    """Run all checks and return a list of human-readable warning strings.
+
+    Pure: no logging, no side effects. Each check is independently
+    fail-safe. Used directly by tests; the logging wrapper is
+    :func:`log_startup_security_warnings`.
+    """
+    findings: list[str] = []
+    for check in (
+        _running_as_root,
+        _ssh_password_auth_enabled,
+    ):
+        try:
+            r = check()
+            if r:
+                findings.append(r)
+        except Exception:
+            continue
+    try:
+        r = _container_no_volume_mount(hermes_home)
+        if r:
+            findings.append(r)
+    except Exception:
+        pass
+    try:
+        findings.extend(_network_listener_without_auth(config))
+    except Exception:
+        pass
+    return findings
+
+
+def log_startup_security_warnings(
+    *,
+    hermes_home: Optional[Path] = None,
+    config: Optional[dict] = None,
+    force: bool = False,
+) -> list[str]:
+    """Run the audit once per process and emit each finding via logger.warning.
+
+    Returns the findings (also for tests). Never raises. Idempotent unless
+    ``force=True`` (used by tests).
+    """
+    global _AUDIT_RAN
+    if _AUDIT_RAN and not force:
+        return []
+    _AUDIT_RAN = True
+    try:
+        findings = run_security_audit(hermes_home=hermes_home, config=config)
+    except Exception:
+        return []
+    if findings:
+        logger.warning(
+            "Security posture audit found %d issue(s) — review your deployment:",
+            len(findings),
+        )
+        for i, f in enumerate(findings, 1):
+            logger.warning(" [security %d/%d] %s", i, len(findings), f)
+    return findings
diff --git a/tests/hermes_cli/test_security_audit_startup.py b/tests/hermes_cli/test_security_audit_startup.py
new file mode 100644
index 00000000000..a0001fb6cbd
--- /dev/null
+++ b/tests/hermes_cli/test_security_audit_startup.py
@@ -0,0 +1,163 @@
+"""Tests for the startup security posture audit (hermes_cli.security_audit_startup)."""
+
+from __future__ import annotations
+
+import os
+from pathlib import Path
+
+import pytest
+
+import hermes_cli.security_audit_startup as audit
+
+
+@pytest.fixture(autouse=True)
+def _reset_audit_sentinel():
+    audit._AUDIT_RAN = False
+    yield
+    audit._AUDIT_RAN = False
+
+
+# ── root check ────────────────────────────────────────────────────────────
+
+
+def test_root_check_flags_uid_zero(monkeypatch):
+    monkeypatch.setattr(audit, "_is_root", lambda: True)
+    msg = audit._running_as_root()
+    assert msg and "ROOT" in msg
+
+
+def test_root_check_silent_for_non_root(monkeypatch):
+    monkeypatch.setattr(audit, "_is_root", lambda: False)
+    assert audit._running_as_root() is None
+
+
+# ── SSH password-auth check ─────────────────────────────────────────────────
+
+
+def test_ssh_password_auth_enabled_explicit_yes(monkeypatch):
+    monkeypatch.setattr(
+        audit, "_iter_sshd_config_lines",
+        lambda: ["PasswordAuthentication yes", "PermitRootLogin no"],
+    )
+    msg = audit._ssh_password_auth_enabled()
+    assert msg and "password authentication is enabled" in msg.lower()
+
+
+def test_ssh_password_auth_disabled(monkeypatch):
+    monkeypatch.setattr(
+        audit, "_iter_sshd_config_lines",
+        lambda: ["PasswordAuthentication no"],
+    )
+    assert audit._ssh_password_auth_enabled() is None
+
+
+def test_ssh_password_auth_default_is_yes(monkeypatch):
+    """No explicit directive → sshd default is 'yes' → warn (with qualifier)."""
+    monkeypatch.setattr(
+        audit, "_iter_sshd_config_lines",
+        lambda: ["PermitRootLogin prohibit-password"],
+    )
+    msg = audit._ssh_password_auth_enabled()
+    assert msg and "default" in msg.lower()
+
+
+def test_ssh_check_silent_when_no_config(monkeypatch):
+    """No sshd config readable (e.g. Windows / SSH not installed) → no finding."""
+    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: [])
+    assert audit._ssh_password_auth_enabled() is None
+
+
+def test_ssh_first_directive_wins(monkeypatch):
+    monkeypatch.setattr(
+        audit, "_iter_sshd_config_lines",
+        lambda: ["PasswordAuthentication yes", "PasswordAuthentication no"],
+    )
+    assert audit._ssh_password_auth_enabled() is not None
+
+
+# ── container / volume-mount check ──────────────────────────────────────────
+
+
+def test_container_no_mount_flags(monkeypatch, tmp_path):
+    monkeypatch.setattr(audit, "_in_container", lambda: True)
+    monkeypatch.setattr(audit, "_path_is_mounted", lambda p: False)
+    msg = audit._container_no_volume_mount(tmp_path / ".hermes")
+    assert msg and "persistent volume" in msg
+
+
+def test_container_with_mount_silent(monkeypatch, tmp_path):
+    monkeypatch.setattr(audit, "_in_container", lambda: True)
+    monkeypatch.setattr(audit, "_path_is_mounted", lambda p: True)
+    assert audit._container_no_volume_mount(tmp_path / ".hermes") is None
+
+
+def test_not_in_container_silent(monkeypatch, tmp_path):
+    monkeypatch.setattr(audit, "_in_container", lambda: False)
+    assert audit._container_no_volume_mount(tmp_path / ".hermes") is None
+
+
+# ── network listener without auth ──────────────────────────────────────────
+
+
+def test_api_server_network_no_key_flags(monkeypatch):
+    monkeypatch.delenv("API_SERVER_KEY", raising=False)
+    cfg = {"platforms": {"api_server": {"enabled": True, "extra": {"host": "0.0.0.0", "key": ""}}}}
+    findings = audit._network_listener_without_auth(cfg)
+    assert any("NO API_SERVER_KEY" in f for f in findings)
+
+
+def test_api_server_loopback_silent(monkeypatch):
+    cfg = {"platforms": {"api_server": {"enabled": True, "extra": {"host": "127.0.0.1", "key": ""}}}}
+    assert audit._network_listener_without_auth(cfg) == []
+
+
+def test_api_server_with_key_silent(monkeypatch):
+    cfg = {"platforms": {"api_server": {"enabled": True, "extra": {"host": "0.0.0.0", "key": "a-strong-key-1234567890"}}}}
+    assert audit._network_listener_without_auth(cfg) == []
+
+
+# ── orchestration + logging ─────────────────────────────────────────────────
+
+
+def test_run_security_audit_aggregates(monkeypatch, tmp_path):
+    monkeypatch.setattr(audit, "_is_root", lambda: True)
+    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: ["PasswordAuthentication yes"])
+    monkeypatch.setattr(audit, "_in_container", lambda: False)
+    findings = audit.run_security_audit(hermes_home=tmp_path, config={})
+    assert len(findings) == 2  # root + ssh
+
+
+def test_run_security_audit_clean_posture(monkeypatch, tmp_path):
+    monkeypatch.setattr(audit, "_is_root", lambda: False)
+    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: ["PasswordAuthentication no"])
+    monkeypatch.setattr(audit, "_in_container", lambda: False)
+    assert audit.run_security_audit(hermes_home=tmp_path, config={}) == []
+
+
+def test_log_startup_security_warnings_emits_and_is_idempotent(monkeypatch, tmp_path, caplog):
+    import logging
+
+    monkeypatch.setattr(audit, "_is_root", lambda: True)
+    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: [])
+    monkeypatch.setattr(audit, "_in_container", lambda: False)
+
+    with caplog.at_level(logging.WARNING, logger="hermes.security_audit"):
+        first = audit.log_startup_security_warnings(hermes_home=tmp_path, config={})
+    assert len(first) == 1
+    assert any("ROOT" in r.message for r in caplog.records)
+
+    # Second call is a no-op (idempotent within a process) unless forced.
+    second = audit.log_startup_security_warnings(hermes_home=tmp_path, config={})
+    assert second == []
+    forced = audit.log_startup_security_warnings(hermes_home=tmp_path, config={}, force=True)
+    assert len(forced) == 1
+
+
+def test_audit_never_raises_on_broken_check(monkeypatch, tmp_path):
+    def _boom():
+        raise RuntimeError("boom")
+
+    monkeypatch.setattr(audit, "_is_root", _boom)
+    # Must not propagate — the broken check is swallowed, others still run.
+    findings = audit.run_security_audit(hermes_home=tmp_path, config={})
+    assert isinstance(findings, list)