feat(security): on-demand supply-chain audit via OSV.dev (#31460)

Adds 'hermes security audit' — a one-shot vulnerability scan against
OSV.dev covering three surfaces a Hermes user actually controls:

  1. The running Python's installed PyPI dists (importlib.metadata)
  2. Plugin requirements.txt / pyproject.toml pins under ~/.hermes/plugins/
  3. Pinned npx/uvx MCP servers in config.yaml

Zero new dependencies (stdlib urllib + importlib.metadata + tomllib +
concurrent.futures). No auth required for OSV's public batch API.

Flags: --json, --fail-on {low,moderate,high,critical} (default: critical),
       --skip-venv, --skip-plugins, --skip-mcp

Output groups findings by source, sorts by severity descending, surfaces
fixed-versions inline. Exit 1 when any finding meets the --fail-on tier.

Deliberately out of scope: globally-installed pip/npm, editor/browser
extensions, daily background scans, auto-blocking of installs. The audit
is on-demand by design — daily scans become noise the user trains
themselves to ignore.
This commit is contained in:
Teknium 2026-05-24 15:15:16 -07:00 committed by GitHub
parent 8065e70274
commit 7ab1677362
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 943 additions and 1 deletions

View file

@ -6182,6 +6182,19 @@ def cmd_doctor(args):
run_doctor(args)
def cmd_security(args):
"""Dispatch `hermes security <subcmd>`."""
sub = getattr(args, "security_command", None)
if sub in ("audit", None):
from hermes_cli.security_audit import cmd_security_audit
# Default subcommand is `audit` when no subcmd is given.
code = cmd_security_audit(args)
sys.exit(int(code or 0))
print(f"unknown security subcommand: {sub}", file=sys.stderr)
sys.exit(2)
def cmd_dump(args):
"""Dump setup summary for support/debugging."""
from hermes_cli.dump import run_dump
@ -9842,6 +9855,7 @@ def _coalesce_session_name_args(argv: list) -> list:
"honcho",
"claw",
"plugins",
"security",
"acp",
"webhook",
"memory",
@ -10682,7 +10696,7 @@ _BUILTIN_SUBCOMMANDS = frozenset(
"model", "pairing", "plugins", "portal", "postinstall", "profile", "proxy",
"send", "sessions", "setup",
"skills", "slack", "status", "tools", "uninstall", "update",
"version", "webhook", "whatsapp", "chat", "secrets",
"version", "webhook", "whatsapp", "chat", "secrets", "security",
# Help-ish invocations — plugin commands not being listed in
# top-level --help is an acceptable trade-off for skipping an
# expensive eager import of every bundled plugin module.
@ -12002,6 +12016,58 @@ def main():
)
doctor_parser.set_defaults(func=cmd_doctor)
# =========================================================================
# security command — on-demand supply-chain audit
# =========================================================================
security_parser = subparsers.add_parser(
"security",
help="Supply-chain audit (OSV.dev) for venv, plugins, and MCP servers",
description=(
"On-demand vulnerability scan against OSV.dev. Covers the Hermes "
"venv (installed PyPI dists), Python deps declared by plugins under "
"~/.hermes/plugins/, and pinned npx/uvx MCP servers in config.yaml. "
"Does NOT scan globally-installed packages or editor/browser extensions."
),
)
security_subparsers = security_parser.add_subparsers(
dest="security_command",
metavar="<subcommand>",
)
audit_parser = security_subparsers.add_parser(
"audit",
help="Run a one-shot supply-chain audit",
description="Query OSV.dev for known vulnerabilities in installed components.",
)
audit_parser.add_argument(
"--json",
action="store_true",
help="Emit machine-readable JSON instead of human-readable text",
)
audit_parser.add_argument(
"--fail-on",
default="critical",
choices=["low", "moderate", "high", "critical"],
help="Exit non-zero when any finding meets this severity (default: critical)",
)
audit_parser.add_argument(
"--skip-venv",
action="store_true",
help="Skip scanning the Hermes Python venv",
)
audit_parser.add_argument(
"--skip-plugins",
action="store_true",
help="Skip scanning plugin requirements files",
)
audit_parser.add_argument(
"--skip-mcp",
action="store_true",
help="Skip scanning pinned MCP servers in config.yaml",
)
audit_parser.set_defaults(func=cmd_security)
security_parser.set_defaults(func=cmd_security)
# =========================================================================
# dump command
# =========================================================================

View file

@ -0,0 +1,576 @@
"""On-demand supply-chain audit for Hermes Agent installs.
Scans three surfaces a Hermes user actually controls and we can map to
upstream advisories without auth or extra binaries:
1. The Hermes venv (every PyPI dist via ``importlib.metadata``).
2. Python deps declared by user-installed plugins under ``~/.hermes/plugins``
(``requirements.txt`` + ``pyproject.toml`` best-effort pin extraction).
3. MCP servers wired in ``config.yaml`` whose ``command/args`` look like
``npx -y <pkg>@<ver>`` or ``uvx <pkg>==<ver>``.
Vulnerabilities are looked up against OSV.dev (``api.osv.dev/v1/querybatch``
+ ``/v1/vulns/{id}``). Single-shot, on-demand, never daily see the design
notes in ``references/security-disclosure-triage.md``.
Out of scope on purpose: global pip/npm, editor/browser extensions,
daily background scans, auto-blocking installs.
"""
from __future__ import annotations
import argparse
import concurrent.futures
import json
import re
import sys
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Optional
from hermes_constants import get_hermes_home
OSV_BATCH_URL = "https://api.osv.dev/v1/querybatch"
OSV_VULN_URL = "https://api.osv.dev/v1/vulns/{vid}"
OSV_BATCH_MAX = 1000 # OSV documented hard cap per request
HTTP_TIMEOUT = 20
DETAIL_PARALLELISM = 8
# Severity ordering for --fail-on gating. UNKNOWN sits below LOW so it
# never blocks unless --fail-on is passed something even lower (we don't
# expose that).
SEVERITY_ORDER = {
"UNKNOWN": 0,
"LOW": 1,
"MODERATE": 2,
"MEDIUM": 2,
"HIGH": 3,
"CRITICAL": 4,
}
# ─── Data shapes ──────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class Component:
"""A single (name, version, ecosystem) tuple discovered on disk."""
name: str
version: str
ecosystem: str # "PyPI" | "npm" — exactly as OSV expects
source: str # human-readable origin, e.g. "venv", "plugin:foo", "mcp:bar"
@dataclass
class Vulnerability:
osv_id: str
severity: str = "UNKNOWN"
summary: str = ""
fixed_versions: list[str] = field(default_factory=list)
@dataclass
class Finding:
component: Component
vuln: Vulnerability
# ─── Component discovery ──────────────────────────────────────────────────────
def _discover_venv() -> list[Component]:
"""Every dist installed in the running Python's import path."""
from importlib.metadata import distributions
out: list[Component] = []
seen: set[tuple[str, str]] = set()
for dist in distributions():
try:
name = (dist.metadata["Name"] or "").strip()
except Exception:
continue
version = (dist.version or "").strip()
if not name or not version:
continue
key = (name.lower(), version)
if key in seen:
continue
seen.add(key)
out.append(Component(name=name, version=version, ecosystem="PyPI", source="venv"))
return out
# requirements.txt line: drop comments, environment markers, options, extras
_REQ_LINE = re.compile(
r"""^\s*
(?P<name>[A-Za-z0-9][A-Za-z0-9._-]*)
(?:\[[^\]]+\])? # extras
\s*==\s*
(?P<version>[A-Za-z0-9._+!-]+)
\s*(?:;.*)?$
""",
re.VERBOSE,
)
def _parse_requirements(text: str) -> list[tuple[str, str]]:
"""Extract ``name==version`` pins. Everything else (>=, ~=, no pin) is skipped.
A loose pin can't be mapped to a single OSV query, and getting it wrong
is worse than missing a finding for an audit tool false positives
train users to ignore output.
"""
pins: list[tuple[str, str]] = []
for raw in text.splitlines():
line = raw.strip()
if not line or line.startswith("#") or line.startswith("-"):
continue
m = _REQ_LINE.match(line)
if m:
pins.append((m.group("name"), m.group("version")))
return pins
def _parse_pyproject_pins(text: str) -> list[tuple[str, str]]:
"""Pull ``name==version`` pins from a ``pyproject.toml`` ``dependencies`` list.
Uses stdlib ``tomllib`` (3.11+). Same exact-pin policy as requirements.
"""
try:
import tomllib
except ImportError: # pragma: no cover - 3.10 only
return []
try:
data = tomllib.loads(text)
except Exception:
return []
deps: list[str] = []
project = data.get("project") or {}
if isinstance(project.get("dependencies"), list):
deps.extend(str(x) for x in project["dependencies"])
optional = project.get("optional-dependencies") or {}
if isinstance(optional, dict):
for group in optional.values():
if isinstance(group, list):
deps.extend(str(x) for x in group)
pins: list[tuple[str, str]] = []
for dep in deps:
m = _REQ_LINE.match(dep)
if m:
pins.append((m.group("name"), m.group("version")))
return pins
def _discover_plugins(hermes_home: Path) -> list[Component]:
"""Python deps declared by plugins under ``~/.hermes/plugins``.
Plugins typically don't install into the venv (they're directory-based
with relative imports), so their stated requirements are useful audit
surface even when the venv scan misses them.
"""
plugins_dir = hermes_home / "plugins"
if not plugins_dir.is_dir():
return []
out: list[Component] = []
for plugin_dir in sorted(plugins_dir.iterdir()):
if not plugin_dir.is_dir() or plugin_dir.name.startswith("."):
continue
source = f"plugin:{plugin_dir.name}"
for req_file in ("requirements.txt", "requirements-dev.txt"):
path = plugin_dir / req_file
if path.is_file():
try:
pins = _parse_requirements(path.read_text(encoding="utf-8", errors="replace"))
except OSError:
continue
for name, version in pins:
out.append(Component(name=name, version=version, ecosystem="PyPI", source=source))
pyproject = plugin_dir / "pyproject.toml"
if pyproject.is_file():
try:
pins = _parse_pyproject_pins(pyproject.read_text(encoding="utf-8", errors="replace"))
except OSError:
continue
for name, version in pins:
out.append(Component(name=name, version=version, ecosystem="PyPI", source=source))
return out
# npx forms we recognise:
# npx -y @scope/pkg@1.2.3
# npx --yes pkg@1.2.3
# npx pkg@1.2.3 [...args]
# We deliberately don't try to resolve unversioned names — that maps to
# "latest" at runtime and isn't a stable audit subject.
_NPX_PKG = re.compile(r"^(@[A-Za-z0-9._-]+/[A-Za-z0-9._-]+|[A-Za-z0-9._-]+)@([A-Za-z0-9._+-]+)$")
# uvx forms:
# uvx pkg==1.2.3
# uvx --with pkg==1.2.3 entrypoint
_UVX_PKG = re.compile(r"^([A-Za-z0-9][A-Za-z0-9._-]*)==([A-Za-z0-9._+!-]+)$")
def _extract_mcp_component(server_name: str, command: str, args: list[str]) -> Optional[Component]:
"""Best-effort: parse `command/args` into a (name, version, ecosystem).
Returns None when the entry doesn't pin a version we can audit (local
paths, Docker images, unversioned npx, etc.). Audit output stays silent
rather than guess.
"""
cmd = (command or "").strip().lower()
if not args:
return None
# npx (any prefix path)
if cmd.endswith("npx") or cmd == "npx":
# Skip flag tokens until we see the first thing that looks like a pkg ref
for token in args:
if token.startswith("-"):
continue
m = _NPX_PKG.match(token)
if m:
return Component(
name=m.group(1),
version=m.group(2),
ecosystem="npm",
source=f"mcp:{server_name}",
)
return None # First non-flag token isn't a pinned ref
# uvx (any prefix path)
if cmd.endswith("uvx") or cmd == "uvx":
for token in args:
if token.startswith("-"):
continue
m = _UVX_PKG.match(token)
if m:
return Component(
name=m.group(1),
version=m.group(2),
ecosystem="PyPI",
source=f"mcp:{server_name}",
)
return None
return None
def _discover_mcp() -> list[Component]:
"""Pinned MCP server packages from ``config.yaml``."""
try:
from hermes_cli.mcp_config import _get_mcp_servers
except Exception:
return []
out: list[Component] = []
servers = _get_mcp_servers()
if not isinstance(servers, dict):
return []
for name, cfg in servers.items():
if not isinstance(cfg, dict):
continue
command = cfg.get("command", "") or ""
args = cfg.get("args") or []
if not isinstance(args, list):
continue
comp = _extract_mcp_component(name, command, [str(a) for a in args])
if comp is not None:
out.append(comp)
return out
# ─── OSV client ───────────────────────────────────────────────────────────────
def _http_post_json(url: str, payload: dict) -> dict:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=data, headers={"Content-Type": "application/json"}, method="POST"
)
with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
return json.loads(resp.read().decode("utf-8"))
def _http_get_json(url: str) -> dict:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
return json.loads(resp.read().decode("utf-8"))
def _osv_query_batch(components: list[Component]) -> dict[Component, list[str]]:
"""Return {component -> [osv_id, ...]} for components with any vulns.
Components without findings are omitted from the result dict.
"""
if not components:
return {}
findings: dict[Component, list[str]] = {}
for chunk_start in range(0, len(components), OSV_BATCH_MAX):
chunk = components[chunk_start:chunk_start + OSV_BATCH_MAX]
payload = {
"queries": [
{
"package": {"name": c.name, "ecosystem": c.ecosystem},
"version": c.version,
}
for c in chunk
]
}
try:
resp = _http_post_json(OSV_BATCH_URL, payload)
except (urllib.error.URLError, TimeoutError, ConnectionError) as exc:
raise RuntimeError(f"OSV batch query failed: {exc}") from exc
results = resp.get("results") or []
for comp, result in zip(chunk, results):
vulns = (result or {}).get("vulns") or []
ids = [v.get("id") for v in vulns if v.get("id")]
if ids:
findings[comp] = ids
return findings
def _osv_severity_from_record(record: dict) -> str:
"""Extract CVSS-derived severity tier from an OSV vuln record."""
# OSV puts CVSS in `severity` (top-level or per-affected) and a
# human-readable bucket in `database_specific.severity` for GHSAs.
db_specific = record.get("database_specific") or {}
raw = db_specific.get("severity")
if isinstance(raw, str) and raw.strip():
upper = raw.strip().upper()
if upper in SEVERITY_ORDER:
return upper
# Fall back to CVSS score → tier
score: Optional[float] = None
for sev_entry in record.get("severity") or []:
s = sev_entry.get("score")
if isinstance(s, str):
# CVSS vector strings look like "CVSS:3.1/AV:N/..." — we can't
# parse without a lib. Look for an explicit numeric in
# affected[].ecosystem_specific later if present.
continue
affected = record.get("affected") or []
for entry in affected:
eco_spec = entry.get("ecosystem_specific") or {}
sev = eco_spec.get("severity")
if isinstance(sev, str) and sev.strip().upper() in SEVERITY_ORDER:
return sev.strip().upper()
if score is not None:
if score >= 9.0:
return "CRITICAL"
if score >= 7.0:
return "HIGH"
if score >= 4.0:
return "MODERATE"
if score > 0:
return "LOW"
return "UNKNOWN"
def _osv_fixed_versions(record: dict) -> list[str]:
fixes: list[str] = []
for entry in record.get("affected") or []:
for rng in entry.get("ranges") or []:
for event in rng.get("events") or []:
if "fixed" in event:
fixes.append(str(event["fixed"]))
# Dedupe, preserve order
seen: set[str] = set()
out: list[str] = []
for f in fixes:
if f not in seen:
seen.add(f)
out.append(f)
return out
def _osv_fetch_details(vuln_ids: Iterable[str]) -> dict[str, Vulnerability]:
"""Fetch summary/severity for each unique vuln id, in parallel."""
unique = sorted({vid for vid in vuln_ids if vid})
if not unique:
return {}
out: dict[str, Vulnerability] = {}
def _fetch_one(vid: str) -> Vulnerability:
try:
rec = _http_get_json(OSV_VULN_URL.format(vid=vid))
except (urllib.error.URLError, TimeoutError, ConnectionError):
return Vulnerability(osv_id=vid)
return Vulnerability(
osv_id=vid,
severity=_osv_severity_from_record(rec),
summary=(rec.get("summary") or "").strip(),
fixed_versions=_osv_fixed_versions(rec),
)
with concurrent.futures.ThreadPoolExecutor(max_workers=DETAIL_PARALLELISM) as pool:
for vuln in pool.map(_fetch_one, unique):
out[vuln.osv_id] = vuln
return out
# ─── Orchestration ────────────────────────────────────────────────────────────
def run_audit(
*,
skip_venv: bool = False,
skip_plugins: bool = False,
skip_mcp: bool = False,
hermes_home: Optional[Path] = None,
) -> list[Finding]:
"""Discover components, query OSV, return findings sorted by severity desc."""
home = hermes_home or Path(get_hermes_home())
components: list[Component] = []
if not skip_venv:
components.extend(_discover_venv())
if not skip_plugins:
components.extend(_discover_plugins(home))
if not skip_mcp:
components.extend(_discover_mcp())
if not components:
return []
raw = _osv_query_batch(components)
if not raw:
return []
all_ids: list[str] = []
for ids in raw.values():
all_ids.extend(ids)
details = _osv_fetch_details(all_ids)
findings: list[Finding] = []
for comp, ids in raw.items():
for vid in ids:
vuln = details.get(vid) or Vulnerability(osv_id=vid)
findings.append(Finding(component=comp, vuln=vuln))
findings.sort(
key=lambda f: (
-SEVERITY_ORDER.get(f.vuln.severity, 0),
f.component.source,
f.component.name.lower(),
f.vuln.osv_id,
)
)
return findings
# ─── Rendering ────────────────────────────────────────────────────────────────
def _render_human(findings: list[Finding], total_components: int) -> str:
if not findings:
return f"No known vulnerabilities found across {total_components} component(s)."
lines: list[str] = []
lines.append(
f"Found {len(findings)} known vulnerability finding(s) "
f"across {total_components} component(s):"
)
lines.append("")
last_source = None
for f in findings:
if f.component.source != last_source:
lines.append(f"[{f.component.source}]")
last_source = f.component.source
sev = f.vuln.severity.ljust(8)
head = f" {sev} {f.component.name}=={f.component.version} {f.vuln.osv_id}"
lines.append(head)
if f.vuln.summary:
summary = f.vuln.summary
if len(summary) > 100:
summary = summary[:97] + "..."
lines.append(f" {summary}")
if f.vuln.fixed_versions:
lines.append(f" fixed in: {', '.join(f.vuln.fixed_versions[:3])}")
return "\n".join(lines)
def _render_json(findings: list[Finding], total_components: int) -> str:
payload = {
"total_components_scanned": total_components,
"finding_count": len(findings),
"findings": [
{
"package": f.component.name,
"version": f.component.version,
"ecosystem": f.component.ecosystem,
"source": f.component.source,
"vuln_id": f.vuln.osv_id,
"severity": f.vuln.severity,
"summary": f.vuln.summary,
"fixed_versions": f.vuln.fixed_versions,
}
for f in findings
],
}
return json.dumps(payload, indent=2)
def _count_components(
*, skip_venv: bool, skip_plugins: bool, skip_mcp: bool, hermes_home: Path
) -> int:
total = 0
if not skip_venv:
total += len(_discover_venv())
if not skip_plugins:
total += len(_discover_plugins(hermes_home))
if not skip_mcp:
total += len(_discover_mcp())
return total
# ─── CLI entrypoint ───────────────────────────────────────────────────────────
def cmd_security_audit(args: argparse.Namespace) -> int:
"""Implementation of `hermes security audit`."""
home = Path(get_hermes_home())
skip_venv = bool(getattr(args, "skip_venv", False))
skip_plugins = bool(getattr(args, "skip_plugins", False))
skip_mcp = bool(getattr(args, "skip_mcp", False))
output_json = bool(getattr(args, "json", False))
fail_on = (getattr(args, "fail_on", None) or "critical").upper()
if fail_on not in SEVERITY_ORDER:
print(
f"unknown --fail-on value: {fail_on.lower()} "
f"(choose from: low, moderate, high, critical)",
file=sys.stderr,
)
return 2
total = _count_components(
skip_venv=skip_venv, skip_plugins=skip_plugins, skip_mcp=skip_mcp, hermes_home=home
)
if total == 0:
msg = "No components discovered (everything skipped, or empty environment)."
if output_json:
print(json.dumps({"total_components_scanned": 0, "finding_count": 0, "findings": []}))
else:
print(msg)
return 0
try:
findings = run_audit(
skip_venv=skip_venv,
skip_plugins=skip_plugins,
skip_mcp=skip_mcp,
hermes_home=home,
)
except RuntimeError as exc:
print(f"audit failed: {exc}", file=sys.stderr)
return 2
if output_json:
print(_render_json(findings, total))
else:
print(_render_human(findings, total))
# Exit code: 1 iff any finding meets or exceeds the --fail-on threshold.
threshold = SEVERITY_ORDER[fail_on]
for f in findings:
if SEVERITY_ORDER.get(f.vuln.severity, 0) >= threshold:
return 1
return 0

View file

@ -0,0 +1,299 @@
"""Unit tests for hermes_cli.security_audit — parsers + OSV plumbing.
These never hit the live OSV API; HTTP is monkeypatched. The live-call path
is exercised in the E2E test embedded in PR validation, not here.
"""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from hermes_cli import security_audit as sa
# ─── Parsers ──────────────────────────────────────────────────────────────────
class TestRequirementsParser:
def test_extracts_pinned_versions(self):
text = "requests==2.20.0\nflask==2.0.1\n"
assert sa._parse_requirements(text) == [
("requests", "2.20.0"),
("flask", "2.0.1"),
]
def test_skips_comments_and_options(self):
text = "# comment\n-r other.txt\n--index-url https://x\nflask==2.0.1\n"
assert sa._parse_requirements(text) == [("flask", "2.0.1")]
def test_skips_unpinned(self):
# We deliberately don't try to map >=, ~=, or bare-name deps to OSV.
text = "requests>=2.0\ntyping-extensions\nflask~=2.0\n"
assert sa._parse_requirements(text) == []
def test_handles_extras_and_markers(self):
text = 'requests[security]==2.20.0\nflask==2.0.1 ; python_version >= "3.8"\n'
assert sa._parse_requirements(text) == [
("requests", "2.20.0"),
("flask", "2.0.1"),
]
def test_handles_empty(self):
assert sa._parse_requirements("") == []
assert sa._parse_requirements(" \n\n ") == []
class TestMCPComponentExtraction:
def test_npx_scoped_pinned(self):
comp = sa._extract_mcp_component(
"fs", "npx", ["-y", "@modelcontextprotocol/server-filesystem@0.5.0"]
)
assert comp == sa.Component(
name="@modelcontextprotocol/server-filesystem",
version="0.5.0",
ecosystem="npm",
source="mcp:fs",
)
def test_npx_full_path_command(self):
comp = sa._extract_mcp_component(
"fetch", "/usr/local/bin/npx", ["mcp-server-fetch@1.2.3"]
)
assert comp is not None
assert comp.name == "mcp-server-fetch"
assert comp.version == "1.2.3"
def test_uvx_pinned(self):
comp = sa._extract_mcp_component("time", "uvx", ["mcp-server-time==2.1.0"])
assert comp is not None
assert comp.ecosystem == "PyPI"
assert comp.name == "mcp-server-time"
assert comp.version == "2.1.0"
def test_unpinned_returns_none(self):
# Bare npx package name = "latest" at runtime; not an audit subject.
assert sa._extract_mcp_component("x", "npx", ["-y", "some-pkg"]) is None
def test_docker_returns_none(self):
# We don't currently parse docker image refs.
assert sa._extract_mcp_component("x", "docker", ["run", "-i", "mcp/foo:1.0"]) is None
def test_empty_args(self):
assert sa._extract_mcp_component("x", "npx", []) is None
# ─── Plugin discovery ─────────────────────────────────────────────────────────
class TestPluginDiscovery:
def test_reads_requirements_txt(self, tmp_path: Path):
plugin = tmp_path / "plugins" / "myplugin"
plugin.mkdir(parents=True)
(plugin / "requirements.txt").write_text("requests==2.20.0\n")
components = sa._discover_plugins(tmp_path)
assert len(components) == 1
assert components[0].name == "requests"
assert components[0].source == "plugin:myplugin"
def test_skips_when_no_plugins_dir(self, tmp_path: Path):
assert sa._discover_plugins(tmp_path) == []
def test_skips_hidden_dirs(self, tmp_path: Path):
(tmp_path / "plugins" / ".hidden").mkdir(parents=True)
(tmp_path / "plugins" / ".hidden" / "requirements.txt").write_text(
"requests==2.20.0\n"
)
assert sa._discover_plugins(tmp_path) == []
def test_reads_pyproject_dependencies(self, tmp_path: Path):
plugin = tmp_path / "plugins" / "py"
plugin.mkdir(parents=True)
(plugin / "pyproject.toml").write_text(
'[project]\ndependencies = ["flask==2.0.1", "uvicorn>=0.20"]\n'
)
components = sa._discover_plugins(tmp_path)
# uvicorn>=0.20 is unpinned, so only flask comes through
assert len(components) == 1
assert components[0].name == "flask"
assert components[0].version == "2.0.1"
# ─── OSV severity extraction ──────────────────────────────────────────────────
class TestSeverityExtraction:
def test_database_specific_severity(self):
rec = {"database_specific": {"severity": "HIGH"}}
assert sa._osv_severity_from_record(rec) == "HIGH"
def test_unknown_when_no_severity(self):
assert sa._osv_severity_from_record({}) == "UNKNOWN"
def test_ecosystem_specific_fallback(self):
rec = {"affected": [{"ecosystem_specific": {"severity": "MODERATE"}}]}
assert sa._osv_severity_from_record(rec) == "MODERATE"
def test_fixed_versions_extracted_and_deduped(self):
rec = {
"affected": [
{
"ranges": [
{
"events": [
{"introduced": "0"},
{"fixed": "2.0.0"},
]
}
]
},
{"ranges": [{"events": [{"fixed": "2.0.0"}, {"fixed": "1.9.5"}]}]},
]
}
assert sa._osv_fixed_versions(rec) == ["2.0.0", "1.9.5"]
# ─── End-to-end orchestration with mocked OSV ─────────────────────────────────
class TestRunAudit:
def test_no_components_returns_empty(self, tmp_path: Path):
findings = sa.run_audit(
skip_venv=True, skip_plugins=True, skip_mcp=True, hermes_home=tmp_path
)
assert findings == []
def test_findings_sorted_by_severity_desc(self, tmp_path: Path):
plugin = tmp_path / "plugins" / "p"
plugin.mkdir(parents=True)
(plugin / "requirements.txt").write_text("alpha==1.0.0\nbeta==2.0.0\n")
def fake_batch(comps):
return {
comps[0]: ["LOW-1"],
comps[1]: ["CRIT-1"],
}
def fake_details(ids):
return {
"LOW-1": sa.Vulnerability(osv_id="LOW-1", severity="LOW", summary="low"),
"CRIT-1": sa.Vulnerability(osv_id="CRIT-1", severity="CRITICAL", summary="crit"),
}
with patch.object(sa, "_osv_query_batch", side_effect=fake_batch), \
patch.object(sa, "_osv_fetch_details", side_effect=fake_details):
findings = sa.run_audit(
skip_venv=True, skip_plugins=False, skip_mcp=True, hermes_home=tmp_path
)
assert len(findings) == 2
# CRITICAL must come first
assert findings[0].vuln.osv_id == "CRIT-1"
assert findings[1].vuln.osv_id == "LOW-1"
# ─── CLI subcommand exit codes ────────────────────────────────────────────────
class TestExitCodes:
def _build_args(self, **kwargs):
import argparse
defaults = {
"skip_venv": True,
"skip_plugins": True,
"skip_mcp": True,
"json": False,
"fail_on": "critical",
}
defaults.update(kwargs)
return argparse.Namespace(**defaults)
def test_clean_audit_exits_zero(self, tmp_path: Path, monkeypatch, capsys):
monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
# Everything skipped → no components → exit 0
code = sa.cmd_security_audit(self._build_args())
assert code == 0
out = capsys.readouterr().out
assert "No components" in out or "0 component" in out
def test_finding_above_threshold_exits_one(self, tmp_path: Path, monkeypatch):
monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
# Force a venv discovery to return one component, OSV to flag it CRITICAL
fake_comp = sa.Component(
name="pkg", version="1.0", ecosystem="PyPI", source="venv"
)
monkeypatch.setattr(sa, "_discover_venv", lambda: [fake_comp])
monkeypatch.setattr(
sa, "_osv_query_batch", lambda comps: {fake_comp: ["X-1"]}
)
monkeypatch.setattr(
sa,
"_osv_fetch_details",
lambda ids: {"X-1": sa.Vulnerability(osv_id="X-1", severity="CRITICAL")},
)
code = sa.cmd_security_audit(
self._build_args(skip_venv=False, fail_on="critical")
)
assert code == 1
def test_finding_below_threshold_exits_zero(self, tmp_path: Path, monkeypatch):
monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
fake_comp = sa.Component(
name="pkg", version="1.0", ecosystem="PyPI", source="venv"
)
monkeypatch.setattr(sa, "_discover_venv", lambda: [fake_comp])
monkeypatch.setattr(
sa, "_osv_query_batch", lambda comps: {fake_comp: ["X-1"]}
)
monkeypatch.setattr(
sa,
"_osv_fetch_details",
lambda ids: {"X-1": sa.Vulnerability(osv_id="X-1", severity="MODERATE")},
)
code = sa.cmd_security_audit(
self._build_args(skip_venv=False, fail_on="critical")
)
assert code == 0
def test_unknown_fail_on_value_exits_two(self, tmp_path: Path, monkeypatch, capsys):
monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
code = sa.cmd_security_audit(self._build_args(fail_on="garbage"))
assert code == 2
err = capsys.readouterr().err
assert "fail-on" in err.lower()
def test_json_output_shape(self, tmp_path: Path, monkeypatch, capsys):
monkeypatch.setattr(sa, "get_hermes_home", lambda: str(tmp_path))
fake_comp = sa.Component(
name="pkg", version="1.0", ecosystem="PyPI", source="venv"
)
monkeypatch.setattr(sa, "_discover_venv", lambda: [fake_comp])
monkeypatch.setattr(
sa, "_osv_query_batch", lambda comps: {fake_comp: ["X-1"]}
)
monkeypatch.setattr(
sa,
"_osv_fetch_details",
lambda ids: {
"X-1": sa.Vulnerability(
osv_id="X-1",
severity="HIGH",
summary="bad",
fixed_versions=["1.1"],
)
},
)
sa.cmd_security_audit(
self._build_args(skip_venv=False, json=True, fail_on="critical")
)
payload = capsys.readouterr().out
# The bitwarden banner can leak above the json; pick the first { line.
lines = payload.splitlines()
json_start = next(i for i, l in enumerate(lines) if l.startswith("{"))
data = json.loads("\n".join(lines[json_start:]))
assert data["finding_count"] == 1
assert data["findings"][0]["severity"] == "HIGH"
assert data["findings"][0]["fixed_versions"] == ["1.1"]

View file

@ -53,6 +53,7 @@ hermes [global-options] <command> [subcommand/options]
| `hermes webhook` | Manage dynamic webhook subscriptions for event-driven activation. |
| `hermes hooks` | Inspect, approve, or remove shell-script hooks declared in `config.yaml`. |
| `hermes doctor` | Diagnose config and dependency issues. |
| `hermes security audit` | On-demand supply-chain audit (OSV.dev) for the venv, plugin requirements, and pinned MCP servers. |
| `hermes dump` | Copy-pasteable setup summary for support/debugging. |
| `hermes debug` | Debug tools — upload logs and system info for support. |
| `hermes backup` | Back up Hermes home directory to a zip file. |