feat(security): startup security posture audit (warn-on-load)

Surface dangerous host/deployment posture at gateway startup so operators get
the 'you're exposed' signal the June 2026 MCP-config persistence campaign
victims never had. Warn-only — never blocks startup, never raises.

Checks (each independently fail-safe):
- Running as root (POSIX uid 0)
- SSH daemon with PasswordAuthentication enabled (incl. the 'yes' default)
- Running in a container with no persistent volume mount over HERMES_HOME
- Network-accessible API server with no API_SERVER_KEY

New module hermes_cli/security_audit_startup.py; invoked once per process from
start_gateway() right after setup_logging(). Cross-platform (root/SSH checks
no-op on Windows). Idea: @Cthulhu.
This commit is contained in:
teknium1 2026-06-21 18:06:01 -07:00 committed by Teknium
parent eb51c180e6
commit f45ace9318
3 changed files with 463 additions and 0 deletions

View file

@ -17414,6 +17414,24 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
from hermes_logging import setup_logging, _safe_stderr
setup_logging(hermes_home=_hermes_home, mode="gateway")
# Startup security posture audit — warn-on-load, never blocks. Surfaces
# root / weak-SSH / ephemeral-container / unauthenticated-listener posture
# so operators get the "you're exposed" signal the June 2026 MCP-config
# persistence campaign victims never had.
try:
from hermes_cli.security_audit_startup import log_startup_security_warnings
_audit_cfg = None
try:
from hermes_cli.config import read_raw_config
_audit_cfg = read_raw_config()
except Exception:
_audit_cfg = None
log_startup_security_warnings(hermes_home=_hermes_home, config=_audit_cfg)
except Exception as _audit_exc:
logger.debug("Startup security audit failed (non-fatal): %s", _audit_exc)
# Optional stderr handler — level driven by -v/-q flags on the CLI.
# verbosity=None (-q/--quiet): no stderr output
# verbosity=0 (default): WARNING and above

View file

@ -0,0 +1,282 @@
"""Startup security posture audit (warn-on-load, never blocks).
Surfaces dangerous host / deployment posture at process start so operators
get an at-a-glance "you're exposed" signal. Motivated by the June 2026
MCP-config persistence campaign, where compromised boxes ran as root with an
exposed dashboard / API server and no firewall and nothing ever told the
operator. These checks are advisory: they emit ``logger.warning`` records
and return human-readable strings; they never raise or block startup.
Checks (each is independent and fail-safe: any internal error is swallowed
and simply yields no finding):
1. Running as root (POSIX uid 0).
2. SSH daemon present with password authentication enabled.
3. Running inside a container with no persistent volume mount over the
HERMES_HOME data dir (state is ephemeral and lost on container restart).
4. A network-accessible API server listener with no API_SERVER_KEY
configured (the dashboard is not inspected by this module).
Cross-platform: the root and SSH checks are POSIX-only and no-op on Windows.
Everything is best-effort and read-only.
"""
from __future__ import annotations
import logging
import os
import re
from pathlib import Path
from typing import Any, Optional
# Named logger through which every audit warning in this module is emitted.
logger = logging.getLogger("hermes.security_audit")
# Sentinel so the audit only runs once per process even if both the CLI and
# gateway startup paths call it. Flipped to True by
# log_startup_security_warnings() before the checks run.
_AUDIT_RAN = False
def _is_root() -> bool:
    """Report whether the process has POSIX superuser privileges.

    The effective uid (``os.geteuid``) is preferred, with ``os.getuid`` as
    the fallback.  When the platform offers neither (as on Windows), or the
    call itself fails, the answer is ``False``.
    """
    uid_getter = None
    for attr_name in ("geteuid", "getuid"):
        uid_getter = getattr(os, attr_name, None)
        if uid_getter:
            break
    if uid_getter is None:
        return False
    try:
        return uid_getter() == 0
    except Exception:
        return False
def _running_as_root() -> Optional[str]:
    """Return the root-privilege warning, or ``None`` for a non-root process."""
    if _is_root():
        return (
            "Running as ROOT. The agent's terminal/file tools execute with full "
            "root privileges — a single prompt-injection or exposed endpoint is a "
            "full host compromise. Run Hermes as an unprivileged user (or in a "
            "sandboxed terminal backend / container with a non-root user)."
        )
    return None
# Main sshd configuration file(s) read by _iter_sshd_config_lines().
_SSHD_CONFIG_PATHS = (
    "/etc/ssh/sshd_config",
)
# Drop-in directory; its *.conf files are read, in sorted order, after the
# main file(s) above.
_SSHD_CONFIG_DIR = "/etc/ssh/sshd_config.d"
def _iter_sshd_config_lines() -> list[str]:
    """Collect the meaningful lines of sshd_config and its drop-in directory.

    Every path in ``_SSHD_CONFIG_PATHS`` is read first, followed by the
    ``*.conf`` files of ``_SSHD_CONFIG_DIR`` in sorted order.  Despite the
    name, the result is a fully built list (nothing is yielded lazily) of
    whitespace-stripped lines with blank lines and ``#`` comment lines
    dropped.  Missing or unreadable files are skipped silently, so an empty
    list means no configuration could be read.
    """
    candidates: list[Path] = [Path(entry) for entry in _SSHD_CONFIG_PATHS]
    try:
        dropin_dir = Path(_SSHD_CONFIG_DIR)
        if dropin_dir.is_dir():
            candidates += sorted(dropin_dir.glob("*.conf"))
    except Exception:
        # Best-effort: an unreadable drop-in directory just means fewer files.
        pass

    collected: list[str] = []
    for config_file in candidates:
        try:
            content = config_file.read_text(errors="replace")
        except Exception:
            continue
        collected.extend(
            text
            for text in (raw.strip() for raw in content.splitlines())
            if text and not text.startswith("#")
        )
    return collected
def _ssh_password_auth_enabled() -> Optional[str]:
    """Warn when an SSH daemon has password authentication enabled.

    Password auth on a public SSH daemon is the classic brute-force surface
    and pairs badly with a root-capable agent box. POSIX-only; returns None
    when there's no sshd config to read (e.g. Windows, or SSH not installed).

    Returns:
        A human-readable warning when the resolved ``PasswordAuthentication``
        value is anything other than ``no`` (including the case where no
        directive is present at all), otherwise ``None``.
    """
    lines = _iter_sshd_config_lines()
    if not lines:
        return None
    # Accept "Keyword value", "Keyword=value" / "Keyword = value" and an
    # optionally double-quoted value, so that an explicit
    # "PasswordAuthentication=no" is not mistaken for a missing directive.
    directive = re.compile(
        r'(?i)^PasswordAuthentication(?:\s*=\s*|\s+)"?(\w+)'
    )
    # The last matching directive is the one kept here; with no directive
    # the value is assumed to be "yes".
    # NOTE(review): sshd_config(5) states that for each keyword the FIRST
    # obtained value is used, and this loop does not scope directives inside
    # Match blocks — confirm whether last-wins is intended before relying on
    # this check for configs that repeat the directive.
    verdict = "yes"
    saw_directive = False
    for line in lines:
        m = directive.match(line)
        if m:
            verdict = m.group(1).lower()
            saw_directive = True
    if verdict == "no":
        return None
    qualifier = "" if saw_directive else " (default — no explicit directive)"
    return (
        f"SSH password authentication is ENABLED{qualifier}. Password auth is "
        "brute-forceable and dangerous on an internet-facing box. Set "
        "'PasswordAuthentication no' in sshd_config and use key-based auth."
    )
def _in_container() -> bool:
    """Best-effort container detection (Docker / Podman / generic OCI).

    Signals, in order:

    1. ``/.dockerenv`` exists -> container.
    2. ``HERMES_DESKTOP_CHILD_PID`` is set -> desktop child process, which is
       deliberately not treated as a server container.
    3. ``/run/.containerenv`` exists (marker file created by Podman) ->
       container.  This covers hosts where ``/proc/1/cgroup`` carries no
       runtime name.
    4. ``/proc/1/cgroup`` mentions a known container runtime -> container.

    Returns ``False`` when no signal matches or the probes cannot be read.
    """
    if os.path.exists("/.dockerenv"):
        return True
    if os.environ.get("HERMES_DESKTOP_CHILD_PID"):
        return False  # desktop child, not a server container
    if os.path.exists("/run/.containerenv"):
        return True
    try:
        cgroup = Path("/proc/1/cgroup").read_text(errors="replace")
    except Exception:
        # No procfs (non-Linux) or unreadable: nothing more to go on.
        return False
    return any(
        tok in cgroup for tok in ("docker", "containerd", "kubepods", "libpod")
    )
def _path_is_mounted(path: Path, mounts_path: str = "/proc/mounts") -> bool:
    """True if *path* sits on (or under) a real mount point per the mount table.

    Container overlay/root filesystems are ephemeral; a bind/volume mount over
    the data dir shows up as a distinct mount entry. We treat the path as
    persisted when the most specific mountpoint at or above it is NOT an
    ``overlay`` / ``tmpfs`` / ``aufs`` filesystem.

    Args:
        path: Directory to classify; it is resolved first when possible.
        mounts_path: Mount table to read, in ``/proc/mounts`` format.
            Defaults to ``/proc/mounts``.

    Returns:
        ``False`` only when the covering mount is one of the ephemeral
        filesystem types.  ``True`` otherwise, including when the mount table
        cannot be read or no entry covers *path* (fail safe: no warning).
    """
    try:
        target = path.resolve()
    except Exception:
        target = path
    try:
        mount_lines = Path(mounts_path).read_text(errors="replace").splitlines()
    except Exception:
        return True  # can't tell — fail safe (no warning)
    best = None
    best_fstype = ""
    for line in mount_lines:
        fields = line.split()
        if len(fields) < 3:
            continue
        # Whitespace and backslashes inside a mount point are written as
        # octal escapes (e.g. "\040" for a space); decode them so such mount
        # points can match the real path.
        mountpoint = re.sub(
            r"\\([0-7]{3})", lambda m: chr(int(m.group(1), 8)), fields[1]
        )
        fstype = fields[2]
        try:
            mp = Path(mountpoint)
        except Exception:
            continue
        if mp != target and mp not in target.parents:
            continue
        # Longest matching mountpoint wins (most specific).  ">=" lets a
        # later entry for the same mountpoint replace an earlier one, since
        # a later mount stacked on the same directory is the visible one.
        if best is None or len(str(mp)) >= len(str(best)):
            best = mp
            best_fstype = fstype
    if best is None:
        return True
    # overlay / tmpfs over the data dir = ephemeral container storage.
    return best_fstype not in ("overlay", "tmpfs", "aufs")
def _container_no_volume_mount(hermes_home: Optional[Path]) -> Optional[str]:
    """Warn when a containerised process keeps its data dir on ephemeral storage.

    Outside a container, or when the data dir is judged to be on a persistent
    mount (or that judgement fails), nothing is reported.  The data dir is
    *hermes_home* when given, else ``$HERMES_HOME``, else ``~/.hermes``.
    """
    if not _in_container():
        return None

    if hermes_home:
        home = hermes_home
    else:
        home = Path(
            os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))
        )

    try:
        persisted = _path_is_mounted(home)
    except Exception:
        # Unable to classify the mount: stay silent rather than guess.
        return None
    if persisted:
        return None

    return (
        f"Running in a container but the data dir ({home}) is NOT on a "
        "persistent volume mount — sessions, memory, skills, and API keys are "
        "ephemeral and lost on container restart. Mount a host volume over the "
        "HERMES_HOME data directory."
    )
def _network_listener_without_auth(config: Optional[dict]) -> list[str]:
    """Warn about a network-accessible API server that has no key configured.

    Only the ``platforms.api_server`` entry of *config* is inspected.  Host
    and key come from its ``extra`` mapping, falling back to the
    ``API_SERVER_HOST`` / ``API_SERVER_KEY`` environment variables.  The
    check is read-only, and any error (including the host-classification
    helper being unimportable) results in an empty list.
    """
    try:
        from gateway.platforms.base import is_network_accessible
    except Exception:
        return []

    warnings_found: list[str] = []
    settings = config or {}
    # API server.
    try:
        platforms = settings.get("platforms") or {}
        api_cfg = platforms.get("api_server") if isinstance(platforms, dict) else None
        if not isinstance(api_cfg, dict) or not api_cfg.get("enabled"):
            return warnings_found
        extra = api_cfg.get("extra") or {}
        host = extra.get("host") or os.environ.get("API_SERVER_HOST", "127.0.0.1")
        key = extra.get("key") or os.environ.get("API_SERVER_KEY", "")
        exposed = is_network_accessible(str(host))
        if exposed and not str(key).strip():
            warnings_found.append(
                f"OpenAI-compatible API server is network-accessible ({host}) "
                "with NO API_SERVER_KEY. It dispatches terminal-capable agent "
                "work — an unauthenticated network endpoint is remote code "
                "execution. Set a strong API_SERVER_KEY."
            )
    except Exception:
        # Malformed config must never break startup; report nothing instead.
        pass
    return warnings_found
def run_security_audit(
    *, hermes_home: Optional[Path] = None, config: Optional[dict] = None
) -> list[str]:
    """Execute every posture check and gather the resulting warnings.

    Nothing is logged here; :func:`log_startup_security_warnings` is the
    logging wrapper, and tests call this function directly.  A check that
    raises is skipped without affecting the others.

    Args:
        hermes_home: Data directory handed to the container/volume check.
        config: Raw configuration mapping handed to the listener check.

    Returns:
        Human-readable warning strings, in check order.
    """
    collected: list[str] = []

    # Argument-free checks that each return one message or None.
    for probe in (_running_as_root, _ssh_password_auth_enabled):
        try:
            message = probe()
        except Exception:
            continue
        if message:
            collected.append(message)

    try:
        volume_message = _container_no_volume_mount(hermes_home)
    except Exception:
        volume_message = None
    if volume_message:
        collected.append(volume_message)

    # The listener check returns a list, so it is merged rather than appended.
    try:
        collected += _network_listener_without_auth(config)
    except Exception:
        pass
    return collected
def log_startup_security_warnings(
    *,
    hermes_home: Optional[Path] = None,
    config: Optional[dict] = None,
    force: bool = False,
) -> list[str]:
    """Run the audit at most once per process and log each finding as a warning.

    Args:
        hermes_home: Forwarded to :func:`run_security_audit`.
        config: Forwarded to :func:`run_security_audit`.
        force: Run again even if the audit already ran (used by tests).

    Returns:
        The findings that were logged; an empty list when the audit was
        skipped because it already ran, or when it failed.
    """
    global _AUDIT_RAN
    already_ran = _AUDIT_RAN
    if already_ran and not force:
        return []
    # Mark as run before executing so a failing audit is not retried.
    _AUDIT_RAN = True
    try:
        issues = run_security_audit(hermes_home=hermes_home, config=config)
    except Exception:
        return []
    total = len(issues)
    if total:
        logger.warning(
            "Security posture audit found %d issue(s) — review your deployment:",
            total,
        )
        for position, message in enumerate(issues, start=1):
            logger.warning(" [security %d/%d] %s", position, total, message)
    return issues

View file

@ -0,0 +1,163 @@
"""Tests for the startup security posture audit (hermes_cli.security_audit_startup)."""
from __future__ import annotations
import os
from pathlib import Path
import pytest
import hermes_cli.security_audit_startup as audit
@pytest.fixture(autouse=True)
def _reset_audit_sentinel():
    """Clear the once-per-process sentinel before and after every test."""
    setattr(audit, "_AUDIT_RAN", False)
    yield
    setattr(audit, "_AUDIT_RAN", False)
# ── root check ────────────────────────────────────────────────────────────
def test_root_check_flags_uid_zero(monkeypatch):
    """A root process produces a finding that mentions ROOT."""
    monkeypatch.setattr(audit, "_is_root", lambda: True)
    warning = audit._running_as_root()
    assert warning
    assert "ROOT" in warning
def test_root_check_silent_for_non_root(monkeypatch):
    """A non-root process produces no root finding."""
    monkeypatch.setattr(audit, "_is_root", lambda: False)
    outcome = audit._running_as_root()
    assert outcome is None
# ── SSH password-auth check ─────────────────────────────────────────────────
def test_ssh_password_auth_enabled_explicit_yes(monkeypatch):
    """An explicit ``PasswordAuthentication yes`` produces the warning."""
    config_lines = ["PasswordAuthentication yes", "PermitRootLogin no"]
    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: config_lines)
    warning = audit._ssh_password_auth_enabled()
    assert warning
    assert "password authentication is enabled" in warning.lower()
def test_ssh_password_auth_disabled(monkeypatch):
    """An explicit ``PasswordAuthentication no`` produces no finding."""
    config_lines = ["PasswordAuthentication no"]
    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: config_lines)
    outcome = audit._ssh_password_auth_enabled()
    assert outcome is None
def test_ssh_password_auth_default_is_yes(monkeypatch):
    """With no directive present the check warns and labels it as the default."""
    config_lines = ["PermitRootLogin prohibit-password"]
    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: config_lines)
    warning = audit._ssh_password_auth_enabled()
    assert warning
    assert "default" in warning.lower()
def test_ssh_check_silent_when_no_config(monkeypatch):
    """No readable sshd config (e.g. Windows / SSH not installed) → no finding."""
    monkeypatch.setattr(audit, "_iter_sshd_config_lines", list)
    outcome = audit._ssh_password_auth_enabled()
    assert outcome is None
def test_ssh_last_directive_wins(monkeypatch):
    """When the directive repeats, the check uses the final occurrence."""
    config_lines = ["PasswordAuthentication yes", "PasswordAuthentication no"]
    monkeypatch.setattr(audit, "_iter_sshd_config_lines", lambda: config_lines)
    outcome = audit._ssh_password_auth_enabled()
    assert outcome is None
# ── container / volume-mount check ──────────────────────────────────────────
def test_container_no_mount_flags(monkeypatch, tmp_path):
    """In a container with an unmounted data dir the check warns."""
    monkeypatch.setattr(audit, "_in_container", lambda: True)
    monkeypatch.setattr(audit, "_path_is_mounted", lambda p: False)
    data_dir = tmp_path / ".hermes"
    warning = audit._container_no_volume_mount(data_dir)
    assert warning
    assert "persistent volume" in warning
def test_container_with_mount_silent(monkeypatch, tmp_path):
    """In a container with a mounted data dir the check stays silent."""
    monkeypatch.setattr(audit, "_in_container", lambda: True)
    monkeypatch.setattr(audit, "_path_is_mounted", lambda p: True)
    data_dir = tmp_path / ".hermes"
    outcome = audit._container_no_volume_mount(data_dir)
    assert outcome is None
def test_not_in_container_silent(monkeypatch, tmp_path):
    """Outside a container the volume check reports nothing."""
    monkeypatch.setattr(audit, "_in_container", lambda: False)
    data_dir = tmp_path / ".hermes"
    outcome = audit._container_no_volume_mount(data_dir)
    assert outcome is None
# ── network listener without auth ──────────────────────────────────────────
def test_api_server_network_no_key_flags(monkeypatch):
    """An enabled API server on 0.0.0.0 with an empty key is flagged."""
    monkeypatch.delenv("API_SERVER_KEY", raising=False)
    api_server = {"enabled": True, "extra": {"host": "0.0.0.0", "key": ""}}
    cfg = {"platforms": {"api_server": api_server}}
    findings = audit._network_listener_without_auth(cfg)
    assert any("NO API_SERVER_KEY" in finding for finding in findings)
def test_api_server_loopback_silent(monkeypatch):
    """An API server bound to 127.0.0.1 is not flagged even without a key."""
    api_server = {"enabled": True, "extra": {"host": "127.0.0.1", "key": ""}}
    cfg = {"platforms": {"api_server": api_server}}
    findings = audit._network_listener_without_auth(cfg)
    assert findings == []
def test_api_server_with_key_silent(monkeypatch):
    """An API server on 0.0.0.0 with a key configured is not flagged."""
    api_server = {
        "enabled": True,
        "extra": {"host": "0.0.0.0", "key": "a-strong-key-1234567890"},
    }
    cfg = {"platforms": {"api_server": api_server}}
    findings = audit._network_listener_without_auth(cfg)
    assert findings == []
# ── orchestration + logging ─────────────────────────────────────────────────
def test_run_security_audit_aggregates(monkeypatch, tmp_path):
    """Root plus explicit SSH password auth yields exactly two findings."""
    stubs = {
        "_is_root": lambda: True,
        "_iter_sshd_config_lines": lambda: ["PasswordAuthentication yes"],
        "_in_container": lambda: False,
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(audit, attr_name, stub)
    findings = audit.run_security_audit(hermes_home=tmp_path, config={})
    assert len(findings) == 2  # root + ssh
def test_run_security_audit_clean_posture(monkeypatch, tmp_path):
    """Non-root, SSH password auth off, not in a container → no findings."""
    stubs = {
        "_is_root": lambda: False,
        "_iter_sshd_config_lines": lambda: ["PasswordAuthentication no"],
        "_in_container": lambda: False,
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(audit, attr_name, stub)
    findings = audit.run_security_audit(hermes_home=tmp_path, config={})
    assert findings == []
def test_log_startup_security_warnings_emits_and_is_idempotent(monkeypatch, tmp_path, caplog):
    """The wrapper logs findings once and only reruns when forced."""
    import logging

    stubs = {
        "_is_root": lambda: True,
        "_iter_sshd_config_lines": lambda: [],
        "_in_container": lambda: False,
    }
    for attr_name, stub in stubs.items():
        monkeypatch.setattr(audit, attr_name, stub)

    call_args = {"hermes_home": tmp_path, "config": {}}
    with caplog.at_level(logging.WARNING, logger="hermes.security_audit"):
        first = audit.log_startup_security_warnings(**call_args)
    assert len(first) == 1
    assert any("ROOT" in record.message for record in caplog.records)

    # Second call is a no-op (idempotent within a process) unless forced.
    second = audit.log_startup_security_warnings(**call_args)
    assert second == []
    forced = audit.log_startup_security_warnings(force=True, **call_args)
    assert len(forced) == 1
def test_audit_never_raises_on_broken_check(monkeypatch, tmp_path):
    """A check that raises is swallowed and the audit still returns a list."""

    def _explode():
        raise RuntimeError("boom")

    monkeypatch.setattr(audit, "_is_root", _explode)
    # Must not propagate — the broken check is swallowed, others still run.
    outcome = audit.run_security_audit(hermes_home=tmp_path, config={})
    assert isinstance(outcome, list)