mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Users can declare shell scripts in config.yaml under a hooks: block that fire on plugin-hook events (pre_tool_call, post_tool_call, pre_llm_call, subagent_stop, etc). Scripts receive JSON on stdin, can return JSON on stdout to block tool calls or inject context pre-LLM. Key design: - Registers closures on existing PluginManager._hooks dict — zero changes to invoke_hook() call sites - subprocess.run(shell=False) via shlex.split — no shell injection - First-use consent per (event, command) pair, persisted to allowlist JSON - Bypass via --accept-hooks, HERMES_ACCEPT_HOOKS=1, or hooks_auto_accept - hermes hooks list/test/revoke/doctor CLI subcommands - Adds subagent_stop hook event fired after delegate_task children exit - Claude Code compatible response shapes accepted Cherry-picked from PR #13143 by @pefontana.
831 lines
29 KiB
Python
831 lines
29 KiB
Python
"""
|
||
Shell-script hooks bridge.
|
||
|
||
Reads the ``hooks:`` block from ``cli-config.yaml``, prompts the user for
|
||
consent on first use of each ``(event, command)`` pair, and registers
|
||
callbacks on the existing plugin hook manager so every existing
|
||
``invoke_hook()`` site dispatches to the configured shell scripts — with
|
||
zero changes to call sites.
|
||
|
||
Design notes
|
||
------------
|
||
* Python plugins and shell hooks compose naturally: both flow through
|
||
:func:`hermes_cli.plugins.invoke_hook` and its aggregators. Python
|
||
plugins are registered first (via ``discover_and_load()``) so their
|
||
block decisions win ties over shell-hook blocks.
|
||
* Subprocess execution uses ``shlex.split(os.path.expanduser(command))``
|
||
with ``shell=False`` — no shell injection footguns. Users that need
|
||
pipes/redirection wrap their logic in a script.
|
||
* First-use consent is gated by the allowlist under
|
||
``~/.hermes/shell-hooks-allowlist.json``. Non-TTY callers must pass
|
||
``accept_hooks=True`` (resolved from ``--accept-hooks``,
|
||
``HERMES_ACCEPT_HOOKS``, or ``hooks_auto_accept: true`` in config)
|
||
for registration to succeed without a prompt.
|
||
* Registration is idempotent — safe to invoke from both the CLI entry
|
||
point (``hermes_cli/main.py``) and the gateway entry point
|
||
(``gateway/run.py``).
|
||
|
||
Wire protocol
|
||
-------------
|
||
**stdin** (JSON, piped to the script)::
|
||
|
||
{
|
||
"hook_event_name": "pre_tool_call",
|
||
"tool_name": "terminal",
|
||
"tool_input": {"command": "rm -rf /"},
|
||
"session_id": "sess_abc123",
|
||
"cwd": "/home/user/project",
|
||
"extra": {...} # event-specific kwargs
|
||
}
|
||
|
||
**stdout** (JSON, optional — anything else is ignored)::
|
||
|
||
# Block a pre_tool_call (either shape accepted; normalised internally):
|
||
{"decision": "block", "reason": "Forbidden command"} # Claude-Code-style
|
||
{"action": "block", "message": "Forbidden command"} # Hermes-canonical
|
||
|
||
# Inject context for pre_llm_call:
|
||
{"context": "Today is Friday"}
|
||
|
||
# Silent no-op:
|
||
<empty or any non-matching JSON object>
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import difflib
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import shlex
|
||
import subprocess
|
||
import sys
|
||
import tempfile
|
||
import threading
|
||
import time
|
||
from contextlib import contextmanager
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple
|
||
|
||
try:
|
||
import fcntl # POSIX only; Windows falls back to best-effort without flock.
|
||
except ImportError: # pragma: no cover
|
||
fcntl = None # type: ignore[assignment]
|
||
|
||
from hermes_constants import get_hermes_home
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
DEFAULT_TIMEOUT_SECONDS = 60
|
||
MAX_TIMEOUT_SECONDS = 300
|
||
ALLOWLIST_FILENAME = "shell-hooks-allowlist.json"
|
||
|
||
# (event, matcher, command) triples that have been wired to the plugin
|
||
# manager in the current process. Matcher is part of the key because
|
||
# the same script can legitimately register for different matchers under
|
||
# the same event (e.g. one entry per tool the user wants to gate).
|
||
# Second registration attempts for the exact same triple become no-ops
|
||
# so the CLI and gateway can both call register_from_config() safely.
|
||
_registered: Set[Tuple[str, Optional[str], str]] = set()
|
||
_registered_lock = threading.Lock()
|
||
|
||
# Intra-process lock for allowlist read-modify-write on platforms that
|
||
# lack ``fcntl`` (non-POSIX). Kept separate from ``_registered_lock``
|
||
# because ``register_from_config`` already holds ``_registered_lock`` when
|
||
# it triggers ``_record_approval`` — reusing it here would self-deadlock
|
||
# (``threading.Lock`` is non-reentrant). POSIX callers use the sibling
|
||
# ``.lock`` file via ``fcntl.flock`` and bypass this.
|
||
_allowlist_write_lock = threading.Lock()
|
||
|
||
|
||
@dataclass
|
||
class ShellHookSpec:
|
||
"""Parsed and validated representation of a single ``hooks:`` entry."""
|
||
|
||
event: str
|
||
command: str
|
||
matcher: Optional[str] = None
|
||
timeout: int = DEFAULT_TIMEOUT_SECONDS
|
||
compiled_matcher: Optional[re.Pattern] = field(default=None, repr=False)
|
||
|
||
def __post_init__(self) -> None:
|
||
# Strip whitespace introduced by YAML quirks (e.g. multi-line string
|
||
# folding) — a matcher of " terminal" would otherwise silently fail
|
||
# to match "terminal" without any diagnostic.
|
||
if isinstance(self.matcher, str):
|
||
stripped = self.matcher.strip()
|
||
self.matcher = stripped if stripped else None
|
||
if self.matcher:
|
||
try:
|
||
self.compiled_matcher = re.compile(self.matcher)
|
||
except re.error as exc:
|
||
logger.warning(
|
||
"shell hook matcher %r is invalid (%s) — treating as "
|
||
"literal equality", self.matcher, exc,
|
||
)
|
||
self.compiled_matcher = None
|
||
|
||
def matches_tool(self, tool_name: Optional[str]) -> bool:
|
||
if not self.matcher:
|
||
return True
|
||
if tool_name is None:
|
||
return False
|
||
if self.compiled_matcher is not None:
|
||
return self.compiled_matcher.fullmatch(tool_name) is not None
|
||
# compiled_matcher is None only when the regex failed to compile,
|
||
# in which case we already warned and fall back to literal equality.
|
||
return tool_name == self.matcher
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Public API
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def register_from_config(
|
||
cfg: Optional[Dict[str, Any]],
|
||
*,
|
||
accept_hooks: bool = False,
|
||
) -> List[ShellHookSpec]:
|
||
"""Register every configured shell hook on the plugin manager.
|
||
|
||
``cfg`` is the full parsed config dict (``hermes_cli.config.load_config``
|
||
output). The ``hooks:`` key is read out of it. Missing, empty, or
|
||
non-dict ``hooks`` is treated as zero configured hooks.
|
||
|
||
``accept_hooks=True`` skips the TTY consent prompt — the caller is
|
||
promising that the user has opted in via a flag, env var, or config
|
||
setting. ``HERMES_ACCEPT_HOOKS=1`` and ``hooks_auto_accept: true`` are
|
||
also honored inside this function so either CLI or gateway call sites
|
||
pick them up.
|
||
|
||
Returns the list of :class:`ShellHookSpec` entries that ended up wired
|
||
up on the plugin manager. Skipped entries (unknown events, malformed,
|
||
not allowlisted, already registered) are logged but not returned.
|
||
"""
|
||
if not isinstance(cfg, dict):
|
||
return []
|
||
|
||
effective_accept = _resolve_effective_accept(cfg, accept_hooks)
|
||
|
||
specs = _parse_hooks_block(cfg.get("hooks"))
|
||
if not specs:
|
||
return []
|
||
|
||
registered: List[ShellHookSpec] = []
|
||
|
||
# Import lazily — avoids circular imports at module-load time.
|
||
from hermes_cli.plugins import get_plugin_manager
|
||
|
||
manager = get_plugin_manager()
|
||
|
||
# Idempotence + allowlist read happen under the lock; the TTY
|
||
# prompt runs outside so other threads aren't parked on a blocking
|
||
# input(). Mutation re-takes the lock with a defensive idempotence
|
||
# re-check in case two callers ever race through the prompt.
|
||
for spec in specs:
|
||
key = (spec.event, spec.matcher, spec.command)
|
||
with _registered_lock:
|
||
if key in _registered:
|
||
continue
|
||
already_allowlisted = _is_allowlisted(spec.event, spec.command)
|
||
|
||
if not already_allowlisted:
|
||
if not _prompt_and_record(
|
||
spec.event, spec.command, accept_hooks=effective_accept,
|
||
):
|
||
logger.warning(
|
||
"shell hook for %s (%s) not allowlisted — skipped. "
|
||
"Use --accept-hooks / HERMES_ACCEPT_HOOKS=1 / "
|
||
"hooks_auto_accept: true, or approve at the TTY "
|
||
"prompt next run.",
|
||
spec.event, spec.command,
|
||
)
|
||
continue
|
||
|
||
with _registered_lock:
|
||
if key in _registered:
|
||
continue
|
||
manager._hooks.setdefault(spec.event, []).append(_make_callback(spec))
|
||
_registered.add(key)
|
||
registered.append(spec)
|
||
logger.info(
|
||
"shell hook registered: %s -> %s (matcher=%s, timeout=%ds)",
|
||
spec.event, spec.command, spec.matcher, spec.timeout,
|
||
)
|
||
|
||
return registered
|
||
|
||
|
||
def iter_configured_hooks(cfg: Optional[Dict[str, Any]]) -> List[ShellHookSpec]:
|
||
"""Return the parsed ``ShellHookSpec`` entries from config without
|
||
registering anything. Used by ``hermes hooks list`` and ``doctor``."""
|
||
if not isinstance(cfg, dict):
|
||
return []
|
||
return _parse_hooks_block(cfg.get("hooks"))
|
||
|
||
|
||
def reset_for_tests() -> None:
|
||
"""Clear the idempotence set. Test-only helper."""
|
||
with _registered_lock:
|
||
_registered.clear()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config parsing
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _parse_hooks_block(hooks_cfg: Any) -> List[ShellHookSpec]:
|
||
"""Normalise the ``hooks:`` dict into a flat list of ``ShellHookSpec``.
|
||
|
||
Malformed entries warn-and-skip — we never raise from config parsing
|
||
because a broken hook must not crash the agent.
|
||
"""
|
||
from hermes_cli.plugins import VALID_HOOKS
|
||
|
||
if not isinstance(hooks_cfg, dict):
|
||
return []
|
||
|
||
specs: List[ShellHookSpec] = []
|
||
|
||
for event_name, entries in hooks_cfg.items():
|
||
if event_name not in VALID_HOOKS:
|
||
suggestion = difflib.get_close_matches(
|
||
str(event_name), VALID_HOOKS, n=1, cutoff=0.6,
|
||
)
|
||
if suggestion:
|
||
logger.warning(
|
||
"unknown hook event %r in hooks: config — did you mean %r?",
|
||
event_name, suggestion[0],
|
||
)
|
||
else:
|
||
logger.warning(
|
||
"unknown hook event %r in hooks: config (valid: %s)",
|
||
event_name, ", ".join(sorted(VALID_HOOKS)),
|
||
)
|
||
continue
|
||
|
||
if entries is None:
|
||
continue
|
||
|
||
if not isinstance(entries, list):
|
||
logger.warning(
|
||
"hooks.%s must be a list of hook definitions; got %s",
|
||
event_name, type(entries).__name__,
|
||
)
|
||
continue
|
||
|
||
for i, raw in enumerate(entries):
|
||
spec = _parse_single_entry(event_name, i, raw)
|
||
if spec is not None:
|
||
specs.append(spec)
|
||
|
||
return specs
|
||
|
||
|
||
def _parse_single_entry(
|
||
event: str, index: int, raw: Any,
|
||
) -> Optional[ShellHookSpec]:
|
||
if not isinstance(raw, dict):
|
||
logger.warning(
|
||
"hooks.%s[%d] must be a mapping with a 'command' key; got %s",
|
||
event, index, type(raw).__name__,
|
||
)
|
||
return None
|
||
|
||
command = raw.get("command")
|
||
if not isinstance(command, str) or not command.strip():
|
||
logger.warning(
|
||
"hooks.%s[%d] is missing a non-empty 'command' field",
|
||
event, index,
|
||
)
|
||
return None
|
||
|
||
matcher = raw.get("matcher")
|
||
if matcher is not None and not isinstance(matcher, str):
|
||
logger.warning(
|
||
"hooks.%s[%d].matcher must be a string regex; ignoring",
|
||
event, index,
|
||
)
|
||
matcher = None
|
||
|
||
if matcher is not None and event not in ("pre_tool_call", "post_tool_call"):
|
||
logger.warning(
|
||
"hooks.%s[%d].matcher=%r will be ignored at runtime — the "
|
||
"matcher field is only honored for pre_tool_call / "
|
||
"post_tool_call. The hook will fire on every %s event.",
|
||
event, index, matcher, event,
|
||
)
|
||
matcher = None
|
||
|
||
timeout_raw = raw.get("timeout", DEFAULT_TIMEOUT_SECONDS)
|
||
try:
|
||
timeout = int(timeout_raw)
|
||
except (TypeError, ValueError):
|
||
logger.warning(
|
||
"hooks.%s[%d].timeout must be an int (got %r); using default %ds",
|
||
event, index, timeout_raw, DEFAULT_TIMEOUT_SECONDS,
|
||
)
|
||
timeout = DEFAULT_TIMEOUT_SECONDS
|
||
|
||
if timeout < 1:
|
||
logger.warning(
|
||
"hooks.%s[%d].timeout must be >=1; using default %ds",
|
||
event, index, DEFAULT_TIMEOUT_SECONDS,
|
||
)
|
||
timeout = DEFAULT_TIMEOUT_SECONDS
|
||
|
||
if timeout > MAX_TIMEOUT_SECONDS:
|
||
logger.warning(
|
||
"hooks.%s[%d].timeout=%ds exceeds max %ds; clamping",
|
||
event, index, timeout, MAX_TIMEOUT_SECONDS,
|
||
)
|
||
timeout = MAX_TIMEOUT_SECONDS
|
||
|
||
return ShellHookSpec(
|
||
event=event,
|
||
command=command.strip(),
|
||
matcher=matcher,
|
||
timeout=timeout,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Subprocess callback
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_TOP_LEVEL_PAYLOAD_KEYS = {"tool_name", "args", "session_id", "parent_session_id"}
|
||
|
||
|
||
def _spawn(spec: ShellHookSpec, stdin_json: str) -> Dict[str, Any]:
|
||
"""Run ``spec.command`` as a subprocess with ``stdin_json`` on stdin.
|
||
|
||
Returns a diagnostic dict with the same keys for every outcome
|
||
(``returncode``, ``stdout``, ``stderr``, ``timed_out``,
|
||
``elapsed_seconds``, ``error``). This is the single place the
|
||
subprocess is actually invoked — both the live callback path
|
||
(:func:`_make_callback`) and the CLI test helper (:func:`run_once`)
|
||
go through it.
|
||
"""
|
||
result: Dict[str, Any] = {
|
||
"returncode": None,
|
||
"stdout": "",
|
||
"stderr": "",
|
||
"timed_out": False,
|
||
"elapsed_seconds": 0.0,
|
||
"error": None,
|
||
}
|
||
try:
|
||
argv = shlex.split(os.path.expanduser(spec.command))
|
||
except ValueError as exc:
|
||
result["error"] = f"command {spec.command!r} cannot be parsed: {exc}"
|
||
return result
|
||
if not argv:
|
||
result["error"] = "empty command"
|
||
return result
|
||
|
||
t0 = time.monotonic()
|
||
try:
|
||
proc = subprocess.run(
|
||
argv,
|
||
input=stdin_json,
|
||
capture_output=True,
|
||
timeout=spec.timeout,
|
||
text=True,
|
||
shell=False,
|
||
)
|
||
except subprocess.TimeoutExpired:
|
||
result["timed_out"] = True
|
||
result["elapsed_seconds"] = round(time.monotonic() - t0, 3)
|
||
return result
|
||
except FileNotFoundError:
|
||
result["error"] = "command not found"
|
||
return result
|
||
except PermissionError:
|
||
result["error"] = "command not executable"
|
||
return result
|
||
except Exception as exc: # pragma: no cover — defensive
|
||
result["error"] = str(exc)
|
||
return result
|
||
|
||
result["returncode"] = proc.returncode
|
||
result["stdout"] = proc.stdout or ""
|
||
result["stderr"] = proc.stderr or ""
|
||
result["elapsed_seconds"] = round(time.monotonic() - t0, 3)
|
||
return result
|
||
|
||
|
||
def _make_callback(spec: ShellHookSpec) -> Callable[..., Optional[Dict[str, Any]]]:
|
||
"""Build the closure that ``invoke_hook()`` will call per firing."""
|
||
|
||
def _callback(**kwargs: Any) -> Optional[Dict[str, Any]]:
|
||
# Matcher gate — only meaningful for tool-scoped events.
|
||
if spec.event in ("pre_tool_call", "post_tool_call"):
|
||
if not spec.matches_tool(kwargs.get("tool_name")):
|
||
return None
|
||
|
||
r = _spawn(spec, _serialize_payload(spec.event, kwargs))
|
||
|
||
if r["error"]:
|
||
logger.warning(
|
||
"shell hook failed (event=%s command=%s): %s",
|
||
spec.event, spec.command, r["error"],
|
||
)
|
||
return None
|
||
if r["timed_out"]:
|
||
logger.warning(
|
||
"shell hook timed out after %.2fs (event=%s command=%s)",
|
||
r["elapsed_seconds"], spec.event, spec.command,
|
||
)
|
||
return None
|
||
|
||
stderr = r["stderr"].strip()
|
||
if stderr:
|
||
logger.debug(
|
||
"shell hook stderr (event=%s command=%s): %s",
|
||
spec.event, spec.command, stderr[:400],
|
||
)
|
||
# Non-zero exits: log but still parse stdout so scripts that
|
||
# signal failure via exit code can also return a block directive.
|
||
if r["returncode"] != 0:
|
||
logger.warning(
|
||
"shell hook exited %d (event=%s command=%s); stderr=%s",
|
||
r["returncode"], spec.event, spec.command, stderr[:400],
|
||
)
|
||
return _parse_response(spec.event, r["stdout"])
|
||
|
||
_callback.__name__ = f"shell_hook[{spec.event}:{spec.command}]"
|
||
_callback.__qualname__ = _callback.__name__
|
||
return _callback
|
||
|
||
|
||
def _serialize_payload(event: str, kwargs: Dict[str, Any]) -> str:
|
||
"""Render the stdin JSON payload. Unserialisable values are
|
||
stringified via ``default=str`` rather than dropped."""
|
||
extras = {k: v for k, v in kwargs.items() if k not in _TOP_LEVEL_PAYLOAD_KEYS}
|
||
try:
|
||
cwd = str(Path.cwd())
|
||
except OSError:
|
||
cwd = ""
|
||
payload = {
|
||
"hook_event_name": event,
|
||
"tool_name": kwargs.get("tool_name"),
|
||
"tool_input": kwargs.get("args") if isinstance(kwargs.get("args"), dict) else None,
|
||
"session_id": kwargs.get("session_id") or kwargs.get("parent_session_id") or "",
|
||
"cwd": cwd,
|
||
"extra": extras,
|
||
}
|
||
return json.dumps(payload, ensure_ascii=False, default=str)
|
||
|
||
|
||
def _parse_response(event: str, stdout: str) -> Optional[Dict[str, Any]]:
|
||
"""Translate stdout JSON into a Hermes wire-shape dict.
|
||
|
||
For ``pre_tool_call`` the Claude-Code-style ``{"decision": "block",
|
||
"reason": "..."}`` payload is translated into the canonical Hermes
|
||
``{"action": "block", "message": "..."}`` shape expected by
|
||
:func:`hermes_cli.plugins.get_pre_tool_call_block_message`. This is
|
||
the single most important correctness invariant in this module —
|
||
skipping the translation silently breaks every ``pre_tool_call``
|
||
block directive.
|
||
|
||
For ``pre_llm_call``, ``{"context": "..."}`` is passed through
|
||
unchanged to match the existing plugin-hook contract.
|
||
|
||
Anything else returns ``None``.
|
||
"""
|
||
stdout = (stdout or "").strip()
|
||
if not stdout:
|
||
return None
|
||
|
||
try:
|
||
data = json.loads(stdout)
|
||
except json.JSONDecodeError:
|
||
logger.warning(
|
||
"shell hook stdout was not valid JSON (event=%s): %s",
|
||
event, stdout[:200],
|
||
)
|
||
return None
|
||
|
||
if not isinstance(data, dict):
|
||
return None
|
||
|
||
if event == "pre_tool_call":
|
||
if data.get("action") == "block":
|
||
message = data.get("message") or data.get("reason") or ""
|
||
if isinstance(message, str) and message:
|
||
return {"action": "block", "message": message}
|
||
if data.get("decision") == "block":
|
||
message = data.get("reason") or data.get("message") or ""
|
||
if isinstance(message, str) and message:
|
||
return {"action": "block", "message": message}
|
||
return None
|
||
|
||
context = data.get("context")
|
||
if isinstance(context, str) and context.strip():
|
||
return {"context": context}
|
||
|
||
return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Allowlist / consent
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def allowlist_path() -> Path:
|
||
"""Path to the per-user shell-hook allowlist file."""
|
||
return get_hermes_home() / ALLOWLIST_FILENAME
|
||
|
||
|
||
def load_allowlist() -> Dict[str, Any]:
|
||
"""Return the parsed allowlist, or an empty skeleton if absent."""
|
||
try:
|
||
raw = json.loads(allowlist_path().read_text())
|
||
except (FileNotFoundError, json.JSONDecodeError, OSError):
|
||
return {"approvals": []}
|
||
if not isinstance(raw, dict):
|
||
return {"approvals": []}
|
||
approvals = raw.get("approvals")
|
||
if not isinstance(approvals, list):
|
||
raw["approvals"] = []
|
||
return raw
|
||
|
||
|
||
def save_allowlist(data: Dict[str, Any]) -> None:
|
||
"""Atomically persist the allowlist via per-process ``mkstemp`` +
|
||
``os.replace``. Cross-process read-modify-write races are handled
|
||
by :func:`_locked_update_approvals` (``fcntl.flock``). On OSError
|
||
the failure is logged; the in-process hook still registers but
|
||
the approval won't survive across runs."""
|
||
p = allowlist_path()
|
||
try:
|
||
p.parent.mkdir(parents=True, exist_ok=True)
|
||
fd, tmp_path = tempfile.mkstemp(
|
||
prefix=f"{p.name}.", suffix=".tmp", dir=str(p.parent),
|
||
)
|
||
try:
|
||
with os.fdopen(fd, "w") as fh:
|
||
fh.write(json.dumps(data, indent=2, sort_keys=True))
|
||
os.replace(tmp_path, p)
|
||
except Exception:
|
||
try:
|
||
os.unlink(tmp_path)
|
||
except OSError:
|
||
pass
|
||
raise
|
||
except OSError as exc:
|
||
logger.warning(
|
||
"Failed to persist shell hook allowlist to %s: %s. "
|
||
"The approval is in-memory for this run, but the next "
|
||
"startup will re-prompt (or skip registration on non-TTY "
|
||
"runs without --accept-hooks / HERMES_ACCEPT_HOOKS).",
|
||
p, exc,
|
||
)
|
||
|
||
|
||
def _is_allowlisted(event: str, command: str) -> bool:
|
||
data = load_allowlist()
|
||
return any(
|
||
isinstance(e, dict)
|
||
and e.get("event") == event
|
||
and e.get("command") == command
|
||
for e in data.get("approvals", [])
|
||
)
|
||
|
||
|
||
@contextmanager
|
||
def _locked_update_approvals() -> Iterator[Dict[str, Any]]:
|
||
"""Serialise read-modify-write on the allowlist across processes.
|
||
|
||
Holds an exclusive ``flock`` on a sibling lock file for the duration
|
||
of the update so concurrent ``_record_approval``/``revoke`` callers
|
||
cannot clobber each other's changes (the race Codex reproduced with
|
||
20–50 simultaneous writers). Falls back to an in-process lock on
|
||
platforms without ``fcntl``.
|
||
"""
|
||
p = allowlist_path()
|
||
p.parent.mkdir(parents=True, exist_ok=True)
|
||
lock_path = p.with_suffix(p.suffix + ".lock")
|
||
|
||
if fcntl is None: # pragma: no cover — non-POSIX fallback
|
||
with _allowlist_write_lock:
|
||
data = load_allowlist()
|
||
yield data
|
||
save_allowlist(data)
|
||
return
|
||
|
||
with open(lock_path, "a+") as lock_fh:
|
||
fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX)
|
||
try:
|
||
data = load_allowlist()
|
||
yield data
|
||
save_allowlist(data)
|
||
finally:
|
||
fcntl.flock(lock_fh.fileno(), fcntl.LOCK_UN)
|
||
|
||
|
||
def _prompt_and_record(
|
||
event: str, command: str, *, accept_hooks: bool,
|
||
) -> bool:
|
||
"""Decide whether to approve an unseen ``(event, command)`` pair.
|
||
Returns ``True`` iff the approval was granted and recorded.
|
||
"""
|
||
if accept_hooks:
|
||
_record_approval(event, command)
|
||
logger.info(
|
||
"shell hook auto-approved via --accept-hooks / env / config: "
|
||
"%s -> %s", event, command,
|
||
)
|
||
return True
|
||
|
||
if not sys.stdin.isatty():
|
||
return False
|
||
|
||
print(
|
||
f"\n⚠ Hermes is about to register a shell hook that will run a\n"
|
||
f" command on your behalf.\n\n"
|
||
f" Event: {event}\n"
|
||
f" Command: {command}\n\n"
|
||
f" Commands run with your full user credentials. Only approve\n"
|
||
f" commands you trust."
|
||
)
|
||
try:
|
||
answer = input("Allow this hook to run? [y/N]: ").strip().lower()
|
||
except (EOFError, KeyboardInterrupt):
|
||
print() # keep the terminal tidy after ^C
|
||
return False
|
||
|
||
if answer in ("y", "yes"):
|
||
_record_approval(event, command)
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
def _record_approval(event: str, command: str) -> None:
|
||
entry = {
|
||
"event": event,
|
||
"command": command,
|
||
"approved_at": _utc_now_iso(),
|
||
"script_mtime_at_approval": script_mtime_iso(command),
|
||
}
|
||
with _locked_update_approvals() as data:
|
||
data["approvals"] = [
|
||
e for e in data.get("approvals", [])
|
||
if not (
|
||
isinstance(e, dict)
|
||
and e.get("event") == event
|
||
and e.get("command") == command
|
||
)
|
||
] + [entry]
|
||
|
||
|
||
def _utc_now_iso() -> str:
|
||
return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||
|
||
|
||
def revoke(command: str) -> int:
|
||
"""Remove every allowlist entry matching ``command``.
|
||
|
||
Returns the number of entries removed. Does not unregister any
|
||
callbacks that are already live on the plugin manager in the current
|
||
process — restart the CLI / gateway to drop them.
|
||
"""
|
||
with _locked_update_approvals() as data:
|
||
before = len(data.get("approvals", []))
|
||
data["approvals"] = [
|
||
e for e in data.get("approvals", [])
|
||
if not (isinstance(e, dict) and e.get("command") == command)
|
||
]
|
||
after = len(data["approvals"])
|
||
return before - after
|
||
|
||
|
||
_SCRIPT_EXTENSIONS: Tuple[str, ...] = (
|
||
".sh", ".bash", ".zsh", ".fish",
|
||
".py", ".pyw",
|
||
".rb", ".pl", ".lua",
|
||
".js", ".mjs", ".cjs", ".ts",
|
||
)
|
||
|
||
|
||
def _command_script_path(command: str) -> str:
|
||
"""Return the script path from ``command`` for doctor / drift checks.
|
||
|
||
Prefers a token ending in a known script extension, then a token
|
||
containing ``/`` or leading ``~``, then the first token. Handles
|
||
``python3 /path/hook.py``, ``/usr/bin/env bash hook.sh``, and the
|
||
common bare-path form.
|
||
"""
|
||
try:
|
||
parts = shlex.split(command)
|
||
except ValueError:
|
||
return command
|
||
if not parts:
|
||
return command
|
||
for part in parts:
|
||
if part.lower().endswith(_SCRIPT_EXTENSIONS):
|
||
return part
|
||
for part in parts:
|
||
if "/" in part or part.startswith("~"):
|
||
return part
|
||
return parts[0]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers for accept-hooks resolution
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _resolve_effective_accept(
|
||
cfg: Dict[str, Any], accept_hooks_arg: bool,
|
||
) -> bool:
|
||
"""Combine all three opt-in channels into a single boolean.
|
||
|
||
Precedence (any truthy source flips us on):
|
||
1. ``--accept-hooks`` flag (CLI) / explicit argument
|
||
2. ``HERMES_ACCEPT_HOOKS`` env var
|
||
3. ``hooks_auto_accept: true`` in ``cli-config.yaml``
|
||
"""
|
||
if accept_hooks_arg:
|
||
return True
|
||
env = os.environ.get("HERMES_ACCEPT_HOOKS", "").strip().lower()
|
||
if env in ("1", "true", "yes", "on"):
|
||
return True
|
||
cfg_val = cfg.get("hooks_auto_accept", False)
|
||
return bool(cfg_val)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Introspection (used by `hermes hooks` CLI)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def allowlist_entry_for(event: str, command: str) -> Optional[Dict[str, Any]]:
|
||
"""Return the allowlist record for this pair, if any."""
|
||
for e in load_allowlist().get("approvals", []):
|
||
if (
|
||
isinstance(e, dict)
|
||
and e.get("event") == event
|
||
and e.get("command") == command
|
||
):
|
||
return e
|
||
return None
|
||
|
||
|
||
def script_mtime_iso(command: str) -> Optional[str]:
|
||
"""ISO-8601 mtime of the resolved script path, or ``None`` if the
|
||
script is missing."""
|
||
path = _command_script_path(command)
|
||
if not path:
|
||
return None
|
||
try:
|
||
expanded = os.path.expanduser(path)
|
||
return datetime.fromtimestamp(
|
||
os.path.getmtime(expanded), tz=timezone.utc,
|
||
).isoformat().replace("+00:00", "Z")
|
||
except OSError:
|
||
return None
|
||
|
||
|
||
def script_is_executable(command: str) -> bool:
|
||
"""Return ``True`` iff ``command`` is runnable as configured.
|
||
|
||
For a bare invocation (``/path/hook.sh``) the script itself must be
|
||
executable. For interpreter-prefixed commands (``python3
|
||
/path/hook.py``, ``/usr/bin/env bash hook.sh``) the script just has
|
||
to be readable — the interpreter doesn't care about the ``X_OK``
|
||
bit. Mirrors what ``_spawn`` would actually do at runtime."""
|
||
path = _command_script_path(command)
|
||
if not path:
|
||
return False
|
||
expanded = os.path.expanduser(path)
|
||
if not os.path.isfile(expanded):
|
||
return False
|
||
try:
|
||
argv = shlex.split(command)
|
||
except ValueError:
|
||
return False
|
||
is_bare_invocation = bool(argv) and argv[0] == path
|
||
required = os.X_OK if is_bare_invocation else os.R_OK
|
||
return os.access(expanded, required)
|
||
|
||
|
||
def run_once(
|
||
spec: ShellHookSpec, kwargs: Dict[str, Any],
|
||
) -> Dict[str, Any]:
|
||
"""Fire a single shell-hook invocation with a synthetic payload.
|
||
Used by ``hermes hooks test`` and ``hermes hooks doctor``.
|
||
|
||
``kwargs`` is the same dict that :func:`hermes_cli.plugins.invoke_hook`
|
||
would pass at runtime. It is routed through :func:`_serialize_payload`
|
||
so the synthetic stdin exactly matches what a real hook firing would
|
||
produce — otherwise scripts tested via ``hermes hooks test`` could
|
||
diverge silently from production behaviour.
|
||
|
||
Returns the :func:`_spawn` diagnostic dict plus a ``parsed`` field
|
||
holding the canonical Hermes-wire-shape response."""
|
||
stdin_json = _serialize_payload(spec.event, kwargs)
|
||
result = _spawn(spec, stdin_json)
|
||
result["parsed"] = _parse_response(spec.event, result["stdout"])
|
||
return result
|