mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
feat(security): supply-chain advisory checker + lazy-install framework + tiered install fallback (#24220)
* feat(security): supply-chain advisory checker + lazy-install framework + tiered install fallback
Three coordinated mitigations for the Mini Shai-Hulud worm hitting
mistralai 2.4.6 on PyPI (2026-05-12) and for the next single-package
compromise that follows.
# What this PR makes true
1. Users with the poisoned mistralai 2.4.6 in their venv get a loud
detection banner with copy-pasteable remediation steps the moment
they run hermes (and on every gateway startup).
2. One quarantined / yanked PyPI package can no longer silently demote
a fresh install to 'core only' — the installer keeps every other
extra and tells the user which tier landed.
3. Future opt-in backends (Mistral, ElevenLabs, Honcho, etc.) can
lazy-install on first use under a strict allowlist, instead of
eagerly pulling everything at install time.
# Detection: hermes_cli/security_advisories.py
- ADVISORIES catalog (one entry currently: shai-hulud-2026-05 for
mistralai==2.4.6). Adding the next one is a single dataclass.
- detect_compromised() uses importlib.metadata.version() — no pip
dependency, works in uv venvs that lack pip.
- Banner cache (~/.hermes/cache/advisory_banner_seen) rate-limits
the startup banner to once per 24h per advisory.
- Acks persisted to security.acked_advisories in config.yaml; never
re-banner after ack.
- Wired into:
* hermes doctor — runs first, prints full remediation block
* hermes doctor --ack <id> — dismisses an advisory
* cli.py interactive run() and single-query branches — short
stderr banner pointing at hermes doctor
* gateway/run.py startup — operator-visible warning in gateway.log
# Lazy-install framework: tools/lazy_deps.py
- LAZY_DEPS allowlist maps namespaced feature keys (tts.elevenlabs,
memory.honcho, provider.bedrock, etc.) to pip specs.
- ensure(feature) installs missing deps in the active venv via the
uv → pip → ensurepip ladder (matches tools_config._pip_install).
- Strict spec safety regex rejects URLs, file paths, shell metas,
pip flag injection, control chars — only PyPI-by-name accepted.
- Gated on security.allow_lazy_installs (default true) plus the
HERMES_DISABLE_LAZY_INSTALLS env var for restricted/audited envs.
- Migrated three backends as proof of pattern:
* tools/tts_tool.py — _import_elevenlabs() calls ensure first
* plugins/memory/honcho/client.py — get_honcho_client lazy-installs
* tts.mistral / stt.mistral entries pre-registered for when PyPI
restores mistralai
# Installer fallback tiers
scripts/install.sh, scripts/install.ps1, setup-hermes.sh:
- Centralised _BROKEN_EXTRAS list (currently: mistral). Edit one
array when a transitive breaks; users keep every other extra.
- New 'all minus known-broken' tier between [all] and the existing
PyPI-only-extras tier. Only kicks in when [all] fails resolve.
- All three tiers explicit: every fallback announces which tier
landed and prints a re-run hint when not on Tier 1.
- install.ps1 and install.sh both regenerate their tier specs from
the same _BROKEN_EXTRAS array so updates stay in sync.
Side effect: install.ps1 Tier 2 spec previously hardcoded 'mistral'
in its extra list — bug fixed by the refactor (mistral is filtered
out).
# Config
hermes_cli/config.py — DEFAULT_CONFIG.security gains:
- acked_advisories: [] (advisory IDs the user has dismissed)
- allow_lazy_installs: True (security gate for ensure())
No config version bump needed — both keys nest under existing
security: block, and load_config's deep-merge picks up DEFAULT_CONFIG
defaults for users with older configs.
# Tests
tests/hermes_cli/test_security_advisories.py — 23 tests covering:
- detect_compromised matches/non-matches, wildcard frozenset
- ack persistence, idempotence, blank rejection, config-failure path
- banner cache rate limiting + 24h re-banner + ack-stops-banner
- short_banner_lines / full_remediation_text / render_doctor_section /
gateway_log_message
- shipped catalog well-formedness invariant
tests/tools/test_lazy_deps.py — 40 tests covering:
- spec safety: 11 safe parametrized + 18 unsafe parametrized
- allowlist: unknown-feature rejection, namespace.name shape,
every shipped spec passes the safety regex
- security gating: config flag, env var, default, fail-open
- ensure() happy/sad paths: already-satisfied, install success,
pip stderr surfaced on failure, install-succeeds-but-still-missing
- is_available, feature_install_command
Combined: 63 new tests, all passing under scripts/run_tests.sh.
# Validation
- scripts/run_tests.sh tests/hermes_cli/test_security_advisories.py
tests/tools/test_lazy_deps.py → 63/63 passing
- scripts/run_tests.sh tests/hermes_cli/test_doctor.py
tests/hermes_cli/test_doctor_command_install.py
tests/tools/test_tts_mistral.py tests/tools/test_transcription_tools.py
tests/tools/test_transcription_dotenv_fallback.py → 165/165 passing
- scripts/run_tests.sh tests/hermes_cli/ tests/tools/ →
9191 passed, 8 pre-existing failures (verified on origin/main
before this change)
- bash -n on install.sh and setup-hermes.sh → OK
- py_compile on all modified .py files → OK
- End-to-end smoke test of detect_compromised + render_doctor_section
+ gateway_log_message with mocked installed version → produces
copy-pasteable remediation output
# Community
Full advisory + remediation steps:
website/docs/community/security-advisories/shai-hulud-mistralai-2026-05.md
Short-form post drafts (Discord, GitHub pinned issue, README banner):
scripts/community-announcement-shai-hulud.md
Refs: PR #24205 (mistral disabled), Socket Security advisory
<https://socket.dev/blog/mini-shai-hulud-worm-pypi>
* build(deps): pin every direct dep to ==X.Y.Z (no ranges)
Companion to the supply-chain advisory work: replace every >=/</~= range
in pyproject.toml's [project.dependencies] and [project.optional-dependencies]
with an exact ==X.Y.Z pin sourced from uv.lock.
Why: ranges allow PyPI to ship a fresh version of any direct dep at any
time without a code review on our side. With ranges, the malicious
mistralai 2.4.6 release would have been pulled by every fresh
'pip install -e .[all]' for the hours between upload and PyPI's
quarantine — exactly the install window we got hit on. Exact pins close
that window: the only way a new package version reaches a user is via
an intentional update on our end.
What the user-facing change is: nothing, behavior-wise. Every package
resolves to the same version it was already resolving to via uv.lock —
the pins just remove the resolver's freedom to pick a different one.
Cost: any user installing Hermes alongside another package that requires
a newer pin gets a resolver conflict. Acceptable for our isolated-venv
install path; documented in the new comment block.
Build-system requires line (setuptools>=61.0) is intentionally left
as a range — pinning the build backend would block fresh pip from
bootstrapping the build on architectures where that exact wheel isn't
available.
mistral extra (mistralai==2.3.0) is pinned but stays out of [all]
(per PR #24205). 'uv lock' regeneration will fail until PyPI restores
mistralai; lockfile regeneration is gated behind that, NOT on every PR.
LAZY_DEPS in tools/lazy_deps.py also moved to exact pins so the lazy-
install pathway can never resolve a different version than the one
declared in pyproject.toml.
Validation:
- Cross-checked all 77 pinned direct deps in pyproject.toml against
uv.lock — every pin matches the resolved version exactly.
- Cross-checked all LAZY_DEPS specs against uv.lock — same.
- 'uv pip install -e .[all] --dry-run' resolves 205 packages cleanly.
- tests/tools/test_lazy_deps.py + tests/hermes_cli/test_security_advisories.py
→ 63/63 passing (every shipped spec passes the safety regex).
- Doctor + TTS + transcription targeted suite → 146/146 passing.
* build(deps): hash-verify transitives via uv.lock; remove unresolvable [mistral] extra
You asked: 'what about the dependencies the dependencies rely on?' —
correctly noting that exact-pinning direct deps in pyproject.toml does
NOT cover the transitive graph. `pip install` and `uv pip install` both
re-resolve transitives fresh from PyPI at install time, so a compromised
transitive (e.g. `httpcore` if it got worm-poisoned tomorrow) would
still hit our users even with every direct dep exact-pinned.
# What this commit fixes
1. **Both real installer scripts now prefer `uv sync --locked` as Tier 0.**
uv.lock records SHA256 hashes for every transitive — a compromised
package with a different hash gets REJECTED. Falls through to the
existing `uv pip install` cascade if the lockfile is missing or
stale, with a loud warning that the fallback path does NOT
hash-verify transitives. Previously only `setup-hermes.sh` (the dev
path) used the lockfile; `scripts/install.sh` and `scripts/install.ps1`
(the paths fresh users actually run) skipped it.
2. **Removed the `[mistral]` extra entirely.** The `mistralai` PyPI
project is fully quarantined right now — every version returns 404,
so any pin we wrote was unresolvable, which broke `uv lock --check`
in CI. Restoration is documented in pyproject.toml as a 5-step
checklist (verify, re-add extra, re-enable in 4 modules, regenerate
lock, optionally re-add to [all]).
3. **Regenerated uv.lock.** 262 packages, mistralai/eval-type-backport/
jsonpath-python pruned. `uv lock --check` now passes.
# Defense-in-depth view
| Layer | Where | Protects against |
|----------------------------|-------------------|-------------------------------------------|
| Exact pins in pyproject | direct deps | new mistralai 2.4.6-style direct compromise |
| uv.lock + `--locked` install | transitive graph | transitive worm injection |
| Tier-0 hash-verified path | install.sh / .ps1 | actually USE the lockfile in fresh installs |
| `uv lock --check` CI gate | every PR | drift between pyproject and lockfile |
| `hermes_cli/security_advisories.py` | runtime | cleanup for users who already got hit |
The exact pinning + hash verification together close the supply-chain
gap. Without the lockfile path, exact pins alone are theater.
# Validation
- `uv lock --check` → passes (262 packages resolved, no drift).
- `bash -n` on install.sh + setup-hermes.sh → OK.
- 209/209 tests passing across new + adjacent test files
(test_lazy_deps.py, test_security_advisories.py, test_doctor.py,
test_tts_mistral.py, test_transcription_tools.py).
- TOML parse OK.
* chore: remove community announcement drafts (PR body covers it)
* build(deps): lazy-install every opt-in backend (anthropic, search, terminal, platforms, dashboard)
Extends the lazy-install framework to cover everything that's not used by
every hermes session. Base install drops from ~60 packages to 45.
Moved out of core dependencies = []:
- anthropic (only when provider=anthropic native, not via aggregators)
- exa-py, firecrawl-py, parallel-web (search backends; only when picked)
- fal-client (image gen; only when picked)
- edge-tts (default TTS but still optional)
New extras in pyproject.toml: [anthropic] [exa] [firecrawl] [parallel-web]
[fal] [edge-tts]. All added to [all].
New LAZY_DEPS entries: provider.anthropic, search.{exa,firecrawl,parallel},
tts.edge, image.fal, memory.hindsight, platform.{telegram,discord,matrix},
terminal.{modal,daytona,vercel}, tool.dashboard.
Each import site now calls ensure() before importing the SDK. Where the
module had a top-level try/except (telegram, discord, fastapi), the
graceful-fallback pattern was extended to lazy-install on first
check_*_requirements() call and re-bind module globals.
Updated test_windows_native_support.py tzdata check from snapshot
(>=2023.3 literal) to invariant (any version + win32 marker).
Validation:
- Base install: 45 packages (was ~60); 6 newly-extracted packages absent
- uv lock --check: passes (262 packages, no drift)
- 209/209 lazy_deps + advisory + doctor + tts/transcription tests passing
- py_compile clean on all 12 modified modules
This commit is contained in:
parent
99ad2d1372
commit
c1eb2dcda7
28 changed files with 2433 additions and 243 deletions
|
|
@ -1332,6 +1332,21 @@ DEFAULT_CONFIG = {
|
|||
"domains": [],
|
||||
"shared_files": [],
|
||||
},
|
||||
# Acknowledged supply-chain security advisories. Each entry is the
|
||||
# ID of an advisory the user has read and acted on (uninstalled the
|
||||
# compromised package, rotated credentials). Acked advisories no
|
||||
# longer trigger the startup banner. Add via `hermes doctor --ack
|
||||
# <id>`; remove by editing the list directly. See
|
||||
# ``hermes_cli/security_advisories.py`` for the catalog.
|
||||
"acked_advisories": [],
|
||||
# Allow Hermes to lazy-install opt-in backend packages from PyPI
|
||||
# the first time the user enables a backend that needs them
|
||||
# (e.g. installing ``elevenlabs`` when the user picks ElevenLabs as
|
||||
# their TTS provider). Set to false to require explicit
|
||||
# ``pip install`` for everything beyond the base set — appropriate
|
||||
# for restricted networks, audited environments, or air-gapped
|
||||
# systems where any runtime install is unacceptable.
|
||||
"allow_lazy_installs": True,
|
||||
},
|
||||
|
||||
"cron": {
|
||||
|
|
|
|||
|
|
@ -296,19 +296,101 @@ def _build_apikey_providers_list() -> list:
|
|||
def run_doctor(args):
|
||||
"""Run diagnostic checks."""
|
||||
should_fix = getattr(args, 'fix', False)
|
||||
ack_target = getattr(args, 'ack', None)
|
||||
|
||||
# Doctor runs from the interactive CLI, so CLI-gated tool availability
|
||||
# checks (like cronjob management) should see the same context as `hermes`.
|
||||
os.environ.setdefault("HERMES_INTERACTIVE", "1")
|
||||
|
||||
|
||||
# Handle `hermes doctor --ack <id>` as a fast path. Persist the ack and
|
||||
# return without running the rest of the diagnostics — the user has
|
||||
# already seen the advisory and just wants to silence it.
|
||||
if ack_target:
|
||||
from hermes_cli.security_advisories import (
|
||||
ADVISORIES,
|
||||
ack_advisory,
|
||||
)
|
||||
valid_ids = {a.id for a in ADVISORIES}
|
||||
if ack_target not in valid_ids:
|
||||
print(color(
|
||||
f"Unknown advisory ID: {ack_target!r}. Known IDs: "
|
||||
f"{', '.join(sorted(valid_ids)) or '(none)'}",
|
||||
Colors.RED,
|
||||
))
|
||||
sys.exit(2)
|
||||
if ack_advisory(ack_target):
|
||||
print(color(
|
||||
f" ✓ Acknowledged advisory {ack_target}. "
|
||||
f"It will no longer trigger startup banners.",
|
||||
Colors.GREEN,
|
||||
))
|
||||
else:
|
||||
print(color(
|
||||
f" ✗ Failed to persist ack for {ack_target}. "
|
||||
f"Check ~/.hermes/config.yaml is writable.",
|
||||
Colors.RED,
|
||||
))
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
issues = []
|
||||
manual_issues = [] # issues that can't be auto-fixed
|
||||
fixed_count = 0
|
||||
|
||||
|
||||
print()
|
||||
print(color("┌─────────────────────────────────────────────────────────┐", Colors.CYAN))
|
||||
print(color("│ 🩺 Hermes Doctor │", Colors.CYAN))
|
||||
print(color("└─────────────────────────────────────────────────────────┘", Colors.CYAN))
|
||||
|
||||
# =========================================================================
|
||||
# Check: Security advisories (RUNS FIRST — these are the most urgent)
|
||||
# =========================================================================
|
||||
print()
|
||||
print(color("◆ Security Advisories", Colors.CYAN, Colors.BOLD))
|
||||
try:
|
||||
from hermes_cli.security_advisories import (
|
||||
detect_compromised,
|
||||
filter_unacked,
|
||||
full_remediation_text,
|
||||
get_acked_ids,
|
||||
)
|
||||
all_hits = detect_compromised()
|
||||
fresh_hits = filter_unacked(all_hits)
|
||||
if fresh_hits:
|
||||
for hit in fresh_hits:
|
||||
check_fail(
|
||||
f"{hit.advisory.title}",
|
||||
f"({hit.package}=={hit.installed_version})",
|
||||
)
|
||||
# Print the full remediation block, indented under the
|
||||
# check_fail header so it reads as a single section.
|
||||
for line in full_remediation_text(hit):
|
||||
if line:
|
||||
print(f" {color(line, Colors.YELLOW)}")
|
||||
else:
|
||||
print()
|
||||
# Funnel into the action list so the summary block surfaces it
|
||||
# for users who scroll past the section.
|
||||
manual_issues.append(
|
||||
f"Resolve security advisory {hit.advisory.id}: "
|
||||
f"uninstall {hit.package}=={hit.installed_version} and "
|
||||
f"rotate credentials, then run "
|
||||
f"`hermes doctor --ack {hit.advisory.id}`."
|
||||
)
|
||||
# Acked-but-still-installed: show as informational so the user
|
||||
# knows the package is still on disk after the ack.
|
||||
acked_ids = get_acked_ids()
|
||||
for h in all_hits:
|
||||
if h.advisory.id in acked_ids:
|
||||
check_warn(
|
||||
f"{h.package}=={h.installed_version} still installed "
|
||||
f"(advisory {h.advisory.id} acknowledged)",
|
||||
)
|
||||
else:
|
||||
check_ok("No active security advisories")
|
||||
except Exception as e:
|
||||
# Never let a bug in the advisory check block the rest of doctor.
|
||||
check_warn(f"Security advisory check failed: {e}")
|
||||
|
||||
# =========================================================================
|
||||
# Check: Python version
|
||||
|
|
|
|||
|
|
@ -10086,6 +10086,16 @@ def main():
|
|||
doctor_parser.add_argument(
|
||||
"--fix", action="store_true", help="Attempt to fix issues automatically"
|
||||
)
|
||||
doctor_parser.add_argument(
|
||||
"--ack",
|
||||
metavar="ADVISORY_ID",
|
||||
default=None,
|
||||
help=(
|
||||
"Acknowledge a security advisory by ID and exit. After ack, the "
|
||||
"advisory will no longer trigger startup banners. Run `hermes "
|
||||
"doctor` first to see active advisories and their IDs."
|
||||
),
|
||||
)
|
||||
doctor_parser.set_defaults(func=cmd_doctor)
|
||||
|
||||
# =========================================================================
|
||||
|
|
|
|||
451
hermes_cli/security_advisories.py
Normal file
451
hermes_cli/security_advisories.py
Normal file
|
|
@ -0,0 +1,451 @@
|
|||
"""
|
||||
Security advisory checker for Hermes Agent.
|
||||
|
||||
Detects known-compromised Python packages installed in the active venv
|
||||
(supply-chain attacks like the Mini Shai-Hulud worm of May 2026 that
|
||||
poisoned ``mistralai 2.4.6`` on PyPI) and surfaces remediation guidance to
|
||||
the user.
|
||||
|
||||
Design goals:
|
||||
|
||||
- **Cheap.** A single ``importlib.metadata.version()`` call per advisory
|
||||
package. Safe to run on every CLI startup.
|
||||
- **Loud when it matters, silent otherwise.** If no compromised package is
|
||||
installed, the user sees nothing.
|
||||
- **Acknowledgeable.** Once the user has read and acted on an advisory they
|
||||
can dismiss it via ``hermes doctor --ack <id>``; the ack is persisted to
|
||||
``config.security.acked_advisories`` and survives restart.
|
||||
- **Extensible.** Adding a new advisory is one entry in ``ADVISORIES``;
|
||||
adding a new compromised version is a one-line edit. No code changes
|
||||
needed when the next worm hits.
|
||||
|
||||
The check is invoked from three places:
|
||||
|
||||
1. ``hermes doctor`` (and ``hermes doctor --ack <id>``)
|
||||
2. CLI startup banner (one short line, then full guidance via
|
||||
``hermes doctor``)
|
||||
3. Gateway startup (logged to gateway.log; first interactive message gets
|
||||
a one-line operator banner)
|
||||
|
||||
This module is intentionally dependency-free beyond the stdlib so it can
|
||||
run in environments where the rest of Hermes failed to import.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Advisory catalog
|
||||
#
|
||||
# Each advisory is a community-facing security warning about one or more
|
||||
# specific package versions that are known to be compromised. To add a new
|
||||
# advisory:
|
||||
#
|
||||
# 1. Append a new ``Advisory`` to ``ADVISORIES`` below
|
||||
# 2. Set ``compromised`` to a tuple of ``(pkg_name, frozenset_of_versions)``
|
||||
# — version strings must match what ``importlib.metadata.version()``
|
||||
# returns. Use an empty frozenset to flag *any installed version*
|
||||
# (rare; only when the maintainer namespace itself is compromised).
|
||||
# 3. Write 2-4 short ``remediation`` lines a non-expert can copy/paste.
|
||||
#
|
||||
# Do NOT remove old advisories. Once an advisory ships, leave it in place so
|
||||
# users running an older release with the compromised package still get
|
||||
# warned. Mark superseded ones via ``superseded_by`` if needed.
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Advisory:
|
||||
"""One security advisory entry.
|
||||
|
||||
Attributes:
|
||||
id: stable identifier used for acks (e.g. ``shai-hulud-2026-05``).
|
||||
Lowercase-hyphen, never reused.
|
||||
title: one-line headline shown in banners.
|
||||
summary: 1-3 sentence description of what was compromised and how.
|
||||
url: reference URL (Socket advisory, GitHub advisory, PyPI page).
|
||||
compromised: tuple of ``(package_name, frozenset_of_versions)``
|
||||
pairs. Empty frozenset means "any version of this package is
|
||||
considered suspect" — use sparingly.
|
||||
remediation: ordered list of steps the user should take. First step
|
||||
should be the uninstall command; subsequent steps the credential
|
||||
audit / rotation guidance.
|
||||
published: ISO date string for sort order.
|
||||
"""
|
||||
|
||||
id: str
|
||||
title: str
|
||||
summary: str
|
||||
url: str
|
||||
compromised: tuple[tuple[str, frozenset[str]], ...]
|
||||
remediation: tuple[str, ...]
|
||||
published: str = ""
|
||||
severity: str = "high" # low / medium / high / critical
|
||||
|
||||
|
||||
ADVISORIES: tuple[Advisory, ...] = (
|
||||
Advisory(
|
||||
id="shai-hulud-2026-05",
|
||||
title="Mini Shai-Hulud worm — mistralai 2.4.6 compromised on PyPI",
|
||||
summary=(
|
||||
"PyPI quarantined the mistralai package on 2026-05-12 after a "
|
||||
"malicious 2.4.6 release. The worm steals credentials from "
|
||||
"environment variables and credential files (~/.npmrc, ~/.pypirc, "
|
||||
"~/.aws/credentials, GitHub PATs, cloud SDK tokens) and exfils "
|
||||
"them to a hardcoded webhook. If you ran any Python process that "
|
||||
"imported mistralai 2.4.6 — including hermes when configured "
|
||||
"with provider=mistral for TTS or STT — assume those credentials "
|
||||
"are exposed."
|
||||
),
|
||||
url="https://socket.dev/blog/mini-shai-hulud-worm-pypi",
|
||||
compromised=(
|
||||
("mistralai", frozenset({"2.4.6"})),
|
||||
),
|
||||
remediation=(
|
||||
"Run: pip uninstall -y mistralai (or: uv pip uninstall mistralai)",
|
||||
"Rotate API keys in ~/.hermes/.env (OpenRouter, Anthropic, OpenAI, "
|
||||
"Nous, GitHub, AWS, Google, Mistral, etc.).",
|
||||
"Audit ~/.npmrc, ~/.pypirc, ~/.aws/credentials, ~/.config/gh/hosts.yml, "
|
||||
"and any other credential files for tokens that may have been read.",
|
||||
"Check GitHub for unexpected new SSH keys, deploy keys, or webhook "
|
||||
"additions on repos you have admin on.",
|
||||
"After cleanup: hermes doctor --ack shai-hulud-2026-05 to dismiss "
|
||||
"this warning.",
|
||||
),
|
||||
published="2026-05-12",
|
||||
severity="critical",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Detection
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AdvisoryHit:
|
||||
"""One package-version match against an advisory."""
|
||||
|
||||
advisory: Advisory
|
||||
package: str
|
||||
installed_version: str
|
||||
|
||||
|
||||
def _installed_version(pkg_name: str) -> Optional[str]:
|
||||
"""Return the installed version of ``pkg_name``, or None if not installed.
|
||||
|
||||
Uses ``importlib.metadata`` so we don't depend on pip being importable
|
||||
inside the active venv (uv-created venvs may lack pip).
|
||||
"""
|
||||
try:
|
||||
from importlib.metadata import PackageNotFoundError, version
|
||||
except ImportError: # py<3.8 — Hermes requires 3.10+ but defensive.
|
||||
return None
|
||||
try:
|
||||
return version(pkg_name)
|
||||
except PackageNotFoundError:
|
||||
return None
|
||||
except Exception:
|
||||
# Some metadata corruption modes raise ValueError or OSError. Don't
|
||||
# let advisory checking crash the CLI startup path.
|
||||
logger.debug("importlib.metadata.version(%s) raised", pkg_name, exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
def detect_compromised(
|
||||
advisories: Iterable[Advisory] = ADVISORIES,
|
||||
) -> list[AdvisoryHit]:
|
||||
"""Scan installed packages and return all advisory hits.
|
||||
|
||||
A "hit" means an advisory's listed package is installed AND the version
|
||||
is in the compromised set (or the compromised set is empty, meaning
|
||||
*any* version is suspect).
|
||||
"""
|
||||
hits: list[AdvisoryHit] = []
|
||||
for advisory in advisories:
|
||||
for pkg_name, bad_versions in advisory.compromised:
|
||||
installed = _installed_version(pkg_name)
|
||||
if installed is None:
|
||||
continue
|
||||
if not bad_versions or installed in bad_versions:
|
||||
hits.append(AdvisoryHit(
|
||||
advisory=advisory,
|
||||
package=pkg_name,
|
||||
installed_version=installed,
|
||||
))
|
||||
return hits
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Acknowledgement persistence
|
||||
#
|
||||
# Acks live under ``security.acked_advisories`` in config.yaml as a list of
|
||||
# advisory IDs. The list is the only state — no per-host data, no
|
||||
# timestamps, no fingerprints. Users sharing a config.yaml across machines
|
||||
# (rare but possible) get the same dismissal everywhere, which is the
|
||||
# correct behavior for a global advisory.
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def get_acked_ids() -> set[str]:
|
||||
"""Return the set of advisory IDs the user has dismissed.
|
||||
|
||||
Returns an empty set if config can't be loaded (don't block startup
|
||||
just because config is broken — the advisory will keep firing until
|
||||
config is repaired, which is fine).
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
cfg = load_config()
|
||||
except Exception:
|
||||
logger.debug("Could not load config for advisory acks", exc_info=True)
|
||||
return set()
|
||||
sec = cfg.get("security") or {}
|
||||
raw = sec.get("acked_advisories") or []
|
||||
if not isinstance(raw, list):
|
||||
return set()
|
||||
return {str(x).strip() for x in raw if str(x).strip()}
|
||||
|
||||
|
||||
def ack_advisory(advisory_id: str) -> bool:
|
||||
"""Persist an ack for ``advisory_id``. Returns True on success.
|
||||
|
||||
Idempotent — acking an already-acked ID is a no-op.
|
||||
"""
|
||||
advisory_id = advisory_id.strip()
|
||||
if not advisory_id:
|
||||
return False
|
||||
try:
|
||||
from hermes_cli.config import load_config, save_config
|
||||
except Exception:
|
||||
logger.warning("Could not import config module to persist ack")
|
||||
return False
|
||||
try:
|
||||
cfg = load_config()
|
||||
sec = cfg.setdefault("security", {})
|
||||
existing = sec.get("acked_advisories") or []
|
||||
if not isinstance(existing, list):
|
||||
existing = []
|
||||
if advisory_id not in existing:
|
||||
existing.append(advisory_id)
|
||||
sec["acked_advisories"] = existing
|
||||
save_config(cfg)
|
||||
return True
|
||||
except Exception:
|
||||
logger.exception("Failed to persist advisory ack for %s", advisory_id)
|
||||
return False
|
||||
|
||||
|
||||
def filter_unacked(hits: list[AdvisoryHit]) -> list[AdvisoryHit]:
|
||||
"""Return only hits whose advisories the user has not dismissed."""
|
||||
if not hits:
|
||||
return []
|
||||
acked = get_acked_ids()
|
||||
return [h for h in hits if h.advisory.id not in acked]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Rendering helpers
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _term_supports_color() -> bool:
|
||||
if os.environ.get("NO_COLOR"):
|
||||
return False
|
||||
if not sys.stdout.isatty():
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def short_banner_lines(hits: list[AdvisoryHit]) -> list[str]:
|
||||
"""Return 1-3 short lines suitable for a startup banner.
|
||||
|
||||
Caller is responsible for color/styling. Always names the worst hit
|
||||
explicitly so the user knows what's wrong without running doctor.
|
||||
"""
|
||||
if not hits:
|
||||
return []
|
||||
primary = hits[0]
|
||||
lines = [
|
||||
f"SECURITY ADVISORY [{primary.advisory.id}]: {primary.advisory.title}",
|
||||
f" Detected: {primary.package}=={primary.installed_version}",
|
||||
" Run 'hermes doctor' for remediation steps.",
|
||||
]
|
||||
if len(hits) > 1:
|
||||
lines.insert(1, f" ({len(hits) - 1} additional advisor"
|
||||
f"{'ies' if len(hits) > 2 else 'y'} also active.)")
|
||||
return lines
|
||||
|
||||
|
||||
def full_remediation_text(hit: AdvisoryHit) -> list[str]:
|
||||
"""Return a multi-line block describing the advisory + remediation."""
|
||||
a = hit.advisory
|
||||
lines = [
|
||||
f"=== {a.title} ===",
|
||||
f"ID: {a.id} Severity: {a.severity} Published: {a.published}",
|
||||
f"Detected: {hit.package}=={hit.installed_version}",
|
||||
f"Reference: {a.url}",
|
||||
"",
|
||||
a.summary,
|
||||
"",
|
||||
"Remediation:",
|
||||
]
|
||||
for i, step in enumerate(a.remediation, 1):
|
||||
lines.append(f" {i}. {step}")
|
||||
return lines
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Startup-banner gating
|
||||
#
|
||||
# We do NOT want to hammer the user with the banner on every command. Once
|
||||
# they've seen it inside a 24h window we cache that fact in
|
||||
# ``~/.hermes/cache/advisory_banner_seen`` (a single line per advisory ID:
|
||||
# ``<id> <iso8601_timestamp>``).
|
||||
#
|
||||
# Acked advisories never re-banner. Cached-but-not-acked advisories
|
||||
# re-banner after 24h so the user doesn't fully forget.
|
||||
# =============================================================================
|
||||
|
||||
|
||||
_BANNER_CACHE_FILE = "advisory_banner_seen"
|
||||
_BANNER_REPEAT_HOURS = 24
|
||||
|
||||
|
||||
def _banner_cache_path() -> Optional[Path]:
|
||||
try:
|
||||
from hermes_constants import get_hermes_home
|
||||
cache_dir = Path(get_hermes_home()) / "cache"
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
return cache_dir / _BANNER_CACHE_FILE
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _read_banner_cache() -> dict[str, float]:
|
||||
p = _banner_cache_path()
|
||||
if p is None or not p.exists():
|
||||
return {}
|
||||
out: dict[str, float] = {}
|
||||
try:
|
||||
for line in p.read_text(encoding="utf-8").splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
parts = line.split(None, 1)
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
advisory_id, ts = parts
|
||||
try:
|
||||
out[advisory_id] = float(ts)
|
||||
except ValueError:
|
||||
continue
|
||||
except Exception:
|
||||
return {}
|
||||
return out
|
||||
|
||||
|
||||
def _write_banner_cache(seen: dict[str, float]) -> None:
|
||||
p = _banner_cache_path()
|
||||
if p is None:
|
||||
return
|
||||
try:
|
||||
lines = [f"{aid} {ts}" for aid, ts in seen.items()]
|
||||
p.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||||
except Exception:
|
||||
logger.debug("Could not write advisory banner cache", exc_info=True)
|
||||
|
||||
|
||||
def hits_due_for_banner(
|
||||
hits: list[AdvisoryHit],
|
||||
*,
|
||||
repeat_hours: int = _BANNER_REPEAT_HOURS,
|
||||
) -> list[AdvisoryHit]:
|
||||
"""Return only hits whose banner is due (not acked, not recently shown).
|
||||
|
||||
Side effect: stamps the banner cache for any hit that's about to be
|
||||
shown. Callers should subsequently render the result.
|
||||
"""
|
||||
import time
|
||||
|
||||
fresh = filter_unacked(hits)
|
||||
if not fresh:
|
||||
return []
|
||||
now = time.time()
|
||||
cache = _read_banner_cache()
|
||||
cutoff = now - (repeat_hours * 3600)
|
||||
|
||||
due: list[AdvisoryHit] = []
|
||||
for hit in fresh:
|
||||
last = cache.get(hit.advisory.id, 0.0)
|
||||
if last < cutoff:
|
||||
due.append(hit)
|
||||
cache[hit.advisory.id] = now
|
||||
if due:
|
||||
_write_banner_cache(cache)
|
||||
return due
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Public entry points used by doctor / CLI / gateway
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def render_doctor_section(hits: list[AdvisoryHit]) -> tuple[bool, list[str]]:
|
||||
"""Render the security-advisory section for ``hermes doctor``.
|
||||
|
||||
Returns ``(has_problems, lines)``. Caller is responsible for printing
|
||||
with whatever color scheme it uses.
|
||||
"""
|
||||
fresh = filter_unacked(hits)
|
||||
if not fresh:
|
||||
return False, ["No active security advisories. ✓"]
|
||||
|
||||
lines: list[str] = []
|
||||
for i, hit in enumerate(fresh):
|
||||
if i:
|
||||
lines.append("")
|
||||
lines.extend(full_remediation_text(hit))
|
||||
return True, lines
|
||||
|
||||
|
||||
def startup_banner(hits: list[AdvisoryHit]) -> Optional[str]:
|
||||
"""Return a printable startup banner, or None if nothing is due.
|
||||
|
||||
Updates the banner cache as a side effect (so the next call within
|
||||
24h returns None for the same hit).
|
||||
"""
|
||||
due = hits_due_for_banner(hits)
|
||||
if not due:
|
||||
return None
|
||||
lines = short_banner_lines(due)
|
||||
if _term_supports_color():
|
||||
red = "\x1b[1;31m"
|
||||
reset = "\x1b[0m"
|
||||
return red + "\n".join(lines) + reset
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def gateway_log_message(hits: list[AdvisoryHit]) -> Optional[str]:
|
||||
"""Return a one-line log message for gateway operators, or None."""
|
||||
fresh = filter_unacked(hits)
|
||||
if not fresh:
|
||||
return None
|
||||
if len(fresh) == 1:
|
||||
h = fresh[0]
|
||||
return (f"Security advisory [{h.advisory.id}] active: "
|
||||
f"{h.package}=={h.installed_version} matches {h.advisory.title}. "
|
||||
f"See {h.advisory.url}")
|
||||
return (f"{len(fresh)} security advisories active "
|
||||
f"(IDs: {', '.join(h.advisory.id for h in fresh)}). "
|
||||
f"Run `hermes doctor` on the gateway host for details.")
|
||||
|
|
@ -56,10 +56,22 @@ try:
|
|||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel
|
||||
except ImportError:
|
||||
raise SystemExit(
|
||||
"Web UI requires fastapi and uvicorn.\n"
|
||||
f"Install with: {sys.executable} -m pip install 'fastapi' 'uvicorn[standard]'"
|
||||
)
|
||||
# First try lazy-installing the dashboard extras. Only the user actually
|
||||
# running `hermes dashboard` needs fastapi+uvicorn; lazy install keeps
|
||||
# them out of every other install path. After install, re-import.
|
||||
try:
|
||||
from tools.lazy_deps import ensure as _lazy_ensure
|
||||
_lazy_ensure("tool.dashboard", prompt=False)
|
||||
from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel
|
||||
except Exception:
|
||||
raise SystemExit(
|
||||
"Web UI requires fastapi and uvicorn.\n"
|
||||
f"Install with: {sys.executable} -m pip install 'fastapi' 'uvicorn[standard]'"
|
||||
)
|
||||
|
||||
WEB_DIST = Path(os.environ["HERMES_WEB_DIST"]) if "HERMES_WEB_DIST" in os.environ else Path(__file__).parent / "web_dist"
|
||||
_log = logging.getLogger(__name__)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue