mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-23 10:42:00 +00:00
282 lines
9.5 KiB
Python
282 lines
9.5 KiB
Python
"""Startup security posture audit (warn-on-load, never blocks).

Surfaces dangerous host / deployment posture at process start so operators
get an at-a-glance "you're exposed" signal. Motivated by the June 2026
MCP-config persistence campaign, where compromised boxes ran as root with an
exposed dashboard / API server and no firewall — and nothing ever told the
operator. These checks are advisory: they emit ``logger.warning`` records
and return human-readable strings; they never raise or block startup.

Checks (each is independent and fail-safe — any internal error is swallowed
and simply yields no finding):

  1. Running as root (POSIX uid 0).
  2. SSH daemon present with password authentication enabled.
  3. Running inside a container with no persistent volume mount over the
     HERMES_HOME data dir (state is ephemeral — lost on container restart).
  4. A network-accessible gateway listener (dashboard / API server) with no
     authentication configured.

Cross-platform: the root and SSH checks are POSIX-only and no-op on Windows.
Everything is best-effort and read-only.
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
# Dedicated logger name so operators can filter or route audit warnings.
logger = logging.getLogger("hermes.security_audit")

# Sentinel so the audit only runs once per process even if both the CLI and
# gateway startup paths call it. Flipped to True by
# ``log_startup_security_warnings`` before the checks execute.
_AUDIT_RAN = False
|
|
|
|
|
|
def _is_root() -> bool:
    """Report whether the process has POSIX uid 0.

    Prefers the effective uid and falls back to the real uid. Platforms
    that expose neither accessor (e.g. Windows) always yield ``False``, as
    does any error raised while querying the uid.
    """
    for accessor_name in ("geteuid", "getuid"):
        uid_fn = getattr(os, accessor_name, None)
        if uid_fn:
            break
    else:
        # Neither accessor exists on this platform.
        return False
    try:
        return uid_fn() == 0
    except Exception:
        return False
|
|
|
|
|
|
def _running_as_root() -> Optional[str]:
    """Return a warning string when the process runs as root, else ``None``."""
    if _is_root():
        return (
            "Running as ROOT. The agent's terminal/file tools execute with full "
            "root privileges — a single prompt-injection or exposed endpoint is a "
            "full host compromise. Run Hermes as an unprivileged user (or in a "
            "sandboxed terminal backend / container with a non-root user)."
        )
    return None
|
|
|
|
|
|
_SSHD_CONFIG_PATHS = (
    "/etc/ssh/sshd_config",
)
# Conventional drop-in directory. sshd only reads it when an ``Include``
# directive points at it, so the audit reaches drop-ins by following
# ``Include`` lines; the constant is kept for code that references it.
_SSHD_CONFIG_DIR = "/etc/ssh/sshd_config.d"

# Include chains nested deeper than this are abandoned (loop guard).
_SSHD_MAX_INCLUDE_DEPTH = 16

# "Keyword value" or "Keyword=value".
_SSHD_DIRECTIVE_RE = re.compile(r"^(\w+)(?:\s*=\s*|\s+)(.*)$")


def _split_sshd_directive(line: str) -> tuple[str, str]:
    """Split a config line into ``(lowercased keyword, raw argument text)``."""
    m = _SSHD_DIRECTIVE_RE.match(line)
    if m is None:
        return line.lower(), ""
    return m.group(1).lower(), m.group(2).strip()


def _collect_sshd_entries(
    path: Path,
    base_dir: Path,
    conditional: bool,
    stack: tuple[str, ...],
    out: list[tuple[str, bool]],
) -> None:
    """Append ``(line, inside_match_block)`` pairs from *path* to *out*.

    ``Include`` directives are expanded in place (globs in lexical order,
    relative patterns resolved against *base_dir*) so that *out* follows the
    order in which the directives appear to the daemon. Unreadable files and
    include loops are skipped silently.
    """
    # Local import: Include patterns are usually absolute, which
    # ``Path.glob`` does not accept.
    import glob

    key = str(path)
    if key in stack or len(stack) >= _SSHD_MAX_INCLUDE_DEPTH:
        return
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except Exception:
        return
    for raw in text.splitlines():
        stripped = raw.strip()
        if not stripped or stripped.startswith("#"):
            continue
        out.append((stripped, conditional))
        keyword, args = _split_sshd_directive(stripped)
        if keyword == "match":
            # Everything after a Match line in this file is conditional.
            # ``conditional`` is local to this call, so a Match inside an
            # included file does not leak back into the including file.
            conditional = True
        elif keyword == "include":
            for pattern in args.split():
                pattern = pattern.strip('"')
                if not pattern:
                    continue
                if not os.path.isabs(pattern):
                    pattern = os.path.join(str(base_dir), pattern)
                try:
                    included = sorted(glob.glob(pattern))
                except Exception:
                    continue
                for name in included:
                    _collect_sshd_entries(
                        Path(name), base_dir, conditional, stack + (key,), out
                    )


def _sshd_config_entries() -> list[tuple[str, bool]]:
    """Return ``(line, inside_match_block)`` pairs for every configured file."""
    entries: list[tuple[str, bool]] = []
    for cfg in _SSHD_CONFIG_PATHS:
        root = Path(cfg)
        _collect_sshd_entries(root, root.parent, False, (), entries)
    return entries


def _iter_sshd_config_lines() -> list[str]:
    """Return non-comment sshd_config lines in evaluation order.

    Lines from files pulled in by ``Include`` appear at the position of the
    ``Include`` directive. Returns an empty list when no config is readable.
    """
    return [line for line, _ in _sshd_config_entries()]


def _ssh_password_auth_enabled() -> Optional[str]:
    """Warn when an SSH daemon has password authentication enabled.

    Password auth on a public SSH daemon is the classic brute-force surface
    and pairs badly with a root-capable agent box. POSIX-only; returns None
    when there's no sshd config to read (e.g. Windows, or SSH not installed).

    Precedence follows sshd_config(5): for each keyword the FIRST obtained
    value is used, and the default (no directive) is "yes". Directives inside
    ``Match`` blocks are conditional, so they never satisfy the global
    setting; a Match block that enables password auth is still reported
    because it opens the surface for some connections. Match criteria are
    not evaluated — this is a deliberate best-effort approximation.
    """
    entries = _sshd_config_entries()
    if not entries:
        return None
    global_value: Optional[str] = None
    enabled_in_match = False
    for line, conditional in entries:
        keyword, args = _split_sshd_directive(line)
        if keyword != "passwordauthentication":
            continue
        m = re.match(r'"?(\w+)', args)
        if m is None:
            continue
        value = m.group(1).lower()
        if conditional:
            enabled_in_match = enabled_in_match or value != "no"
        elif global_value is None:
            # First global occurrence wins; later ones are ignored by sshd.
            global_value = value
    if global_value is None:
        qualifier = " (default — no explicit directive)"
    elif global_value != "no":
        qualifier = ""
    elif enabled_in_match:
        qualifier = " (inside a Match block)"
    else:
        return None
    return (
        f"SSH password authentication is ENABLED{qualifier}. Password auth is "
        "brute-forceable and dangerous on an internet-facing box. Set "
        "'PasswordAuthentication no' in sshd_config and use key-based auth."
    )
|
|
|
|
|
|
def _in_container() -> bool:
    """Heuristically decide whether the process runs inside a container.

    Signals, in order: the ``/.dockerenv`` marker file (container), the
    ``HERMES_DESKTOP_CHILD_PID`` environment variable (treated as "not a
    container"), then well-known runtime names in ``/proc/1/cgroup``.
    Anything unreadable counts as "not a container".
    """
    if os.path.exists("/.dockerenv"):
        return True
    if os.environ.get("HERMES_DESKTOP_CHILD_PID"):
        # Desktop child process, not a server container.
        return False
    try:
        cgroup_text = Path("/proc/1/cgroup").read_text(
            encoding="utf-8", errors="replace"
        )
    except Exception:
        return False
    for runtime_marker in ("docker", "containerd", "kubepods", "libpod"):
        if runtime_marker in cgroup_text:
            return True
    return False
|
|
|
|
|
|
def _path_is_mounted(path: Path, mounts_file: str = "/proc/mounts") -> bool:
    """True if *path* sits on (or under) a real mount point per the mount table.

    Container overlay/root filesystems are ephemeral; a bind/volume mount over
    the data dir shows up as a distinct mount entry. The path is treated as
    persisted when the most specific mount point at or above it is NOT an
    ``overlay`` / ``tmpfs`` / ``aufs`` filesystem.

    Args:
        path: Directory to classify; resolved to an absolute path when possible.
        mounts_file: Mount table to read, in ``/proc/mounts`` format.

    Returns:
        ``False`` only when the covering mount is one of the ephemeral
        filesystem types. ``True`` otherwise, including when the table cannot
        be read or no entry covers the path (fail safe: no warning).
    """
    try:
        target = path.resolve()
    except Exception:
        target = path
    try:
        mount_lines = (
            Path(mounts_file).read_text(encoding="utf-8", errors="replace").splitlines()
        )
    except Exception:
        return True  # can't tell — fail safe (no warning)
    best = None
    best_fstype = ""
    for line in mount_lines:
        parts = line.split()
        if len(parts) < 3:
            continue
        raw_mountpoint, fstype = parts[1], parts[2]
        try:
            # The mount table escapes space, tab, newline and backslash in
            # paths as three-digit octal sequences (e.g. "\040"); decode them
            # so mount points containing those characters can match.
            mountpoint = re.sub(
                r"\\([0-7]{3})",
                lambda m: chr(int(m.group(1), 8)),
                raw_mountpoint,
            )
            mp = Path(mountpoint)
        except Exception:
            continue
        if mp == target or mp in target.parents:
            # Deepest matching mount point wins (most specific). ">=" lets a
            # later entry for the same mount point replace an earlier one:
            # with stacked mounts the later entry is the one on top.
            if best is None or len(mp.parts) >= len(best.parts):
                best = mp
                best_fstype = fstype
    if best is None:
        return True
    # overlay / tmpfs over the data dir = ephemeral container storage.
    return best_fstype not in ("overlay", "tmpfs", "aufs")
|
|
|
|
|
|
def _container_no_volume_mount(hermes_home: Optional[Path]) -> Optional[str]:
    """Warn when a containerised process keeps its data dir on ephemeral storage.

    Returns ``None`` outside a container, when the data dir is backed by a
    mount, or when the mount check itself fails. The data dir is
    *hermes_home* if given, else ``$HERMES_HOME``, else ``~/.hermes``.
    """
    if not _in_container():
        return None

    if hermes_home:
        home = hermes_home
    else:
        home = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))

    try:
        persisted = _path_is_mounted(home)
    except Exception:
        return None
    if persisted:
        return None

    return (
        f"Running in a container but the data dir ({home}) is NOT on a "
        "persistent volume mount — sessions, memory, skills, and API keys are "
        "ephemeral and lost on container restart. Mount a host volume over the "
        "HERMES_HOME data directory."
    )
|
|
|
|
|
|
def _network_listener_without_auth(config: Optional[dict]) -> list[str]:
    """Report network-reachable gateway listeners that have no authentication.

    Read-only against *config* and the environment. The check inspected here
    is the API server: enabled, bound to a network-accessible host (from
    ``extra.host`` or ``API_SERVER_HOST``) and with no key (``extra.key`` or
    ``API_SERVER_KEY``).

    NOTE(review): only the API server is examined in this function; no
    dashboard check is present here.

    Returns an empty list when the gateway helper cannot be imported or when
    reading the config raises.
    """
    warnings_found: list[str] = []
    try:
        from gateway.platforms.base import is_network_accessible
    except Exception:
        return warnings_found

    settings = config or {}

    # API server.
    try:
        platforms = settings.get("platforms") or {}
        api_cfg = platforms.get("api_server") if isinstance(platforms, dict) else None
        if isinstance(api_cfg, dict) and api_cfg.get("enabled"):
            extra = api_cfg.get("extra") or {}
            host = extra.get("host") or os.environ.get("API_SERVER_HOST", "127.0.0.1")
            key = extra.get("key") or os.environ.get("API_SERVER_KEY", "")
            exposed = is_network_accessible(str(host))
            if exposed and not str(key).strip():
                warnings_found.append(
                    f"OpenAI-compatible API server is network-accessible ({host}) "
                    "with NO API_SERVER_KEY. It dispatches terminal-capable agent "
                    "work — an unauthenticated network endpoint is remote code "
                    "execution. Set a strong API_SERVER_KEY."
                )
    except Exception:
        pass

    return warnings_found
|
|
|
|
|
|
def run_security_audit(
    *, hermes_home: Optional[Path] = None, config: Optional[dict] = None
) -> list[str]:
    """Execute every posture check and collect the resulting warning strings.

    Emits no log records itself; :func:`log_startup_security_warnings` is the
    logging wrapper. A check that raises is skipped, so one failing check
    never hides the findings of the others.
    """
    collected: list[str] = []

    # Checks that produce at most one message each.
    single_result_checks = (
        _running_as_root,
        _ssh_password_auth_enabled,
        lambda: _container_no_volume_mount(hermes_home),
    )
    for probe in single_result_checks:
        try:
            message = probe()
        except Exception:
            continue
        if message:
            collected.append(message)

    # The listener check may produce several messages.
    try:
        collected.extend(_network_listener_without_auth(config))
    except Exception:
        pass

    return collected
|
|
|
|
|
|
def log_startup_security_warnings(
    *,
    hermes_home: Optional[Path] = None,
    config: Optional[dict] = None,
    force: bool = False,
) -> list[str]:
    """Run the posture audit and log each finding with ``logger.warning``.

    The audit executes at most once per process: later calls return ``[]``
    unless ``force=True``. The findings are returned as well as logged, and
    an audit that raises yields ``[]``.
    """
    global _AUDIT_RAN
    if not force and _AUDIT_RAN:
        return []
    # Mark as run before executing so a failing audit is not retried.
    _AUDIT_RAN = True

    try:
        findings = run_security_audit(hermes_home=hermes_home, config=config)
    except Exception:
        return []

    if not findings:
        return findings

    total = len(findings)
    logger.warning(
        "Security posture audit found %d issue(s) — review your deployment:",
        total,
    )
    for position, message in enumerate(findings, start=1):
        logger.warning(" [security %d/%d] %s", position, total, message)
    return findings
|