hermes-agent/hermes_cli/security_audit_startup.py

282 lines
9.5 KiB
Python

"""Startup security posture audit (warn-on-load, never blocks).
Surfaces dangerous host / deployment posture at process start so operators
get an at-a-glance "you're exposed" signal. Motivated by the June 2026
MCP-config persistence campaign, where compromised boxes ran as root with an
exposed dashboard / API server and no firewall — and nothing ever told the
operator. These checks are advisory: they emit ``logger.warning`` records
and return human-readable strings; they never raise or block startup.
Checks (each is independent and fail-safe — any internal error is swallowed
and simply yields no finding):
1. Running as root (POSIX uid 0).
2. SSH daemon present with password authentication enabled.
3. Running inside a container with no persistent volume mount over the
HERMES_HOME data dir (state is ephemeral — lost on container restart).
4. A network-accessible gateway listener (dashboard / API server) with no
authentication configured.
Cross-platform: the root and SSH checks are POSIX-only and no-op on Windows.
Everything is best-effort and read-only.
"""
from __future__ import annotations
import logging
import os
import re
from pathlib import Path
from typing import Any, Optional
# Module logger under the fixed name "hermes.security_audit";
# log_startup_security_warnings() emits every finding through it at WARNING.
logger = logging.getLogger("hermes.security_audit")
# Sentinel so the audit only runs once per process even if both the CLI and
# gateway startup paths call it. log_startup_security_warnings() flips it to
# True before running the checks and skips later calls unless force=True.
_AUDIT_RAN = False
def _is_root() -> bool:
    """Report whether the current process has POSIX uid 0.

    The effective uid (``os.geteuid``) is preferred, with ``os.getuid`` as a
    fallback. When the platform exposes neither function (e.g. Windows), or
    the lookup itself raises, the answer is ``False``.
    """
    uid_getter = getattr(os, "geteuid", None) or getattr(os, "getuid", None)
    if uid_getter is not None:
        try:
            return uid_getter() == 0
        except Exception:
            # Best-effort probe: a failing uid lookup means "not root".
            pass
    return False
def _running_as_root() -> Optional[str]:
    """Return the "running as root" warning, or ``None`` for a non-root process.

    Delegates the privilege test to ``_is_root()``.
    """
    if _is_root():
        return (
            "Running as ROOT. The agent's terminal/file tools execute with full "
            "root privileges — a single prompt-injection or exposed endpoint is a "
            "full host compromise. Run Hermes as an unprivileged user (or in a "
            "sandboxed terminal backend / container with a non-root user)."
        )
    return None
_SSHD_CONFIG_PATHS = (
    "/etc/ssh/sshd_config",
)
_SSHD_CONFIG_DIR = "/etc/ssh/sshd_config.d"

# ``PasswordAuthentication <value>``. sshd also accepts ``=`` as the
# keyword/argument separator and a double-quoted argument.
_PASSWORD_AUTH_RE = re.compile(
    r'(?i)^PasswordAuthentication(?:\s*=\s*|\s+)"?(\w+)'
)
# First line of a conditional ``Match`` block.
_SSHD_MATCH_RE = re.compile(r"(?i)^Match(?:[\s=]|$)")


def _iter_sshd_config_lines() -> list[str]:
    """Return global-scope sshd directives in the order sshd resolves them.

    Reads the ``*.conf`` drop-ins from ``_SSHD_CONFIG_DIR`` (lexical order)
    first and the files in ``_SSHD_CONFIG_PATHS`` afterwards. That mirrors the
    stock layout where ``sshd_config`` starts with
    ``Include /etc/ssh/sshd_config.d/*.conf`` — NOTE(review): the ``Include``
    line itself is not parsed, the drop-in directory is simply assumed to be
    included at the top.

    For each file, blank lines and ``#`` comments are dropped and reading
    stops at the first ``Match`` line: everything after it only applies to
    connections matching that block, so it must not influence the global
    verdict. Unreadable or missing files are skipped silently.

    Returns:
        The stripped directive lines, highest-precedence first. Empty when no
        config could be read.
    """
    paths: list[Path] = []
    try:
        dropin_dir = Path(_SSHD_CONFIG_DIR)
        if dropin_dir.is_dir():
            paths.extend(sorted(dropin_dir.glob("*.conf")))
    except Exception:
        # Best-effort: an unreadable drop-in directory yields no drop-ins.
        pass
    paths.extend(Path(p) for p in _SSHD_CONFIG_PATHS)

    lines: list[str] = []
    for path in paths:
        try:
            text = path.read_text(encoding="utf-8", errors="replace")
        except Exception:
            continue
        for raw in text.splitlines():
            stripped = raw.strip()
            if not stripped or stripped.startswith("#"):
                continue
            if _SSHD_MATCH_RE.match(stripped):
                # Rest of this file is conditional configuration.
                break
            lines.append(stripped)
    return lines


def _ssh_password_auth_enabled() -> Optional[str]:
    """Warn when an SSH daemon has password authentication enabled.

    Password auth on a public SSH daemon is the classic brute-force surface
    and pairs badly with a root-capable agent box. POSIX-only in practice:
    returns ``None`` when there is no sshd config to read (e.g. Windows, or
    SSH not installed).

    Returns:
        A human-readable warning when the effective global setting is anything
        other than ``no`` (including the implicit default), otherwise ``None``.
    """
    lines = _iter_sshd_config_lines()
    if not lines:
        return None
    # sshd uses the FIRST value it obtains for a keyword; later occurrences
    # are ignored. With no directive at all the default is "yes".
    verdict = "yes"
    saw_directive = False
    for line in lines:
        m = _PASSWORD_AUTH_RE.match(line)
        if m:
            verdict = m.group(1).lower()
            saw_directive = True
            break
    if verdict == "no":
        return None
    qualifier = "" if saw_directive else " (default — no explicit directive)"
    return (
        f"SSH password authentication is ENABLED{qualifier}. Password auth is "
        "brute-forceable and dangerous on an internet-facing box. Set "
        "'PasswordAuthentication no' in sshd_config and use key-based auth."
    )
# Marker files created inside a container by the runtime itself: Docker
# writes /.dockerenv, Podman writes /run/.containerenv.
_CONTAINER_MARKER_FILES = ("/.dockerenv", "/run/.containerenv")


def _in_container() -> bool:
    """Best-effort container detection (Docker / Podman / generic OCI).

    Checks, in order:

    1. Any runtime marker file from ``_CONTAINER_MARKER_FILES`` exists.
       The Podman marker matters on cgroup-v2 hosts, where ``/proc/1/cgroup``
       inside a container is just ``0::/`` and carries no runtime name.
    2. ``HERMES_DESKTOP_CHILD_PID`` is set: treated as a desktop child
       process rather than a server container, so the answer is ``False``.
    3. ``/proc/1/cgroup`` mentions a known runtime token.

    Returns:
        ``True`` when a container is detected; ``False`` otherwise, including
        when ``/proc/1/cgroup`` cannot be read.
    """
    if any(os.path.exists(marker) for marker in _CONTAINER_MARKER_FILES):
        return True
    if os.environ.get("HERMES_DESKTOP_CHILD_PID"):
        return False  # desktop child, not a server container
    try:
        cgroup = Path("/proc/1/cgroup").read_text(encoding="utf-8", errors="replace")
    except Exception:
        return False
    return any(tok in cgroup for tok in ("docker", "containerd", "kubepods", "libpod"))
def _path_is_mounted(path: Path, mounts_file: str = "/proc/mounts") -> bool:
    """True if *path* sits on (or under) a persistent mount per the mount table.

    Container overlay/root filesystems are ephemeral; a bind/volume mount over
    the data dir shows up as a distinct mount entry. The path counts as
    persisted when the most specific mountpoint at or above it is NOT an
    ``overlay`` / ``tmpfs`` / ``aufs`` filesystem.

    Args:
        path: Directory to classify; resolved first when possible.
        mounts_file: Mount table to parse, in ``/proc/mounts`` format
            (``device mountpoint fstype ...`` per line).

    Returns:
        ``False`` only when the governing mount is one of the ephemeral
        filesystem types. ``True`` otherwise, including the fail-safe cases
        where the table cannot be read or no entry covers *path*.
    """
    try:
        target = path.resolve()
    except Exception:
        target = path
    try:
        mount_lines = (
            Path(mounts_file).read_text(encoding="utf-8", errors="replace").splitlines()
        )
    except Exception:
        return True  # can't tell — fail safe (no warning)

    best = None
    best_fstype = ""
    for line in mount_lines:
        fields = line.split()
        if len(fields) < 3:
            continue
        # The mount table escapes space/tab/newline/backslash in paths as
        # three-digit octal sequences (e.g. "\040"); decode before comparing.
        mountpoint = Path(
            re.sub(r"\\([0-7]{3})", lambda m: chr(int(m.group(1), 8)), fields[1])
        )
        if mountpoint != target and mountpoint not in target.parents:
            continue
        # Longest matching mountpoint wins (most specific). Every candidate is
        # an ancestor of target, so equal length means the same mountpoint;
        # ">=" lets the later entry win because it is stacked on top of the
        # earlier one (e.g. "rootfs /" followed by "overlay /").
        if best is None or len(str(mountpoint)) >= len(str(best)):
            best = mountpoint
            best_fstype = fields[2]
    if best is None:
        return True
    # overlay / tmpfs over the data dir = ephemeral container storage.
    return best_fstype not in ("overlay", "tmpfs", "aufs")
def _container_no_volume_mount(hermes_home: Optional[Path]) -> Optional[str]:
    """Warn when a containerised process keeps its data dir on ephemeral storage.

    Outside a container (per ``_in_container()``) nothing is reported. The
    data dir is *hermes_home* when given, else ``$HERMES_HOME``, else
    ``~/.hermes``. If ``_path_is_mounted()`` says the directory is persisted,
    or the lookup raises, the result is ``None``.
    """
    if not _in_container():
        return None

    if hermes_home:
        home = hermes_home
    else:
        home = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))

    try:
        persisted = _path_is_mounted(home)
    except Exception:
        # Fail safe: an undeterminable mount state produces no warning.
        return None
    if persisted:
        return None

    return (
        f"Running in a container but the data dir ({home}) is NOT on a "
        "persistent volume mount — sessions, memory, skills, and API keys are "
        "ephemeral and lost on container restart. Mount a host volume over the "
        "HERMES_HOME data directory."
    )
def _network_listener_without_auth(config: Optional[dict]) -> list[str]:
    """Collect warnings for network-reachable gateway listeners lacking auth.

    The check in this body covers the API server: it is reported when
    ``platforms.api_server`` is enabled, its host is network-accessible
    according to ``gateway.platforms.base.is_network_accessible``, and no key
    is set in either ``extra.key`` or ``API_SERVER_KEY``. The host falls back
    to ``API_SERVER_HOST`` and then ``127.0.0.1``.

    Read-only against config + env. If the gateway helper cannot be imported,
    or anything raises while inspecting the config, the result is whatever
    was collected so far (normally an empty list).
    """
    warnings_found: list[str] = []
    try:
        from gateway.platforms.base import is_network_accessible
    except Exception:
        return warnings_found

    settings = config or {}

    # API server.
    try:
        platforms = settings.get("platforms") or {}
        api_cfg = platforms.get("api_server") if isinstance(platforms, dict) else None
        if isinstance(api_cfg, dict) and api_cfg.get("enabled"):
            extra = api_cfg.get("extra") or {}
            host = extra.get("host") or os.environ.get("API_SERVER_HOST", "127.0.0.1")
            key = extra.get("key") or os.environ.get("API_SERVER_KEY", "")
            exposed = is_network_accessible(str(host))
            if exposed and not str(key).strip():
                warnings_found.append(
                    f"OpenAI-compatible API server is network-accessible ({host}) "
                    "with NO API_SERVER_KEY. It dispatches terminal-capable agent "
                    "work — an unauthenticated network endpoint is remote code "
                    "execution. Set a strong API_SERVER_KEY."
                )
    except Exception:
        # Advisory check only: malformed config must never break startup.
        pass
    return warnings_found
def run_security_audit(
    *, hermes_home: Optional[Path] = None, config: Optional[dict] = None
) -> list[str]:
    """Execute every posture check and gather the resulting warning strings.

    Does no logging itself; :func:`log_startup_security_warnings` is the
    logging wrapper. Each check runs inside its own ``try`` so one failing
    check cannot suppress the others.

    Order of findings: root check, SSH password-auth check, container volume
    check (given *hermes_home*), then network listener checks (given
    *config*).
    """
    collected: list[str] = []

    # Checks that take no arguments and return one optional message.
    zero_arg_checks = (
        _running_as_root,
        _ssh_password_auth_enabled,
    )
    for probe in zero_arg_checks:
        try:
            outcome = probe()
        except Exception:
            continue
        if outcome:
            collected.append(outcome)

    try:
        volume_warning = _container_no_volume_mount(hermes_home)
    except Exception:
        volume_warning = None
    if volume_warning:
        collected.append(volume_warning)

    try:
        collected.extend(_network_listener_without_auth(config))
    except Exception:
        pass

    return collected
def log_startup_security_warnings(
    *,
    hermes_home: Optional[Path] = None,
    config: Optional[dict] = None,
    force: bool = False,
) -> list[str]:
    """Run the audit at most once per process and log each finding as a warning.

    A repeat call returns ``[]`` without doing anything unless ``force=True``.
    The once-per-process flag is set before the audit runs, so a failing
    audit is not retried either. Never raises: an audit failure yields ``[]``.

    Returns:
        The list of findings that were logged (empty when there were none).
    """
    global _AUDIT_RAN
    already_ran = _AUDIT_RAN
    if already_ran and not force:
        return []
    _AUDIT_RAN = True

    try:
        issues = run_security_audit(hermes_home=hermes_home, config=config)
    except Exception:
        return []
    if not issues:
        return issues

    total = len(issues)
    logger.warning(
        "Security posture audit found %d issue(s) — review your deployment:",
        total,
    )
    for index, message in enumerate(issues, start=1):
        logger.warning(" [security %d/%d] %s", index, total, message)
    return issues