diff --git a/agent/shell_hooks.py b/agent/shell_hooks.py new file mode 100644 index 000000000..b579ad5b8 --- /dev/null +++ b/agent/shell_hooks.py @@ -0,0 +1,831 @@ +""" +Shell-script hooks bridge. + +Reads the ``hooks:`` block from ``cli-config.yaml``, prompts the user for +consent on first use of each ``(event, command)`` pair, and registers +callbacks on the existing plugin hook manager so every existing +``invoke_hook()`` site dispatches to the configured shell scripts — with +zero changes to call sites. + +Design notes +------------ +* Python plugins and shell hooks compose naturally: both flow through + :func:`hermes_cli.plugins.invoke_hook` and its aggregators. Python + plugins are registered first (via ``discover_and_load()``) so their + block decisions win ties over shell-hook blocks. +* Subprocess execution uses ``shlex.split(os.path.expanduser(command))`` + with ``shell=False`` — no shell injection footguns. Users that need + pipes/redirection wrap their logic in a script. +* First-use consent is gated by the allowlist under + ``~/.hermes/shell-hooks-allowlist.json``. Non-TTY callers must pass + ``accept_hooks=True`` (resolved from ``--accept-hooks``, + ``HERMES_ACCEPT_HOOKS``, or ``hooks_auto_accept: true`` in config) + for registration to succeed without a prompt. +* Registration is idempotent — safe to invoke from both the CLI entry + point (``hermes_cli/main.py``) and the gateway entry point + (``gateway/run.py``). + +Wire protocol +------------- +**stdin** (JSON, piped to the script):: + + { + "hook_event_name": "pre_tool_call", + "tool_name": "terminal", + "tool_input": {"command": "rm -rf /"}, + "session_id": "sess_abc123", + "cwd": "/home/user/project", + "extra": {...} # event-specific kwargs + } + +**stdout** (JSON, optional — anything else is ignored):: + + # Block a pre_tool_call (either shape accepted; normalised internally): + {"decision": "block", "reason": "Forbidden command"} # Claude-Code-style + {"action": "block", "message": "Forbidden command"} # Hermes-canonical + + # Inject context for pre_llm_call: + {"context": "Today is Friday"} + + # Silent no-op: + +""" + +from __future__ import annotations + +import difflib +import json +import logging +import os +import re +import shlex +import subprocess +import sys +import tempfile +import threading +import time +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple + +try: + import fcntl # POSIX only; Windows falls back to best-effort without flock. +except ImportError: # pragma: no cover + fcntl = None # type: ignore[assignment] + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + +DEFAULT_TIMEOUT_SECONDS = 60 +MAX_TIMEOUT_SECONDS = 300 +ALLOWLIST_FILENAME = "shell-hooks-allowlist.json" + +# (event, matcher, command) triples that have been wired to the plugin +# manager in the current process. Matcher is part of the key because +# the same script can legitimately register for different matchers under +# the same event (e.g. one entry per tool the user wants to gate). +# Second registration attempts for the exact same triple become no-ops +# so the CLI and gateway can both call register_from_config() safely. +_registered: Set[Tuple[str, Optional[str], str]] = set() +_registered_lock = threading.Lock() + +# Intra-process lock for allowlist read-modify-write on platforms that +# lack ``fcntl`` (non-POSIX). Kept separate from ``_registered_lock`` +# because ``register_from_config`` already holds ``_registered_lock`` when +# it triggers ``_record_approval`` — reusing it here would self-deadlock +# (``threading.Lock`` is non-reentrant). POSIX callers use the sibling +# ``.lock`` file via ``fcntl.flock`` and bypass this. +_allowlist_write_lock = threading.Lock() + + +@dataclass +class ShellHookSpec: + """Parsed and validated representation of a single ``hooks:`` entry.""" + + event: str + command: str + matcher: Optional[str] = None + timeout: int = DEFAULT_TIMEOUT_SECONDS + compiled_matcher: Optional[re.Pattern] = field(default=None, repr=False) + + def __post_init__(self) -> None: + # Strip whitespace introduced by YAML quirks (e.g. multi-line string + # folding) — a matcher of " terminal" would otherwise silently fail + # to match "terminal" without any diagnostic. + if isinstance(self.matcher, str): + stripped = self.matcher.strip() + self.matcher = stripped if stripped else None + if self.matcher: + try: + self.compiled_matcher = re.compile(self.matcher) + except re.error as exc: + logger.warning( + "shell hook matcher %r is invalid (%s) — treating as " + "literal equality", self.matcher, exc, + ) + self.compiled_matcher = None + + def matches_tool(self, tool_name: Optional[str]) -> bool: + if not self.matcher: + return True + if tool_name is None: + return False + if self.compiled_matcher is not None: + return self.compiled_matcher.fullmatch(tool_name) is not None + # compiled_matcher is None only when the regex failed to compile, + # in which case we already warned and fall back to literal equality. + return tool_name == self.matcher + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def register_from_config( + cfg: Optional[Dict[str, Any]], + *, + accept_hooks: bool = False, +) -> List[ShellHookSpec]: + """Register every configured shell hook on the plugin manager. + + ``cfg`` is the full parsed config dict (``hermes_cli.config.load_config`` + output). The ``hooks:`` key is read out of it. Missing, empty, or + non-dict ``hooks`` is treated as zero configured hooks. + + ``accept_hooks=True`` skips the TTY consent prompt — the caller is + promising that the user has opted in via a flag, env var, or config + setting. ``HERMES_ACCEPT_HOOKS=1`` and ``hooks_auto_accept: true`` are + also honored inside this function so either CLI or gateway call sites + pick them up. + + Returns the list of :class:`ShellHookSpec` entries that ended up wired + up on the plugin manager. Skipped entries (unknown events, malformed, + not allowlisted, already registered) are logged but not returned. + """ + if not isinstance(cfg, dict): + return [] + + effective_accept = _resolve_effective_accept(cfg, accept_hooks) + + specs = _parse_hooks_block(cfg.get("hooks")) + if not specs: + return [] + + registered: List[ShellHookSpec] = [] + + # Import lazily — avoids circular imports at module-load time. + from hermes_cli.plugins import get_plugin_manager + + manager = get_plugin_manager() + + # Idempotence + allowlist read happen under the lock; the TTY + # prompt runs outside so other threads aren't parked on a blocking + # input(). Mutation re-takes the lock with a defensive idempotence + # re-check in case two callers ever race through the prompt. + for spec in specs: + key = (spec.event, spec.matcher, spec.command) + with _registered_lock: + if key in _registered: + continue + already_allowlisted = _is_allowlisted(spec.event, spec.command) + + if not already_allowlisted: + if not _prompt_and_record( + spec.event, spec.command, accept_hooks=effective_accept, + ): + logger.warning( + "shell hook for %s (%s) not allowlisted — skipped. " + "Use --accept-hooks / HERMES_ACCEPT_HOOKS=1 / " + "hooks_auto_accept: true, or approve at the TTY " + "prompt next run.", + spec.event, spec.command, + ) + continue + + with _registered_lock: + if key in _registered: + continue + manager._hooks.setdefault(spec.event, []).append(_make_callback(spec)) + _registered.add(key) + registered.append(spec) + logger.info( + "shell hook registered: %s -> %s (matcher=%s, timeout=%ds)", + spec.event, spec.command, spec.matcher, spec.timeout, + ) + + return registered + + +def iter_configured_hooks(cfg: Optional[Dict[str, Any]]) -> List[ShellHookSpec]: + """Return the parsed ``ShellHookSpec`` entries from config without + registering anything. Used by ``hermes hooks list`` and ``doctor``.""" + if not isinstance(cfg, dict): + return [] + return _parse_hooks_block(cfg.get("hooks")) + + +def reset_for_tests() -> None: + """Clear the idempotence set. Test-only helper.""" + with _registered_lock: + _registered.clear() + + +# --------------------------------------------------------------------------- +# Config parsing +# --------------------------------------------------------------------------- + +def _parse_hooks_block(hooks_cfg: Any) -> List[ShellHookSpec]: + """Normalise the ``hooks:`` dict into a flat list of ``ShellHookSpec``. + + Malformed entries warn-and-skip — we never raise from config parsing + because a broken hook must not crash the agent. + """ + from hermes_cli.plugins import VALID_HOOKS + + if not isinstance(hooks_cfg, dict): + return [] + + specs: List[ShellHookSpec] = [] + + for event_name, entries in hooks_cfg.items(): + if event_name not in VALID_HOOKS: + suggestion = difflib.get_close_matches( + str(event_name), VALID_HOOKS, n=1, cutoff=0.6, + ) + if suggestion: + logger.warning( + "unknown hook event %r in hooks: config — did you mean %r?", + event_name, suggestion[0], + ) + else: + logger.warning( + "unknown hook event %r in hooks: config (valid: %s)", + event_name, ", ".join(sorted(VALID_HOOKS)), + ) + continue + + if entries is None: + continue + + if not isinstance(entries, list): + logger.warning( + "hooks.%s must be a list of hook definitions; got %s", + event_name, type(entries).__name__, + ) + continue + + for i, raw in enumerate(entries): + spec = _parse_single_entry(event_name, i, raw) + if spec is not None: + specs.append(spec) + + return specs + + +def _parse_single_entry( + event: str, index: int, raw: Any, +) -> Optional[ShellHookSpec]: + if not isinstance(raw, dict): + logger.warning( + "hooks.%s[%d] must be a mapping with a 'command' key; got %s", + event, index, type(raw).__name__, + ) + return None + + command = raw.get("command") + if not isinstance(command, str) or not command.strip(): + logger.warning( + "hooks.%s[%d] is missing a non-empty 'command' field", + event, index, + ) + return None + + matcher = raw.get("matcher") + if matcher is not None and not isinstance(matcher, str): + logger.warning( + "hooks.%s[%d].matcher must be a string regex; ignoring", + event, index, + ) + matcher = None + + if matcher is not None and event not in ("pre_tool_call", "post_tool_call"): + logger.warning( + "hooks.%s[%d].matcher=%r will be ignored at runtime — the " + "matcher field is only honored for pre_tool_call / " + "post_tool_call. The hook will fire on every %s event.", + event, index, matcher, event, + ) + matcher = None + + timeout_raw = raw.get("timeout", DEFAULT_TIMEOUT_SECONDS) + try: + timeout = int(timeout_raw) + except (TypeError, ValueError): + logger.warning( + "hooks.%s[%d].timeout must be an int (got %r); using default %ds", + event, index, timeout_raw, DEFAULT_TIMEOUT_SECONDS, + ) + timeout = DEFAULT_TIMEOUT_SECONDS + + if timeout < 1: + logger.warning( + "hooks.%s[%d].timeout must be >=1; using default %ds", + event, index, DEFAULT_TIMEOUT_SECONDS, + ) + timeout = DEFAULT_TIMEOUT_SECONDS + + if timeout > MAX_TIMEOUT_SECONDS: + logger.warning( + "hooks.%s[%d].timeout=%ds exceeds max %ds; clamping", + event, index, timeout, MAX_TIMEOUT_SECONDS, + ) + timeout = MAX_TIMEOUT_SECONDS + + return ShellHookSpec( + event=event, + command=command.strip(), + matcher=matcher, + timeout=timeout, + ) + + +# --------------------------------------------------------------------------- +# Subprocess callback +# --------------------------------------------------------------------------- + +_TOP_LEVEL_PAYLOAD_KEYS = {"tool_name", "args", "session_id", "parent_session_id"} + + +def _spawn(spec: ShellHookSpec, stdin_json: str) -> Dict[str, Any]: + """Run ``spec.command`` as a subprocess with ``stdin_json`` on stdin. + + Returns a diagnostic dict with the same keys for every outcome + (``returncode``, ``stdout``, ``stderr``, ``timed_out``, + ``elapsed_seconds``, ``error``). This is the single place the + subprocess is actually invoked — both the live callback path + (:func:`_make_callback`) and the CLI test helper (:func:`run_once`) + go through it. + """ + result: Dict[str, Any] = { + "returncode": None, + "stdout": "", + "stderr": "", + "timed_out": False, + "elapsed_seconds": 0.0, + "error": None, + } + try: + argv = shlex.split(os.path.expanduser(spec.command)) + except ValueError as exc: + result["error"] = f"command {spec.command!r} cannot be parsed: {exc}" + return result + if not argv: + result["error"] = "empty command" + return result + + t0 = time.monotonic() + try: + proc = subprocess.run( + argv, + input=stdin_json, + capture_output=True, + timeout=spec.timeout, + text=True, + shell=False, + ) + except subprocess.TimeoutExpired: + result["timed_out"] = True + result["elapsed_seconds"] = round(time.monotonic() - t0, 3) + return result + except FileNotFoundError: + result["error"] = "command not found" + return result + except PermissionError: + result["error"] = "command not executable" + return result + except Exception as exc: # pragma: no cover — defensive + result["error"] = str(exc) + return result + + result["returncode"] = proc.returncode + result["stdout"] = proc.stdout or "" + result["stderr"] = proc.stderr or "" + result["elapsed_seconds"] = round(time.monotonic() - t0, 3) + return result + + +def _make_callback(spec: ShellHookSpec) -> Callable[..., Optional[Dict[str, Any]]]: + """Build the closure that ``invoke_hook()`` will call per firing.""" + + def _callback(**kwargs: Any) -> Optional[Dict[str, Any]]: + # Matcher gate — only meaningful for tool-scoped events. + if spec.event in ("pre_tool_call", "post_tool_call"): + if not spec.matches_tool(kwargs.get("tool_name")): + return None + + r = _spawn(spec, _serialize_payload(spec.event, kwargs)) + + if r["error"]: + logger.warning( + "shell hook failed (event=%s command=%s): %s", + spec.event, spec.command, r["error"], + ) + return None + if r["timed_out"]: + logger.warning( + "shell hook timed out after %.2fs (event=%s command=%s)", + r["elapsed_seconds"], spec.event, spec.command, + ) + return None + + stderr = r["stderr"].strip() + if stderr: + logger.debug( + "shell hook stderr (event=%s command=%s): %s", + spec.event, spec.command, stderr[:400], + ) + # Non-zero exits: log but still parse stdout so scripts that + # signal failure via exit code can also return a block directive. + if r["returncode"] != 0: + logger.warning( + "shell hook exited %d (event=%s command=%s); stderr=%s", + r["returncode"], spec.event, spec.command, stderr[:400], + ) + return _parse_response(spec.event, r["stdout"]) + + _callback.__name__ = f"shell_hook[{spec.event}:{spec.command}]" + _callback.__qualname__ = _callback.__name__ + return _callback + + +def _serialize_payload(event: str, kwargs: Dict[str, Any]) -> str: + """Render the stdin JSON payload. Unserialisable values are + stringified via ``default=str`` rather than dropped.""" + extras = {k: v for k, v in kwargs.items() if k not in _TOP_LEVEL_PAYLOAD_KEYS} + try: + cwd = str(Path.cwd()) + except OSError: + cwd = "" + payload = { + "hook_event_name": event, + "tool_name": kwargs.get("tool_name"), + "tool_input": kwargs.get("args") if isinstance(kwargs.get("args"), dict) else None, + "session_id": kwargs.get("session_id") or kwargs.get("parent_session_id") or "", + "cwd": cwd, + "extra": extras, + } + return json.dumps(payload, ensure_ascii=False, default=str) + + +def _parse_response(event: str, stdout: str) -> Optional[Dict[str, Any]]: + """Translate stdout JSON into a Hermes wire-shape dict. + + For ``pre_tool_call`` the Claude-Code-style ``{"decision": "block", + "reason": "..."}`` payload is translated into the canonical Hermes + ``{"action": "block", "message": "..."}`` shape expected by + :func:`hermes_cli.plugins.get_pre_tool_call_block_message`. This is + the single most important correctness invariant in this module — + skipping the translation silently breaks every ``pre_tool_call`` + block directive. + + For ``pre_llm_call``, ``{"context": "..."}`` is passed through + unchanged to match the existing plugin-hook contract. + + Anything else returns ``None``. + """ + stdout = (stdout or "").strip() + if not stdout: + return None + + try: + data = json.loads(stdout) + except json.JSONDecodeError: + logger.warning( + "shell hook stdout was not valid JSON (event=%s): %s", + event, stdout[:200], + ) + return None + + if not isinstance(data, dict): + return None + + if event == "pre_tool_call": + if data.get("action") == "block": + message = data.get("message") or data.get("reason") or "" + if isinstance(message, str) and message: + return {"action": "block", "message": message} + if data.get("decision") == "block": + message = data.get("reason") or data.get("message") or "" + if isinstance(message, str) and message: + return {"action": "block", "message": message} + return None + + context = data.get("context") + if isinstance(context, str) and context.strip(): + return {"context": context} + + return None + + +# --------------------------------------------------------------------------- +# Allowlist / consent +# --------------------------------------------------------------------------- + +def allowlist_path() -> Path: + """Path to the per-user shell-hook allowlist file.""" + return get_hermes_home() / ALLOWLIST_FILENAME + + +def load_allowlist() -> Dict[str, Any]: + """Return the parsed allowlist, or an empty skeleton if absent.""" + try: + raw = json.loads(allowlist_path().read_text()) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return {"approvals": []} + if not isinstance(raw, dict): + return {"approvals": []} + approvals = raw.get("approvals") + if not isinstance(approvals, list): + raw["approvals"] = [] + return raw + + +def save_allowlist(data: Dict[str, Any]) -> None: + """Atomically persist the allowlist via per-process ``mkstemp`` + + ``os.replace``. Cross-process read-modify-write races are handled + by :func:`_locked_update_approvals` (``fcntl.flock``). On OSError + the failure is logged; the in-process hook still registers but + the approval won't survive across runs.""" + p = allowlist_path() + try: + p.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_path = tempfile.mkstemp( + prefix=f"{p.name}.", suffix=".tmp", dir=str(p.parent), + ) + try: + with os.fdopen(fd, "w") as fh: + fh.write(json.dumps(data, indent=2, sort_keys=True)) + os.replace(tmp_path, p) + except Exception: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + except OSError as exc: + logger.warning( + "Failed to persist shell hook allowlist to %s: %s. " + "The approval is in-memory for this run, but the next " + "startup will re-prompt (or skip registration on non-TTY " + "runs without --accept-hooks / HERMES_ACCEPT_HOOKS).", + p, exc, + ) + + +def _is_allowlisted(event: str, command: str) -> bool: + data = load_allowlist() + return any( + isinstance(e, dict) + and e.get("event") == event + and e.get("command") == command + for e in data.get("approvals", []) + ) + + +@contextmanager +def _locked_update_approvals() -> Iterator[Dict[str, Any]]: + """Serialise read-modify-write on the allowlist across processes. + + Holds an exclusive ``flock`` on a sibling lock file for the duration + of the update so concurrent ``_record_approval``/``revoke`` callers + cannot clobber each other's changes (the race Codex reproduced with + 20–50 simultaneous writers). Falls back to an in-process lock on + platforms without ``fcntl``. + """ + p = allowlist_path() + p.parent.mkdir(parents=True, exist_ok=True) + lock_path = p.with_suffix(p.suffix + ".lock") + + if fcntl is None: # pragma: no cover — non-POSIX fallback + with _allowlist_write_lock: + data = load_allowlist() + yield data + save_allowlist(data) + return + + with open(lock_path, "a+") as lock_fh: + fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX) + try: + data = load_allowlist() + yield data + save_allowlist(data) + finally: + fcntl.flock(lock_fh.fileno(), fcntl.LOCK_UN) + + +def _prompt_and_record( + event: str, command: str, *, accept_hooks: bool, +) -> bool: + """Decide whether to approve an unseen ``(event, command)`` pair. + Returns ``True`` iff the approval was granted and recorded. + """ + if accept_hooks: + _record_approval(event, command) + logger.info( + "shell hook auto-approved via --accept-hooks / env / config: " + "%s -> %s", event, command, + ) + return True + + if not sys.stdin.isatty(): + return False + + print( + f"\n⚠ Hermes is about to register a shell hook that will run a\n" + f" command on your behalf.\n\n" + f" Event: {event}\n" + f" Command: {command}\n\n" + f" Commands run with your full user credentials. Only approve\n" + f" commands you trust." + ) + try: + answer = input("Allow this hook to run? [y/N]: ").strip().lower() + except (EOFError, KeyboardInterrupt): + print() # keep the terminal tidy after ^C + return False + + if answer in ("y", "yes"): + _record_approval(event, command) + return True + + return False + + +def _record_approval(event: str, command: str) -> None: + entry = { + "event": event, + "command": command, + "approved_at": _utc_now_iso(), + "script_mtime_at_approval": script_mtime_iso(command), + } + with _locked_update_approvals() as data: + data["approvals"] = [ + e for e in data.get("approvals", []) + if not ( + isinstance(e, dict) + and e.get("event") == event + and e.get("command") == command + ) + ] + [entry] + + +def _utc_now_iso() -> str: + return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z") + + +def revoke(command: str) -> int: + """Remove every allowlist entry matching ``command``. + + Returns the number of entries removed. Does not unregister any + callbacks that are already live on the plugin manager in the current + process — restart the CLI / gateway to drop them. + """ + with _locked_update_approvals() as data: + before = len(data.get("approvals", [])) + data["approvals"] = [ + e for e in data.get("approvals", []) + if not (isinstance(e, dict) and e.get("command") == command) + ] + after = len(data["approvals"]) + return before - after + + +_SCRIPT_EXTENSIONS: Tuple[str, ...] = ( + ".sh", ".bash", ".zsh", ".fish", + ".py", ".pyw", + ".rb", ".pl", ".lua", + ".js", ".mjs", ".cjs", ".ts", +) + + +def _command_script_path(command: str) -> str: + """Return the script path from ``command`` for doctor / drift checks. + + Prefers a token ending in a known script extension, then a token + containing ``/`` or leading ``~``, then the first token. Handles + ``python3 /path/hook.py``, ``/usr/bin/env bash hook.sh``, and the + common bare-path form. + """ + try: + parts = shlex.split(command) + except ValueError: + return command + if not parts: + return command + for part in parts: + if part.lower().endswith(_SCRIPT_EXTENSIONS): + return part + for part in parts: + if "/" in part or part.startswith("~"): + return part + return parts[0] + + +# --------------------------------------------------------------------------- +# Helpers for accept-hooks resolution +# --------------------------------------------------------------------------- + +def _resolve_effective_accept( + cfg: Dict[str, Any], accept_hooks_arg: bool, +) -> bool: + """Combine all three opt-in channels into a single boolean. + + Precedence (any truthy source flips us on): + 1. ``--accept-hooks`` flag (CLI) / explicit argument + 2. ``HERMES_ACCEPT_HOOKS`` env var + 3. ``hooks_auto_accept: true`` in ``cli-config.yaml`` + """ + if accept_hooks_arg: + return True + env = os.environ.get("HERMES_ACCEPT_HOOKS", "").strip().lower() + if env in ("1", "true", "yes", "on"): + return True + cfg_val = cfg.get("hooks_auto_accept", False) + return bool(cfg_val) + + +# --------------------------------------------------------------------------- +# Introspection (used by `hermes hooks` CLI) +# --------------------------------------------------------------------------- + +def allowlist_entry_for(event: str, command: str) -> Optional[Dict[str, Any]]: + """Return the allowlist record for this pair, if any.""" + for e in load_allowlist().get("approvals", []): + if ( + isinstance(e, dict) + and e.get("event") == event + and e.get("command") == command + ): + return e + return None + + +def script_mtime_iso(command: str) -> Optional[str]: + """ISO-8601 mtime of the resolved script path, or ``None`` if the + script is missing.""" + path = _command_script_path(command) + if not path: + return None + try: + expanded = os.path.expanduser(path) + return datetime.fromtimestamp( + os.path.getmtime(expanded), tz=timezone.utc, + ).isoformat().replace("+00:00", "Z") + except OSError: + return None + + +def script_is_executable(command: str) -> bool: + """Return ``True`` iff ``command`` is runnable as configured. + + For a bare invocation (``/path/hook.sh``) the script itself must be + executable. For interpreter-prefixed commands (``python3 + /path/hook.py``, ``/usr/bin/env bash hook.sh``) the script just has + to be readable — the interpreter doesn't care about the ``X_OK`` + bit. Mirrors what ``_spawn`` would actually do at runtime.""" + path = _command_script_path(command) + if not path: + return False + expanded = os.path.expanduser(path) + if not os.path.isfile(expanded): + return False + try: + argv = shlex.split(command) + except ValueError: + return False + is_bare_invocation = bool(argv) and argv[0] == path + required = os.X_OK if is_bare_invocation else os.R_OK + return os.access(expanded, required) + + +def run_once( + spec: ShellHookSpec, kwargs: Dict[str, Any], +) -> Dict[str, Any]: + """Fire a single shell-hook invocation with a synthetic payload. + Used by ``hermes hooks test`` and ``hermes hooks doctor``. + + ``kwargs`` is the same dict that :func:`hermes_cli.plugins.invoke_hook` + would pass at runtime. It is routed through :func:`_serialize_payload` + so the synthetic stdin exactly matches what a real hook firing would + produce — otherwise scripts tested via ``hermes hooks test`` could + diverge silently from production behaviour. + + Returns the :func:`_spawn` diagnostic dict plus a ``parsed`` field + holding the canonical Hermes-wire-shape response.""" + stdin_json = _serialize_payload(spec.event, kwargs) + result = _spawn(spec, stdin_json) + result["parsed"] = _parse_response(spec.event, result["stdout"]) + return result diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 6d8750a2d..a4a5ffda7 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -917,3 +917,39 @@ display: # # Names and usernames are NOT affected (user-chosen, publicly visible). # # Routing/delivery still uses the original values internally. # redact_pii: false + +# ============================================================================= +# Shell-script hooks +# ============================================================================= +# Register shell scripts as plugin-hook callbacks. Each entry is executed as +# a subprocess (shell=False, shlex.split) with a JSON payload on stdin. On +# stdout the script may return JSON that either blocks the tool call or +# injects context into the next LLM call. +# +# Valid events (mirror hermes_cli.plugins.VALID_HOOKS): +# pre_tool_call, post_tool_call, pre_llm_call, post_llm_call, +# pre_api_request, post_api_request, on_session_start, on_session_end, +# on_session_finalize, on_session_reset, subagent_stop +# +# First-use consent: each (event, command) pair prompts once on a TTY, then +# is persisted to ~/.hermes/shell-hooks-allowlist.json. Non-interactive +# runs (gateway, cron) need --accept-hooks, HERMES_ACCEPT_HOOKS=1, or the +# hooks_auto_accept key below. +# +# See website/docs/user-guide/features/hooks.md for the full JSON wire +# protocol and worked examples. +# +# hooks: +# pre_tool_call: +# - matcher: "terminal" +# command: "~/.hermes/agent-hooks/block-rm-rf.sh" +# timeout: 10 +# post_tool_call: +# - matcher: "write_file|patch" +# command: "~/.hermes/agent-hooks/auto-format.sh" +# pre_llm_call: +# - command: "~/.hermes/agent-hooks/inject-cwd-context.sh" +# subagent_stop: +# - command: "~/.hermes/agent-hooks/log-orchestration.sh" +# +# hooks_auto_accept: false diff --git a/gateway/run.py b/gateway/run.py index 36c5655b1..3fba1d8d9 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1960,6 +1960,39 @@ class GatewayRunner: "or configure platform allowlists (e.g., TELEGRAM_ALLOWED_USERS=your_id)." ) + # Discover Python plugins before shell hooks so plugin block + # decisions take precedence in tie cases. The CLI startup path + # does this via an explicit call in hermes_cli/main.py; the + # gateway lazily imports run_agent inside per-request handlers, + # so the discover_plugins() side-effect in model_tools.py is NOT + # guaranteed to have run by the time we reach this point. + try: + from hermes_cli.plugins import discover_plugins + discover_plugins() + except Exception: + logger.debug( + "plugin discovery failed at gateway startup", exc_info=True, + ) + + # Register declarative shell hooks from cli-config.yaml. Gateway + # has no TTY, so consent has to come from one of the three opt-in + # channels (--accept-hooks on launch, HERMES_ACCEPT_HOOKS env var, + # or hooks_auto_accept: true in config.yaml). We pass + # accept_hooks=False here and let register_from_config resolve + # the effective value from env + config itself — the CLI-side + # registration already honored --accept-hooks, and re-reading + # hooks_auto_accept here would just duplicate that lookup. + # Failures are logged but must never block gateway startup. + try: + from hermes_cli.config import load_config + from agent.shell_hooks import register_from_config + register_from_config(load_config(), accept_hooks=False) + except Exception: + logger.debug( + "shell-hook registration failed at gateway startup", + exc_info=True, + ) + # Discover and load event hooks self.hooks.discover_and_load() diff --git a/hermes_cli/config.py b/hermes_cli/config.py index c046d2b28..5f10f0de2 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -773,6 +773,21 @@ DEFAULT_CONFIG = { "command_allowlist": [], # User-defined quick commands that bypass the agent loop (type: exec only) "quick_commands": {}, + + # Shell-script hooks — declarative bridge that invokes shell scripts + # on plugin-hook events (pre_tool_call, post_tool_call, pre_llm_call, + # subagent_stop, etc.). Each entry maps an event name to a list of + # {matcher, command, timeout} dicts. First registration of a new + # command prompts the user for consent; subsequent runs reuse the + # stored approval from ~/.hermes/shell-hooks-allowlist.json. + # See `website/docs/user-guide/features/hooks.md` for schema + examples. + "hooks": {}, + + # Auto-accept shell-hook registrations without a TTY prompt. Also + # toggleable per-invocation via --accept-hooks or HERMES_ACCEPT_HOOKS=1. + # Gateway / cron / non-interactive runs need this (or one of the other + # channels) to pick up newly-added hooks. + "hooks_auto_accept": False, # Custom personalities — add your own entries here # Supports string format: {"name": "system prompt"} # Or dict format: {"name": {"description": "...", "system_prompt": "...", "tone": "...", "style": "..."}} diff --git a/hermes_cli/hooks.py b/hermes_cli/hooks.py new file mode 100644 index 000000000..97d9e36b3 --- /dev/null +++ b/hermes_cli/hooks.py @@ -0,0 +1,385 @@ +"""hermes hooks — inspect and manage shell-script hooks. + +Usage:: + + hermes hooks list + hermes hooks test [--for-tool X] [--payload-file F] + hermes hooks revoke + hermes hooks doctor + +Consent records live under ``~/.hermes/shell-hooks-allowlist.json`` and +hook definitions come from the ``hooks:`` block in ``~/.hermes/config.yaml`` +(the same config read by the CLI / gateway at startup). + +This module is a thin CLI shell over :mod:`agent.shell_hooks`; every +shared concern (payload serialisation, response parsing, allowlist +format) lives there. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any, Dict, List, Optional + + +def hooks_command(args) -> None: + """Entry point for ``hermes hooks`` — dispatches to the requested action.""" + sub = getattr(args, "hooks_action", None) + + if not sub: + print("Usage: hermes hooks {list|test|revoke|doctor}") + print("Run 'hermes hooks --help' for details.") + return + + if sub in ("list", "ls"): + _cmd_list(args) + elif sub == "test": + _cmd_test(args) + elif sub in ("revoke", "remove", "rm"): + _cmd_revoke(args) + elif sub == "doctor": + _cmd_doctor(args) + else: + print(f"Unknown hooks subcommand: {sub}") + + +# --------------------------------------------------------------------------- +# list +# --------------------------------------------------------------------------- + +def _cmd_list(_args) -> None: + from hermes_cli.config import load_config + from agent import shell_hooks + + specs = shell_hooks.iter_configured_hooks(load_config()) + + if not specs: + print("No shell hooks configured in ~/.hermes/config.yaml.") + print("See `hermes hooks --help` or") + print(" website/docs/user-guide/features/hooks.md") + print("for the config schema and worked examples.") + return + + by_event: Dict[str, List] = {} + for spec in specs: + by_event.setdefault(spec.event, []).append(spec) + + allowlist = shell_hooks.load_allowlist() + approved = { + (e.get("event"), e.get("command")) + for e in allowlist.get("approvals", []) + if isinstance(e, dict) + } + + print(f"Configured shell hooks ({len(specs)} total):\n") + + for event in sorted(by_event.keys()): + print(f" [{event}]") + for spec in by_event[event]: + is_approved = (spec.event, spec.command) in approved + status = "✓ allowed" if is_approved else "✗ not allowlisted" + matcher_part = f" matcher={spec.matcher!r}" if spec.matcher else "" + print( + f" - {spec.command}{matcher_part} " + f"(timeout={spec.timeout}s, {status})" + ) + + if is_approved: + entry = shell_hooks.allowlist_entry_for(spec.event, spec.command) + if entry and entry.get("approved_at"): + print(f" approved_at: {entry['approved_at']}") + mtime_now = shell_hooks.script_mtime_iso(spec.command) + mtime_at = entry.get("script_mtime_at_approval") + if mtime_now and mtime_at and mtime_now > mtime_at: + print( + f" ⚠ script modified since approval " + f"(was {mtime_at}, now {mtime_now}) — " + f"run `hermes hooks doctor` to re-validate" + ) + print() + + +# --------------------------------------------------------------------------- +# test +# --------------------------------------------------------------------------- + +# Synthetic kwargs matching the real invoke_hook() call sites — these are +# passed verbatim to agent.shell_hooks.run_once(), which routes them through +# the same _serialize_payload() that production firings use. That way the +# stdin a script sees under `hermes hooks test` and `hermes hooks doctor` +# is identical in shape to what it will see at runtime. +_DEFAULT_PAYLOADS = { + "pre_tool_call": { + "tool_name": "terminal", + "args": {"command": "echo hello"}, + "session_id": "test-session", + "task_id": "test-task", + "tool_call_id": "test-call", + }, + "post_tool_call": { + "tool_name": "terminal", + "args": {"command": "echo hello"}, + "session_id": "test-session", + "task_id": "test-task", + "tool_call_id": "test-call", + "result": '{"output": "hello"}', + }, + "pre_llm_call": { + "session_id": "test-session", + "user_message": "What is the weather?", + "conversation_history": [], + "is_first_turn": True, + "model": "gpt-4", + "platform": "cli", + }, + "post_llm_call": { + "session_id": "test-session", + "model": "gpt-4", + "platform": "cli", + }, + "on_session_start": {"session_id": "test-session"}, + "on_session_end": {"session_id": "test-session"}, + "on_session_finalize": {"session_id": "test-session"}, + "on_session_reset": {"session_id": "test-session"}, + "pre_api_request": { + "session_id": "test-session", + "task_id": "test-task", + "platform": "cli", + "model": "claude-sonnet-4-6", + "provider": "anthropic", + "base_url": "https://api.anthropic.com", + "api_mode": "anthropic_messages", + "api_call_count": 1, + "message_count": 4, + "tool_count": 12, + "approx_input_tokens": 2048, + "request_char_count": 8192, + "max_tokens": 4096, + }, + "post_api_request": { + "session_id": "test-session", + "task_id": "test-task", + "platform": "cli", + "model": "claude-sonnet-4-6", + "provider": "anthropic", + "base_url": "https://api.anthropic.com", + "api_mode": "anthropic_messages", + "api_call_count": 1, + "api_duration": 1.234, + "finish_reason": "stop", + "message_count": 4, + "response_model": "claude-sonnet-4-6", + "usage": {"input_tokens": 2048, "output_tokens": 512}, + "assistant_content_chars": 1200, + "assistant_tool_call_count": 0, + }, + "subagent_stop": { + "parent_session_id": "parent-sess", + "child_role": None, + "child_summary": "Synthetic summary for hooks test", + "child_status": "completed", + "duration_ms": 1234, + }, +} + + +def _cmd_test(args) -> None: + from hermes_cli.config import load_config + from hermes_cli.plugins import VALID_HOOKS + from agent import shell_hooks + + event = args.event + if event not in VALID_HOOKS: + print(f"Unknown event: {event!r}") + print(f"Valid events: {', '.join(sorted(VALID_HOOKS))}") + return + + # Synthetic kwargs in the same shape invoke_hook() would pass. Merged + # with --for-tool (overrides tool_name) and --payload-file (extra kwargs). + payload = dict(_DEFAULT_PAYLOADS.get(event, {"session_id": "test-session"})) + + if getattr(args, "for_tool", None): + payload["tool_name"] = args.for_tool + + if getattr(args, "payload_file", None): + try: + custom = json.loads(Path(args.payload_file).read_text()) + if isinstance(custom, dict): + payload.update(custom) + else: + print(f"Warning: {args.payload_file} is not a JSON object; ignoring") + except Exception as exc: + print(f"Error reading payload file: {exc}") + return + + specs = shell_hooks.iter_configured_hooks(load_config()) + specs = [s for s in specs if s.event == event] + + if getattr(args, "for_tool", None): + specs = [ + s for s in specs + if s.event not in ("pre_tool_call", "post_tool_call") + or s.matches_tool(args.for_tool) + ] + + if not specs: + print(f"No shell hooks configured for event: {event}") + if getattr(args, "for_tool", None): + print(f"(with matcher filter --for-tool={args.for_tool})") + return + + print(f"Firing {len(specs)} hook(s) for event '{event}':\n") + for spec in specs: + print(f" → {spec.command}") + result = shell_hooks.run_once(spec, payload) + _print_run_result(result) + print() + + +def _print_run_result(result: Dict[str, Any]) -> None: + if result.get("error"): + print(f" ✗ error: {result['error']}") + return + if result.get("timed_out"): + print(f" ✗ timed out after {result['elapsed_seconds']}s") + return + + rc = result.get("returncode") + elapsed = result.get("elapsed_seconds", 0) + print(f" exit={rc} elapsed={elapsed}s") + + stdout = (result.get("stdout") or "").strip() + stderr = (result.get("stderr") or "").strip() + if stdout: + print(f" stdout: {_truncate(stdout, 400)}") + if stderr: + print(f" stderr: {_truncate(stderr, 400)}") + + parsed = result.get("parsed") + if parsed: + print(f" parsed (Hermes wire shape): {json.dumps(parsed)}") + else: + print(" parsed: ") + + +def _truncate(s: str, n: int) -> str: + return s if len(s) <= n else s[: n - 3] + "..." + + +# --------------------------------------------------------------------------- +# revoke +# --------------------------------------------------------------------------- + +def _cmd_revoke(args) -> None: + from agent import shell_hooks + + removed = shell_hooks.revoke(args.command) + if removed == 0: + print(f"No allowlist entry found for command: {args.command}") + return + print(f"Removed {removed} allowlist entry/entries for: {args.command}") + print( + "Note: currently running CLI / gateway processes keep their " + "already-registered callbacks until they restart." + ) + + +# --------------------------------------------------------------------------- +# doctor +# --------------------------------------------------------------------------- + +def _cmd_doctor(_args) -> None: + from hermes_cli.config import load_config + from agent import shell_hooks + + specs = shell_hooks.iter_configured_hooks(load_config()) + + if not specs: + print("No shell hooks configured — nothing to check.") + return + + print(f"Checking {len(specs)} configured shell hook(s)...\n") + + problems = 0 + for spec in specs: + print(f" [{spec.event}] {spec.command}") + problems += _doctor_one(spec, shell_hooks) + print() + + if problems: + print(f"{problems} issue(s) found. Fix before relying on these hooks.") + else: + print("All shell hooks look healthy.") + + +def _doctor_one(spec, shell_hooks) -> int: + problems = 0 + + # 1. Script exists and is executable + if shell_hooks.script_is_executable(spec.command): + print(" ✓ script exists and is executable") + else: + problems += 1 + print(" ✗ script missing or not executable " + "(chmod +x the file, or fix the path)") + + # 2. Allowlist status + entry = shell_hooks.allowlist_entry_for(spec.event, spec.command) + if entry: + print(f" ✓ allowlisted (approved {entry.get('approved_at', '?')})") + else: + problems += 1 + print(" ✗ not allowlisted — hook will NOT fire at runtime " + "(run with --accept-hooks once, or confirm at the TTY prompt)") + + # 3. Mtime drift + if entry and entry.get("script_mtime_at_approval"): + mtime_now = shell_hooks.script_mtime_iso(spec.command) + mtime_at = entry["script_mtime_at_approval"] + if mtime_now and mtime_at and mtime_now > mtime_at: + problems += 1 + print(f" ⚠ script modified since approval " + f"(was {mtime_at}, now {mtime_now}) — review changes, " + f"then `hermes hooks revoke` + re-approve to refresh") + elif mtime_now and mtime_at and mtime_now == mtime_at: + print(" ✓ script unchanged since approval") + + # 4. Produces valid JSON for a synthetic payload — only when the entry + # is already allowlisted. Otherwise `hermes hooks doctor` would execute + # every script listed in a freshly-pulled config before the user has + # reviewed them, which directly contradicts the documented workflow + # ("spot newly-added hooks *before they register*"). + if not entry: + print(" ℹ skipped JSON smoke test — not allowlisted yet. " + "Approve the hook first (via TTY prompt or --accept-hooks), " + "then re-run `hermes hooks doctor`.") + elif shell_hooks.script_is_executable(spec.command): + payload = _DEFAULT_PAYLOADS.get(spec.event, {"extra": {}}) + result = shell_hooks.run_once(spec, payload) + if result.get("timed_out"): + problems += 1 + print(f" ✗ timed out after {result['elapsed_seconds']}s " + f"on synthetic payload (timeout={spec.timeout}s)") + elif result.get("error"): + problems += 1 + print(f" ✗ execution error: {result['error']}") + else: + rc = result.get("returncode") + elapsed = result.get("elapsed_seconds", 0) + stdout = (result.get("stdout") or "").strip() + if stdout: + try: + json.loads(stdout) + print(f" ✓ produced valid JSON on synthetic payload " + f"(exit={rc}, {elapsed}s)") + except json.JSONDecodeError: + problems += 1 + print(f" ✗ stdout was not valid JSON (exit={rc}, " + f"{elapsed}s): {_truncate(stdout, 120)}") + else: + print(f" ✓ ran clean with empty stdout " + f"(exit={rc}, {elapsed}s) — hook is observer-only") + + return problems diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 489a1652d..3da8424a7 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -51,6 +51,19 @@ import sys from pathlib import Path from typing import Optional +def _add_accept_hooks_flag(parser) -> None: + """Attach the ``--accept-hooks`` flag. Shared across every agent + subparser so the flag works regardless of CLI position.""" + parser.add_argument( + "--accept-hooks", + action="store_true", + default=argparse.SUPPRESS, + help=( + "Auto-approve unseen shell hooks without a TTY prompt " + "(equivalent to HERMES_ACCEPT_HOOKS=1 / hooks_auto_accept: true)." + ), + ) + def _require_tty(command_name: str) -> None: """Exit with a clear error if stdin is not a terminal. @@ -4092,6 +4105,12 @@ def cmd_webhook(args): webhook_command(args) +def cmd_hooks(args): + """Shell-hook inspection and management.""" + from hermes_cli.hooks import hooks_command + hooks_command(args) + + def cmd_doctor(args): """Check configuration and dependencies.""" from hermes_cli.doctor import run_doctor @@ -6371,6 +6390,17 @@ For more help on a command: default=False, help="Run in an isolated git worktree (for parallel agents)", ) + parser.add_argument( + "--accept-hooks", + action="store_true", + default=False, + help=( + "Auto-approve any unseen shell hooks declared in config.yaml " + "without a TTY prompt. Equivalent to HERMES_ACCEPT_HOOKS=1 or " + "hooks_auto_accept: true in config.yaml. Use on CI / headless " + "runs that can't prompt." + ), + ) parser.add_argument( "--skills", "-s", @@ -6493,6 +6523,16 @@ For more help on a command: default=argparse.SUPPRESS, help="Run in an isolated git worktree (for parallel agents on the same repo)", ) + chat_parser.add_argument( + "--accept-hooks", + action="store_true", + default=argparse.SUPPRESS, + help=( + "Auto-approve any unseen shell hooks declared in config.yaml " + "without a TTY prompt (see also HERMES_ACCEPT_HOOKS env var and " + "hooks_auto_accept: in config.yaml)." + ), + ) chat_parser.add_argument( "--checkpoints", action="store_true", @@ -6612,6 +6652,8 @@ For more help on a command: action="store_true", help="Replace any existing gateway instance (useful for systemd)", ) + _add_accept_hooks_flag(gateway_run) + _add_accept_hooks_flag(gateway_parser) # gateway start gateway_start = gateway_subparsers.add_parser( @@ -6976,6 +7018,7 @@ For more help on a command: "run", help="Run a job on the next scheduler tick" ) cron_run.add_argument("job_id", help="Job ID to trigger") + _add_accept_hooks_flag(cron_run) cron_remove = cron_subparsers.add_parser( "remove", aliases=["rm", "delete"], help="Remove a scheduled job" @@ -6986,8 +7029,9 @@ For more help on a command: cron_subparsers.add_parser("status", help="Check if cron scheduler is running") # cron tick (mostly for debugging) - cron_subparsers.add_parser("tick", help="Run due jobs once and exit") - + cron_tick = cron_subparsers.add_parser("tick", help="Run due jobs once and exit") + _add_accept_hooks_flag(cron_tick) + _add_accept_hooks_flag(cron_parser) cron_parser.set_defaults(func=cmd_cron) # ========================================================================= @@ -7054,6 +7098,67 @@ For more help on a command: webhook_parser.set_defaults(func=cmd_webhook) + # ========================================================================= + # hooks command — shell-hook inspection and management + # ========================================================================= + hooks_parser = subparsers.add_parser( + "hooks", + help="Inspect and manage shell-script hooks", + description=( + "Inspect shell-script hooks declared in ~/.hermes/config.yaml, " + "test them against synthetic payloads, and manage the first-use " + "consent allowlist at ~/.hermes/shell-hooks-allowlist.json." + ), + ) + hooks_subparsers = hooks_parser.add_subparsers(dest="hooks_action") + + hooks_subparsers.add_parser( + "list", aliases=["ls"], + help="List configured hooks with matcher, timeout, and consent status", + ) + + _hk_test = hooks_subparsers.add_parser( + "test", + help="Fire every hook matching against a synthetic payload", + ) + _hk_test.add_argument( + "event", + help="Hook event name (e.g. pre_tool_call, pre_llm_call, subagent_stop)", + ) + _hk_test.add_argument( + "--for-tool", dest="for_tool", default=None, + help=( + "Only fire hooks whose matcher matches this tool name " + "(used for pre_tool_call / post_tool_call)" + ), + ) + _hk_test.add_argument( + "--payload-file", dest="payload_file", default=None, + help=( + "Path to a JSON file whose contents are merged into the " + "synthetic payload before execution" + ), + ) + + _hk_revoke = hooks_subparsers.add_parser( + "revoke", aliases=["remove", "rm"], + help="Remove a command's allowlist entries (takes effect on next restart)", + ) + _hk_revoke.add_argument( + "command", + help="The exact command string to revoke (as declared in config.yaml)", + ) + + hooks_subparsers.add_parser( + "doctor", + help=( + "Check each configured hook: exec bit, allowlist, mtime drift, " + "JSON validity, and synthetic run timing" + ), + ) + + hooks_parser.set_defaults(func=cmd_hooks) + # ========================================================================= # doctor command # ========================================================================= @@ -7727,6 +7832,7 @@ Examples: action="store_true", help="Enable verbose logging on stderr", ) + _add_accept_hooks_flag(mcp_serve_p) mcp_add_p = mcp_sub.add_parser( "add", help="Add an MCP server (discovery-first install)" @@ -7765,6 +7871,8 @@ Examples: ) mcp_login_p.add_argument("name", help="Server name to re-authenticate") + _add_accept_hooks_flag(mcp_parser) + def cmd_mcp(args): from hermes_cli.mcp_config import mcp_command @@ -8176,6 +8284,7 @@ Examples: help="Run Hermes Agent as an ACP (Agent Client Protocol) server", description="Start Hermes Agent in ACP mode for editor integration (VS Code, Zed, JetBrains)", ) + _add_accept_hooks_flag(acp_parser) def cmd_acp(args): """Launch Hermes Agent as an ACP server.""" @@ -8449,6 +8558,42 @@ Examples: cmd_version(args) return + # Discover Python plugins and register shell hooks once, before any + # command that can fire lifecycle hooks. Both are idempotent; gated + # so introspection/management commands (hermes hooks list, cron + # list, gateway status, mcp add, ...) don't pay discovery cost or + # trigger consent prompts for hooks the user is still inspecting. + # Groups with mixed admin/CRUD vs. agent-running entries narrow via + # the nested subcommand (dest varies by parser). + _AGENT_COMMANDS = {None, "chat", "acp", "rl"} + _AGENT_SUBCOMMANDS = { + "cron": ("cron_command", {"run", "tick"}), + "gateway": ("gateway_command", {"run"}), + "mcp": ("mcp_action", {"serve"}), + } + _sub_attr, _sub_set = _AGENT_SUBCOMMANDS.get(args.command, (None, None)) + if ( + args.command in _AGENT_COMMANDS + or (_sub_attr and getattr(args, _sub_attr, None) in _sub_set) + ): + _accept_hooks = bool(getattr(args, "accept_hooks", False)) + try: + from hermes_cli.plugins import discover_plugins + discover_plugins() + except Exception: + logger.debug( + "plugin discovery failed at CLI startup", exc_info=True, + ) + try: + from hermes_cli.config import load_config + from agent.shell_hooks import register_from_config + register_from_config(load_config(), accept_hooks=_accept_hooks) + except Exception: + logger.debug( + "shell-hook registration failed at CLI startup", + exc_info=True, + ) + # Handle top-level --resume / --continue as shortcut to chat if (args.resume or args.continue_last) and args.command is None: args.command = "chat" diff --git a/hermes_cli/plugins.py b/hermes_cli/plugins.py index 62a092885..a593782e6 100644 --- a/hermes_cli/plugins.py +++ b/hermes_cli/plugins.py @@ -70,6 +70,7 @@ VALID_HOOKS: Set[str] = { "on_session_end", "on_session_finalize", "on_session_reset", + "subagent_stop", } ENTRY_POINTS_GROUP = "hermes_agent.plugins" diff --git a/tests/agent/test_shell_hooks.py b/tests/agent/test_shell_hooks.py new file mode 100644 index 000000000..088c23eb4 --- /dev/null +++ b/tests/agent/test_shell_hooks.py @@ -0,0 +1,716 @@ +"""Tests for the shell-hooks subprocess bridge (agent.shell_hooks). + +These tests focus on the pure translation layer — JSON serialisation, +JSON parsing, matcher behaviour, block-schema correctness, and the +subprocess runner's graceful error handling. Consent prompts are +covered in ``test_shell_hooks_consent.py``. +""" + +from __future__ import annotations + +import json +import os +import stat +from pathlib import Path +from typing import Any, Dict + +import pytest + +from agent import shell_hooks + + +# ── helpers ─────────────────────────────────────────────────────────────── + + +def _write_script(tmp_path: Path, name: str, body: str) -> Path: + path = tmp_path / name + path.write_text(body) + path.chmod(0o755) + return path + + +def _allowlist_pair(monkeypatch, tmp_path, event: str, command: str) -> None: + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_home")) + shell_hooks._record_approval(event, command) + + +@pytest.fixture(autouse=True) +def _reset_registration_state(): + shell_hooks.reset_for_tests() + yield + shell_hooks.reset_for_tests() + + +# ── _parse_response ─────────────────────────────────────────────────────── + + +class TestParseResponse: + def test_block_claude_code_style(self): + r = shell_hooks._parse_response( + "pre_tool_call", + '{"decision": "block", "reason": "nope"}', + ) + assert r == {"action": "block", "message": "nope"} + + def test_block_canonical_style(self): + r = shell_hooks._parse_response( + "pre_tool_call", + '{"action": "block", "message": "nope"}', + ) + assert r == {"action": "block", "message": "nope"} + + def test_block_canonical_wins_over_claude_style(self): + r = shell_hooks._parse_response( + "pre_tool_call", + '{"action": "block", "message": "canonical", ' + '"decision": "block", "reason": "claude"}', + ) + assert r == {"action": "block", "message": "canonical"} + + def test_empty_stdout_returns_none(self): + assert shell_hooks._parse_response("pre_tool_call", "") is None + assert shell_hooks._parse_response("pre_tool_call", " ") is None + + def test_invalid_json_returns_none(self): + assert shell_hooks._parse_response("pre_tool_call", "not json") is None + + def test_non_dict_json_returns_none(self): + assert shell_hooks._parse_response("pre_tool_call", "[1, 2]") is None + + def test_non_block_pre_tool_call_returns_none(self): + r = shell_hooks._parse_response("pre_tool_call", '{"decision": "allow"}') + assert r is None + + def test_pre_llm_call_context_passthrough(self): + r = shell_hooks._parse_response( + "pre_llm_call", '{"context": "today is Friday"}', + ) + assert r == {"context": "today is Friday"} + + def test_subagent_stop_context_passthrough(self): + r = shell_hooks._parse_response( + "subagent_stop", '{"context": "child role=leaf"}', + ) + assert r == {"context": "child role=leaf"} + + def test_pre_llm_call_block_ignored(self): + """Only pre_tool_call honors block directives.""" + r = shell_hooks._parse_response( + "pre_llm_call", '{"decision": "block", "reason": "no"}', + ) + assert r is None + + +# ── _serialize_payload ──────────────────────────────────────────────────── + + +class TestSerializePayload: + def test_basic_pre_tool_call_schema(self): + raw = shell_hooks._serialize_payload( + "pre_tool_call", + { + "tool_name": "terminal", + "args": {"command": "ls"}, + "session_id": "sess-1", + "task_id": "t-1", + "tool_call_id": "c-1", + }, + ) + payload = json.loads(raw) + assert payload["hook_event_name"] == "pre_tool_call" + assert payload["tool_name"] == "terminal" + assert payload["tool_input"] == {"command": "ls"} + assert payload["session_id"] == "sess-1" + assert "cwd" in payload + # task_id / tool_call_id end up under extra + assert payload["extra"]["task_id"] == "t-1" + assert payload["extra"]["tool_call_id"] == "c-1" + + def test_args_not_dict_becomes_null(self): + raw = shell_hooks._serialize_payload( + "pre_tool_call", {"args": ["not", "a", "dict"]}, + ) + payload = json.loads(raw) + assert payload["tool_input"] is None + + def test_parent_session_id_used_when_no_session_id(self): + raw = shell_hooks._serialize_payload( + "subagent_stop", {"parent_session_id": "p-1"}, + ) + payload = json.loads(raw) + assert payload["session_id"] == "p-1" + + def test_unserialisable_extras_stringified(self): + class Weird: + def __repr__(self) -> str: + return "" + + raw = shell_hooks._serialize_payload( + "on_session_start", {"obj": Weird()}, + ) + payload = json.loads(raw) + assert payload["extra"]["obj"] == "" + + +# ── Matcher behaviour ───────────────────────────────────────────────────── + + +class TestMatcher: + def test_no_matcher_fires_for_any_tool(self): + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher=None, + ) + assert spec.matches_tool("terminal") + assert spec.matches_tool("write_file") + + def test_single_name_matcher(self): + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher="terminal", + ) + assert spec.matches_tool("terminal") + assert not spec.matches_tool("web_search") + + def test_alternation_matcher(self): + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher="terminal|file", + ) + assert spec.matches_tool("terminal") + assert spec.matches_tool("file") + assert not spec.matches_tool("web") + + def test_invalid_regex_falls_back_to_literal(self): + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher="foo[bar", + ) + assert spec.matches_tool("foo[bar") + assert not spec.matches_tool("foo") + + def test_matcher_ignored_when_no_tool_name(self): + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher="terminal", + ) + assert not spec.matches_tool(None) + + def test_matcher_leading_whitespace_stripped(self): + """YAML quirks can introduce leading/trailing whitespace — must + not silently break the matcher.""" + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher=" terminal ", + ) + assert spec.matcher == "terminal" + assert spec.matches_tool("terminal") + + def test_matcher_trailing_newline_stripped(self): + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher="terminal\n", + ) + assert spec.matches_tool("terminal") + + def test_whitespace_only_matcher_becomes_none(self): + """A matcher that's pure whitespace is treated as 'no matcher'.""" + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command="echo", matcher=" ", + ) + assert spec.matcher is None + assert spec.matches_tool("anything") + + +# ── End-to-end subprocess behaviour ─────────────────────────────────────── + + +class TestCallbackSubprocess: + def test_timeout_returns_none(self, tmp_path): + # Script that sleeps forever; we set a 1s timeout. + script = _write_script( + tmp_path, "slow.sh", + "#!/usr/bin/env bash\nsleep 60\n", + ) + spec = shell_hooks.ShellHookSpec( + event="post_tool_call", command=str(script), timeout=1, + ) + cb = shell_hooks._make_callback(spec) + assert cb(tool_name="terminal") is None + + def test_malformed_json_stdout_returns_none(self, tmp_path): + script = _write_script( + tmp_path, "bad_json.sh", + "#!/usr/bin/env bash\necho 'not json at all'\n", + ) + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command=str(script), + ) + cb = shell_hooks._make_callback(spec) + # Matcher is None so the callback fires for any tool. + assert cb(tool_name="terminal") is None + + def test_non_zero_exit_with_block_stdout_still_blocks(self, tmp_path): + """A script that signals failure via exit code AND prints a block + directive must still block — scripts should be free to mix exit + codes with parseable output.""" + script = _write_script( + tmp_path, "exit1_block.sh", + "#!/usr/bin/env bash\n" + 'printf \'{"decision": "block", "reason": "via exit 1"}\\n\'\n' + "exit 1\n", + ) + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command=str(script), + ) + cb = shell_hooks._make_callback(spec) + assert cb(tool_name="terminal") == {"action": "block", "message": "via exit 1"} + + def test_block_translation_end_to_end(self, tmp_path): + """v1 schema-bug regression gate. + + Shell hook returns the Claude-Code-style payload and the bridge + must translate it to the canonical Hermes block shape so that + get_pre_tool_call_block_message() surfaces the block. + """ + script = _write_script( + tmp_path, "blocker.sh", + "#!/usr/bin/env bash\n" + 'printf \'{"decision": "block", "reason": "no terminal"}\\n\'\n', + ) + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", + command=str(script), + matcher="terminal", + ) + cb = shell_hooks._make_callback(spec) + result = cb(tool_name="terminal", args={"command": "rm -rf /"}) + assert result == {"action": "block", "message": "no terminal"} + + def test_block_aggregation_through_plugin_manager(self, tmp_path, monkeypatch): + """Registering via register_from_config makes + get_pre_tool_call_block_message surface the block — the real + end-to-end control flow used by run_agent._invoke_tool.""" + from hermes_cli import plugins + + script = _write_script( + tmp_path, "block.sh", + "#!/usr/bin/env bash\n" + 'printf \'{"decision": "block", "reason": "blocked-by-shell"}\\n\'\n', + ) + + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1") + + # Fresh manager + plugins._plugin_manager = plugins.PluginManager() + + cfg = { + "hooks": { + "pre_tool_call": [ + {"matcher": "terminal", "command": str(script)}, + ], + }, + } + registered = shell_hooks.register_from_config(cfg, accept_hooks=True) + assert len(registered) == 1 + + msg = plugins.get_pre_tool_call_block_message( + tool_name="terminal", + args={"command": "rm"}, + ) + assert msg == "blocked-by-shell" + + def test_matcher_regex_filters_callback(self, tmp_path, monkeypatch): + """A matcher set to 'terminal' must not fire for 'web_search'.""" + calls = tmp_path / "calls.log" + script = _write_script( + tmp_path, "log.sh", + f"#!/usr/bin/env bash\n" + f"echo \"$(cat -)\" >> {calls}\n" + f"printf '{{}}\\n'\n", + ) + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", + command=str(script), + matcher="terminal", + ) + cb = shell_hooks._make_callback(spec) + cb(tool_name="terminal", args={"command": "ls"}) + cb(tool_name="web_search", args={"q": "x"}) + cb(tool_name="file_read", args={"path": "x"}) + assert calls.exists() + # Only the terminal call wrote to the log + assert calls.read_text().count("pre_tool_call") == 1 + + def test_payload_schema_delivered(self, tmp_path): + capture = tmp_path / "payload.json" + script = _write_script( + tmp_path, "capture.sh", + f"#!/usr/bin/env bash\ncat - > {capture}\nprintf '{{}}\\n'\n", + ) + spec = shell_hooks.ShellHookSpec( + event="pre_tool_call", command=str(script), + ) + cb = shell_hooks._make_callback(spec) + cb( + tool_name="terminal", + args={"command": "echo hi"}, + session_id="sess-77", + task_id="task-77", + ) + payload = json.loads(capture.read_text()) + assert payload["hook_event_name"] == "pre_tool_call" + assert payload["tool_name"] == "terminal" + assert payload["tool_input"] == {"command": "echo hi"} + assert payload["session_id"] == "sess-77" + assert "cwd" in payload + assert payload["extra"]["task_id"] == "task-77" + + def test_pre_llm_call_context_flows_through(self, tmp_path): + script = _write_script( + tmp_path, "ctx.sh", + "#!/usr/bin/env bash\n" + 'printf \'{"context": "env-note"}\\n\'\n', + ) + spec = shell_hooks.ShellHookSpec( + event="pre_llm_call", command=str(script), + ) + cb = shell_hooks._make_callback(spec) + result = cb( + session_id="s1", user_message="hello", + conversation_history=[], is_first_turn=True, + model="gpt-4", platform="cli", + ) + assert result == {"context": "env-note"} + + def test_shlex_handles_paths_with_spaces(self, tmp_path): + dir_with_space = tmp_path / "path with space" + dir_with_space.mkdir() + script = _write_script( + dir_with_space, "ok.sh", + "#!/usr/bin/env bash\nprintf '{}\\n'\n", + ) + # Quote the path so shlex keeps it as a single token. + spec = shell_hooks.ShellHookSpec( + event="post_tool_call", + command=f'"{script}"', + ) + cb = shell_hooks._make_callback(spec) + # No crash = shlex parsed it correctly. + assert cb(tool_name="terminal") is None # empty object parses to None + + def test_missing_binary_logged_not_raised(self, tmp_path): + spec = shell_hooks.ShellHookSpec( + event="on_session_start", + command=str(tmp_path / "does-not-exist"), + ) + cb = shell_hooks._make_callback(spec) + # Must not raise — agent loop should continue. + assert cb(session_id="s") is None + + def test_non_executable_binary_logged_not_raised(self, tmp_path): + path = tmp_path / "no-exec" + path.write_text("#!/usr/bin/env bash\necho hi\n") + # Intentionally do NOT chmod +x. + spec = shell_hooks.ShellHookSpec( + event="on_session_start", command=str(path), + ) + cb = shell_hooks._make_callback(spec) + assert cb(session_id="s") is None + + +# ── config parsing ──────────────────────────────────────────────────────── + + +class TestParseHooksBlock: + def test_valid_entry(self): + specs = shell_hooks._parse_hooks_block({ + "pre_tool_call": [ + {"matcher": "terminal", "command": "/tmp/hook.sh", "timeout": 30}, + ], + }) + assert len(specs) == 1 + assert specs[0].event == "pre_tool_call" + assert specs[0].matcher == "terminal" + assert specs[0].command == "/tmp/hook.sh" + assert specs[0].timeout == 30 + + def test_unknown_event_skipped(self, caplog): + specs = shell_hooks._parse_hooks_block({ + "pre_tools_call": [ # typo + {"command": "/tmp/hook.sh"}, + ], + }) + assert specs == [] + + def test_missing_command_skipped(self): + specs = shell_hooks._parse_hooks_block({ + "pre_tool_call": [{"matcher": "terminal"}], + }) + assert specs == [] + + def test_timeout_clamped_to_max(self): + specs = shell_hooks._parse_hooks_block({ + "post_tool_call": [ + {"command": "/tmp/slow.sh", "timeout": 9999}, + ], + }) + assert specs[0].timeout == shell_hooks.MAX_TIMEOUT_SECONDS + + def test_non_int_timeout_defaulted(self): + specs = shell_hooks._parse_hooks_block({ + "post_tool_call": [ + {"command": "/tmp/x.sh", "timeout": "thirty"}, + ], + }) + assert specs[0].timeout == shell_hooks.DEFAULT_TIMEOUT_SECONDS + + def test_non_list_event_skipped(self): + specs = shell_hooks._parse_hooks_block({ + "pre_tool_call": "not a list", + }) + assert specs == [] + + def test_none_hooks_block(self): + assert shell_hooks._parse_hooks_block(None) == [] + assert shell_hooks._parse_hooks_block("string") == [] + assert shell_hooks._parse_hooks_block([]) == [] + + def test_non_tool_event_matcher_warns_and_drops(self, caplog): + """matcher: is only honored for pre/post_tool_call; must warn + and drop on other events so the spec reflects runtime.""" + import logging + cfg = {"pre_llm_call": [{"matcher": "terminal", "command": "/bin/echo"}]} + with caplog.at_level(logging.WARNING, logger=shell_hooks.logger.name): + specs = shell_hooks._parse_hooks_block(cfg) + assert len(specs) == 1 and specs[0].matcher is None + assert any( + "only honored for pre_tool_call" in r.getMessage() + and "pre_llm_call" in r.getMessage() + for r in caplog.records + ) + + +# ── Idempotent registration ─────────────────────────────────────────────── + + +class TestIdempotentRegistration: + def test_double_call_registers_once(self, tmp_path, monkeypatch): + from hermes_cli import plugins + + script = _write_script(tmp_path, "h.sh", + "#!/usr/bin/env bash\nprintf '{}\\n'\n") + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1") + + plugins._plugin_manager = plugins.PluginManager() + + cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} + + first = shell_hooks.register_from_config(cfg, accept_hooks=True) + second = shell_hooks.register_from_config(cfg, accept_hooks=True) + assert len(first) == 1 + assert second == [] + # Only one callback on the manager + mgr = plugins.get_plugin_manager() + assert len(mgr._hooks.get("on_session_start", [])) == 1 + + def test_same_command_different_matcher_registers_both( + self, tmp_path, monkeypatch, + ): + """Same script used for different matchers under one event must + register both callbacks — dedupe keys on (event, matcher, command).""" + from hermes_cli import plugins + + script = _write_script(tmp_path, "h.sh", + "#!/usr/bin/env bash\nprintf '{}\\n'\n") + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1") + + plugins._plugin_manager = plugins.PluginManager() + + cfg = { + "hooks": { + "pre_tool_call": [ + {"matcher": "terminal", "command": str(script)}, + {"matcher": "web_search", "command": str(script)}, + ], + }, + } + + registered = shell_hooks.register_from_config(cfg, accept_hooks=True) + assert len(registered) == 2 + mgr = plugins.get_plugin_manager() + assert len(mgr._hooks.get("pre_tool_call", [])) == 2 + + +# ── Allowlist concurrency ───────────────────────────────────────────────── + + +class TestAllowlistConcurrency: + """Regression tests for the Codex#1 finding: simultaneous + _record_approval() calls used to collide on a fixed tmp path and + silently lose entries under read-modify-write races.""" + + def test_parallel_record_approval_does_not_lose_entries( + self, tmp_path, monkeypatch, + ): + import threading + + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + + N = 32 + barrier = threading.Barrier(N) + errors: list = [] + + def worker(i: int) -> None: + try: + barrier.wait(timeout=5) + shell_hooks._record_approval( + "on_session_start", f"/bin/hook-{i}.sh", + ) + except Exception as exc: # pragma: no cover + errors.append(exc) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(N)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"worker errors: {errors}" + + data = shell_hooks.load_allowlist() + commands = {e["command"] for e in data["approvals"]} + assert commands == {f"/bin/hook-{i}.sh" for i in range(N)}, ( + f"expected all {N} entries, got {len(commands)}" + ) + + def test_non_posix_fallback_does_not_self_deadlock( + self, tmp_path, monkeypatch, + ): + """Regression: on platforms without fcntl, the fallback lock must + be separate from _registered_lock. register_from_config holds + _registered_lock while calling _record_approval (via the consent + prompt path), so a shared non-reentrant lock would self-deadlock.""" + import threading + + monkeypatch.setattr(shell_hooks, "fcntl", None) + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + + completed = threading.Event() + errors: list = [] + + def target() -> None: + try: + with shell_hooks._registered_lock: + shell_hooks._record_approval( + "on_session_start", "/bin/x.sh", + ) + completed.set() + except Exception as exc: # pragma: no cover + errors.append(exc) + completed.set() + + t = threading.Thread(target=target, daemon=True) + t.start() + if not completed.wait(timeout=3.0): + pytest.fail( + "non-POSIX fallback self-deadlocked — " + "_locked_update_approvals must not reuse _registered_lock", + ) + t.join(timeout=1.0) + assert not errors, f"errors: {errors}" + assert shell_hooks._is_allowlisted( + "on_session_start", "/bin/x.sh", + ) + + def test_save_allowlist_failure_logs_actionable_warning( + self, tmp_path, monkeypatch, caplog, + ): + """Persistence failures must log the path, errno, and + re-prompt consequence so "hermes keeps asking" is debuggable.""" + import logging + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + monkeypatch.setattr( + shell_hooks.tempfile, "mkstemp", + lambda *a, **kw: (_ for _ in ()).throw(OSError(28, "No space")), + ) + with caplog.at_level(logging.WARNING, logger=shell_hooks.logger.name): + shell_hooks.save_allowlist({"approvals": []}) + msg = next( + (r.getMessage() for r in caplog.records + if "Failed to persist" in r.getMessage()), "", + ) + assert "shell-hooks-allowlist.json" in msg + assert "No space" in msg + assert "re-prompt" in msg + + def test_script_is_executable_handles_interpreter_prefix(self, tmp_path): + """For ``python3 hook.py`` and similar the interpreter reads + the script, so X_OK on the script itself is not required — + only R_OK. Bare invocations still require X_OK.""" + script = tmp_path / "hook.py" + script.write_text("print()\n") # readable, NOT executable + + # Interpreter prefix: R_OK is enough. + assert shell_hooks.script_is_executable(f"python3 {script}") + assert shell_hooks.script_is_executable(f"/usr/bin/env python3 {script}") + + # Bare invocation on the same non-X_OK file: not runnable. + assert not shell_hooks.script_is_executable(str(script)) + + # Flip +x; bare invocation is now runnable too. + script.chmod(0o755) + assert shell_hooks.script_is_executable(str(script)) + + def test_command_script_path_resolution(self): + """Regression: ``_command_script_path`` used to return the first + shlex token, which picked the interpreter (``python3``, ``bash``, + ``/usr/bin/env``) instead of the actual script for any + interpreter-prefixed command. That broke + ``hermes hooks doctor``'s executability check and silently + disabled mtime drift detection for such hooks.""" + cases = [ + # bare path + ("/path/hook.sh", "/path/hook.sh"), + ("/bin/echo hi", "/bin/echo"), + ("~/hook.sh", "~/hook.sh"), + ("hook.sh", "hook.sh"), + # interpreter prefix + ("python3 /path/hook.py", "/path/hook.py"), + ("bash /path/hook.sh", "/path/hook.sh"), + ("bash ~/hook.sh", "~/hook.sh"), + ("python3 -u /path/hook.py", "/path/hook.py"), + ("nice -n 10 /path/hook.sh", "/path/hook.sh"), + # /usr/bin/env shebang form — must find the *script*, not env + ("/usr/bin/env python3 /path/hook.py", "/path/hook.py"), + ("/usr/bin/env bash /path/hook.sh", "/path/hook.sh"), + # no path-like tokens → fallback to first token + ("my-binary --verbose", "my-binary"), + ("python3 -c 'print(1)'", "python3"), + # unparseable (unbalanced quotes) → return command as-is + ("python3 'unterminated", "python3 'unterminated"), + # empty + ("", ""), + ] + for command, expected in cases: + got = shell_hooks._command_script_path(command) + assert got == expected, f"{command!r} -> {got!r}, expected {expected!r}" + + def test_save_allowlist_uses_unique_tmp_paths(self, tmp_path, monkeypatch): + """Two save_allowlist calls in flight must use distinct tmp files + so the loser's os.replace does not ENOENT on the winner's sweep.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + p = shell_hooks.allowlist_path() + p.parent.mkdir(parents=True, exist_ok=True) + + tmp_paths_seen: list = [] + real_mkstemp = shell_hooks.tempfile.mkstemp + + def spying_mkstemp(*args, **kwargs): + fd, path = real_mkstemp(*args, **kwargs) + tmp_paths_seen.append(path) + return fd, path + + monkeypatch.setattr(shell_hooks.tempfile, "mkstemp", spying_mkstemp) + + shell_hooks.save_allowlist({"approvals": [{"event": "a", "command": "x"}]}) + shell_hooks.save_allowlist({"approvals": [{"event": "b", "command": "y"}]}) + + assert len(tmp_paths_seen) == 2 + assert tmp_paths_seen[0] != tmp_paths_seen[1] diff --git a/tests/agent/test_shell_hooks_consent.py b/tests/agent/test_shell_hooks_consent.py new file mode 100644 index 000000000..e1668e4a1 --- /dev/null +++ b/tests/agent/test_shell_hooks_consent.py @@ -0,0 +1,242 @@ +"""Consent-flow tests for the shell-hook allowlist. + +Covers the prompt/non-prompt decision tree: TTY vs non-TTY, and the +three accept-hooks channels (--accept-hooks, HERMES_ACCEPT_HOOKS env, +hooks_auto_accept: config key). +""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import patch + +import pytest + +from agent import shell_hooks + + +@pytest.fixture(autouse=True) +def _isolated_home(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_home")) + monkeypatch.delenv("HERMES_ACCEPT_HOOKS", raising=False) + shell_hooks.reset_for_tests() + yield + shell_hooks.reset_for_tests() + + +def _write_hook_script(tmp_path: Path) -> Path: + script = tmp_path / "hook.sh" + script.write_text("#!/usr/bin/env bash\nprintf '{}\\n'\n") + script.chmod(0o755) + return script + + +# ── TTY prompt flow ─────────────────────────────────────────────────────── + + +class TestTTYPromptFlow: + def test_first_use_prompts_and_approves(self, tmp_path): + from hermes_cli import plugins + + script = _write_hook_script(tmp_path) + plugins._plugin_manager = plugins.PluginManager() + + with patch("sys.stdin") as mock_stdin, patch("builtins.input", return_value="y"): + mock_stdin.isatty.return_value = True + registered = shell_hooks.register_from_config( + {"hooks": {"on_session_start": [{"command": str(script)}]}}, + accept_hooks=False, + ) + assert len(registered) == 1 + + entry = shell_hooks.allowlist_entry_for("on_session_start", str(script)) + assert entry is not None + assert entry["event"] == "on_session_start" + assert entry["command"] == str(script) + + def test_first_use_prompts_and_rejects(self, tmp_path): + from hermes_cli import plugins + + script = _write_hook_script(tmp_path) + plugins._plugin_manager = plugins.PluginManager() + + with patch("sys.stdin") as mock_stdin, patch("builtins.input", return_value="n"): + mock_stdin.isatty.return_value = True + registered = shell_hooks.register_from_config( + {"hooks": {"on_session_start": [{"command": str(script)}]}}, + accept_hooks=False, + ) + assert registered == [] + assert shell_hooks.allowlist_entry_for( + "on_session_start", str(script), + ) is None + + def test_subsequent_use_does_not_prompt(self, tmp_path): + """After the first approval, re-registration must be silent.""" + from hermes_cli import plugins + + script = _write_hook_script(tmp_path) + plugins._plugin_manager = plugins.PluginManager() + + # First call: TTY, approved. + with patch("sys.stdin") as mock_stdin, patch("builtins.input", return_value="y"): + mock_stdin.isatty.return_value = True + shell_hooks.register_from_config( + {"hooks": {"on_session_start": [{"command": str(script)}]}}, + accept_hooks=False, + ) + + # Reset registration set but keep the allowlist on disk. + shell_hooks.reset_for_tests() + + # Second call: TTY, input() must NOT be called. + with patch("sys.stdin") as mock_stdin, patch( + "builtins.input", side_effect=AssertionError("should not prompt"), + ): + mock_stdin.isatty.return_value = True + registered = shell_hooks.register_from_config( + {"hooks": {"on_session_start": [{"command": str(script)}]}}, + accept_hooks=False, + ) + assert len(registered) == 1 + + +# ── non-TTY flow ────────────────────────────────────────────────────────── + + +class TestNonTTYFlow: + def test_no_tty_no_flag_skips_registration(self, tmp_path): + from hermes_cli import plugins + + script = _write_hook_script(tmp_path) + plugins._plugin_manager = plugins.PluginManager() + + with patch("sys.stdin") as mock_stdin: + mock_stdin.isatty.return_value = False + registered = shell_hooks.register_from_config( + {"hooks": {"on_session_start": [{"command": str(script)}]}}, + accept_hooks=False, + ) + assert registered == [] + + def test_no_tty_with_argument_flag_accepts(self, tmp_path): + from hermes_cli import plugins + + script = _write_hook_script(tmp_path) + plugins._plugin_manager = plugins.PluginManager() + + with patch("sys.stdin") as mock_stdin: + mock_stdin.isatty.return_value = False + registered = shell_hooks.register_from_config( + {"hooks": {"on_session_start": [{"command": str(script)}]}}, + accept_hooks=True, + ) + assert len(registered) == 1 + + def test_no_tty_with_env_accepts(self, tmp_path, monkeypatch): + from hermes_cli import plugins + + script = _write_hook_script(tmp_path) + plugins._plugin_manager = plugins.PluginManager() + monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1") + + with patch("sys.stdin") as mock_stdin: + mock_stdin.isatty.return_value = False + registered = shell_hooks.register_from_config( + {"hooks": {"on_session_start": [{"command": str(script)}]}}, + accept_hooks=False, + ) + assert len(registered) == 1 + + def test_no_tty_with_config_accepts(self, tmp_path): + from hermes_cli import plugins + + script = _write_hook_script(tmp_path) + plugins._plugin_manager = plugins.PluginManager() + + with patch("sys.stdin") as mock_stdin: + mock_stdin.isatty.return_value = False + registered = shell_hooks.register_from_config( + { + "hooks_auto_accept": True, + "hooks": {"on_session_start": [{"command": str(script)}]}, + }, + accept_hooks=False, + ) + assert len(registered) == 1 + + +# ── Allowlist + revoke + mtime ──────────────────────────────────────────── + + +class TestAllowlistOps: + def test_mtime_recorded_on_approval(self, tmp_path): + script = _write_hook_script(tmp_path) + shell_hooks._record_approval("on_session_start", str(script)) + + entry = shell_hooks.allowlist_entry_for( + "on_session_start", str(script), + ) + assert entry is not None + assert entry["script_mtime_at_approval"] is not None + # ISO-8601 Z-suffix + assert entry["script_mtime_at_approval"].endswith("Z") + + def test_revoke_removes_entry(self, tmp_path): + script = _write_hook_script(tmp_path) + shell_hooks._record_approval("on_session_start", str(script)) + assert shell_hooks.allowlist_entry_for( + "on_session_start", str(script), + ) is not None + + removed = shell_hooks.revoke(str(script)) + assert removed == 1 + assert shell_hooks.allowlist_entry_for( + "on_session_start", str(script), + ) is None + + def test_revoke_unknown_returns_zero(self, tmp_path): + assert shell_hooks.revoke(str(tmp_path / "never-approved.sh")) == 0 + + def test_tilde_path_approval_records_resolvable_mtime(self, tmp_path, monkeypatch): + """If the command uses ~ the approval must still find the file.""" + monkeypatch.setenv("HOME", str(tmp_path)) + target = tmp_path / "hook.sh" + target.write_text("#!/usr/bin/env bash\n") + target.chmod(0o755) + + shell_hooks._record_approval("on_session_start", "~/hook.sh") + entry = shell_hooks.allowlist_entry_for( + "on_session_start", "~/hook.sh", + ) + assert entry is not None + # Must not be None — the tilde was expanded before stat(). + assert entry["script_mtime_at_approval"] is not None + + def test_duplicate_approval_replaces_mtime(self, tmp_path): + """Re-approving the same pair refreshes the approval timestamp.""" + script = _write_hook_script(tmp_path) + shell_hooks._record_approval("on_session_start", str(script)) + original_entry = shell_hooks.allowlist_entry_for( + "on_session_start", str(script), + ) + assert original_entry is not None + + # Touch the script to bump its mtime then re-approve. + import os + import time + new_mtime = original_entry.get("script_mtime_at_approval") + time.sleep(0.01) + os.utime(script, None) # current time + + shell_hooks._record_approval("on_session_start", str(script)) + + # Exactly one entry per (event, command). + approvals = shell_hooks.load_allowlist().get("approvals", []) + matching = [ + e for e in approvals + if e.get("event") == "on_session_start" + and e.get("command") == str(script) + ] + assert len(matching) == 1 diff --git a/tests/agent/test_subagent_stop_hook.py b/tests/agent/test_subagent_stop_hook.py new file mode 100644 index 000000000..a2b417a07 --- /dev/null +++ b/tests/agent/test_subagent_stop_hook.py @@ -0,0 +1,224 @@ +"""Tests for the subagent_stop hook event. + +Covers wire-up from tools.delegate_tool.delegate_task: + * fires once per child in both single-task and batch modes + * runs on the parent thread (no re-entrancy for hook authors) + * carries child_role when the agent exposes _delegate_role + * carries child_role=None when _delegate_role is not set (pre-M3) +""" + +from __future__ import annotations + +import json +import threading +from unittest.mock import MagicMock, patch + +import pytest + +from tools.delegate_tool import delegate_task +from hermes_cli import plugins + + +def _make_parent(depth: int = 0, session_id: str = "parent-1"): + parent = MagicMock() + parent.base_url = "https://openrouter.ai/api/v1" + parent.api_key = "***" + parent.provider = "openrouter" + parent.api_mode = "chat_completions" + parent.model = "anthropic/claude-sonnet-4" + parent.platform = "cli" + parent.providers_allowed = None + parent.providers_ignored = None + parent.providers_order = None + parent.provider_sort = None + parent._session_db = None + parent._delegate_depth = depth + parent._active_children = [] + parent._active_children_lock = threading.Lock() + parent._print_fn = None + parent.tool_progress_callback = None + parent.thinking_callback = None + parent._memory_manager = None + parent.session_id = session_id + return parent + + +@pytest.fixture(autouse=True) +def _fresh_plugin_manager(): + """Each test gets a fresh PluginManager so hook callbacks don't + leak between tests.""" + original = plugins._plugin_manager + plugins._plugin_manager = plugins.PluginManager() + yield + plugins._plugin_manager = original + + +@pytest.fixture(autouse=True) +def _stub_child_builder(monkeypatch): + """Replace _build_child_agent with a MagicMock factory so delegate_task + never transitively imports run_agent / openai. Keeps the test runnable + in environments without heavyweight runtime deps installed.""" + def _fake_build_child(task_index, **kwargs): + child = MagicMock() + child._delegate_saved_tool_names = [] + child._credential_pool = None + return child + + monkeypatch.setattr( + "tools.delegate_tool._build_child_agent", _fake_build_child, + ) + + +def _register_capturing_hook(): + captured = [] + + def _cb(**kwargs): + kwargs["_thread"] = threading.current_thread() + captured.append(kwargs) + + mgr = plugins.get_plugin_manager() + mgr._hooks.setdefault("subagent_stop", []).append(_cb) + return captured + + +# ── single-task mode ────────────────────────────────────────────────────── + + +class TestSingleTask: + def test_fires_once(self): + captured = _register_capturing_hook() + + with patch("tools.delegate_tool._run_single_child") as mock_run: + mock_run.return_value = { + "task_index": 0, + "status": "completed", + "summary": "Done!", + "api_calls": 3, + "duration_seconds": 5.0, + "_child_role": "analyst", + } + delegate_task(goal="do X", parent_agent=_make_parent()) + + assert len(captured) == 1 + payload = captured[0] + assert payload["child_role"] == "analyst" + assert payload["child_status"] == "completed" + assert payload["child_summary"] == "Done!" + assert payload["duration_ms"] == 5000 + + def test_fires_on_parent_thread(self): + captured = _register_capturing_hook() + main_thread = threading.current_thread() + + with patch("tools.delegate_tool._run_single_child") as mock_run: + mock_run.return_value = { + "task_index": 0, "status": "completed", + "summary": "x", "api_calls": 1, "duration_seconds": 0.1, + "_child_role": None, + } + delegate_task(goal="go", parent_agent=_make_parent()) + + assert captured[0]["_thread"] is main_thread + + def test_payload_includes_parent_session_id(self): + captured = _register_capturing_hook() + + with patch("tools.delegate_tool._run_single_child") as mock_run: + mock_run.return_value = { + "task_index": 0, "status": "completed", + "summary": "x", "api_calls": 1, "duration_seconds": 0.1, + "_child_role": None, + } + delegate_task( + goal="go", + parent_agent=_make_parent(session_id="sess-xyz"), + ) + + assert captured[0]["parent_session_id"] == "sess-xyz" + + +# ── batch mode ──────────────────────────────────────────────────────────── + + +class TestBatchMode: + def test_fires_per_child(self): + captured = _register_capturing_hook() + + with patch("tools.delegate_tool._run_single_child") as mock_run: + mock_run.side_effect = [ + {"task_index": 0, "status": "completed", + "summary": "A", "api_calls": 1, "duration_seconds": 1.0, + "_child_role": "role-a"}, + {"task_index": 1, "status": "completed", + "summary": "B", "api_calls": 2, "duration_seconds": 2.0, + "_child_role": "role-b"}, + {"task_index": 2, "status": "completed", + "summary": "C", "api_calls": 3, "duration_seconds": 3.0, + "_child_role": "role-c"}, + ] + delegate_task( + tasks=[ + {"goal": "A"}, {"goal": "B"}, {"goal": "C"}, + ], + parent_agent=_make_parent(), + ) + + assert len(captured) == 3 + roles = sorted(c["child_role"] for c in captured) + assert roles == ["role-a", "role-b", "role-c"] + + def test_all_fires_on_parent_thread(self): + captured = _register_capturing_hook() + main_thread = threading.current_thread() + + with patch("tools.delegate_tool._run_single_child") as mock_run: + mock_run.side_effect = [ + {"task_index": 0, "status": "completed", + "summary": "A", "api_calls": 1, "duration_seconds": 1.0, + "_child_role": None}, + {"task_index": 1, "status": "completed", + "summary": "B", "api_calls": 2, "duration_seconds": 2.0, + "_child_role": None}, + ] + delegate_task( + tasks=[{"goal": "A"}, {"goal": "B"}], + parent_agent=_make_parent(), + ) + + for payload in captured: + assert payload["_thread"] is main_thread + + +# ── payload shape ───────────────────────────────────────────────────────── + + +class TestPayloadShape: + def test_role_absent_becomes_none(self): + captured = _register_capturing_hook() + + with patch("tools.delegate_tool._run_single_child") as mock_run: + mock_run.return_value = { + "task_index": 0, "status": "completed", + "summary": "x", "api_calls": 1, "duration_seconds": 0.1, + # Deliberately omit _child_role — pre-M3 shape. + } + delegate_task(goal="do X", parent_agent=_make_parent()) + + assert captured[0]["child_role"] is None + + def test_result_does_not_leak_child_role_field(self): + """The internal _child_role key must be stripped before the + result dict is serialised to JSON.""" + _register_capturing_hook() + + with patch("tools.delegate_tool._run_single_child") as mock_run: + mock_run.return_value = { + "task_index": 0, "status": "completed", + "summary": "x", "api_calls": 1, "duration_seconds": 0.1, + "_child_role": "leaf", + } + raw = delegate_task(goal="do X", parent_agent=_make_parent()) + + parsed = json.loads(raw) + assert "results" in parsed + assert "_child_role" not in parsed["results"][0] diff --git a/tests/hermes_cli/test_argparse_flag_propagation.py b/tests/hermes_cli/test_argparse_flag_propagation.py index 7787fdd6f..741425a82 100644 --- a/tests/hermes_cli/test_argparse_flag_propagation.py +++ b/tests/hermes_cli/test_argparse_flag_propagation.py @@ -91,3 +91,42 @@ class TestYoloEnvVar: args = parser.parse_args(["chat"]) self._simulate_cmd_chat_yolo_check(args) assert os.environ.get("HERMES_YOLO_MODE") is None + + +class TestAcceptHooksOnAgentSubparsers: + """Verify --accept-hooks is accepted at every agent-subcommand + position (before the subcommand, between group/subcommand, and + after the leaf subcommand) for gateway/cron/mcp/acp. Regression + against prior behaviour where the flag only worked on the root + parser and `chat`, so `hermes gateway run --accept-hooks` failed + with `unrecognized arguments`.""" + + @pytest.mark.parametrize("argv", [ + ["--accept-hooks", "gateway", "run", "--help"], + ["gateway", "--accept-hooks", "run", "--help"], + ["gateway", "run", "--accept-hooks", "--help"], + ["--accept-hooks", "cron", "tick", "--help"], + ["cron", "--accept-hooks", "tick", "--help"], + ["cron", "tick", "--accept-hooks", "--help"], + ["cron", "run", "--accept-hooks", "dummy-id", "--help"], + ["--accept-hooks", "mcp", "serve", "--help"], + ["mcp", "--accept-hooks", "serve", "--help"], + ["mcp", "serve", "--accept-hooks", "--help"], + ["acp", "--accept-hooks", "--help"], + ]) + def test_accepted_at_every_position(self, argv): + """Invoking `hermes ` must exit 0 (help) rather than + failing with `unrecognized arguments`.""" + import subprocess + result = subprocess.run( + [sys.executable, "-m", "hermes_cli.main", *argv], + capture_output=True, + text=True, + timeout=15, + ) + assert result.returncode == 0, ( + f"argv={argv!r} returned {result.returncode}\n" + f"stdout: {result.stdout[:300]}\n" + f"stderr: {result.stderr[:300]}" + ) + assert "unrecognized arguments" not in result.stderr diff --git a/tests/hermes_cli/test_hooks_cli.py b/tests/hermes_cli/test_hooks_cli.py new file mode 100644 index 000000000..6d4609c52 --- /dev/null +++ b/tests/hermes_cli/test_hooks_cli.py @@ -0,0 +1,268 @@ +"""Tests for the ``hermes hooks`` CLI subcommand.""" + +from __future__ import annotations + +import io +import json +import sys +from contextlib import redirect_stdout +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import patch + +import pytest + +from agent import shell_hooks +from hermes_cli import hooks as hooks_cli + + +@pytest.fixture(autouse=True) +def _isolated_home(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home")) + monkeypatch.delenv("HERMES_ACCEPT_HOOKS", raising=False) + shell_hooks.reset_for_tests() + yield + shell_hooks.reset_for_tests() + + +def _hook_script(tmp_path: Path, body: str, name: str = "hook.sh") -> Path: + p = tmp_path / name + p.write_text(body) + p.chmod(0o755) + return p + + +def _run(sub_args: SimpleNamespace) -> str: + """Capture stdout for a hooks_command invocation.""" + buf = io.StringIO() + with redirect_stdout(buf): + hooks_cli.hooks_command(sub_args) + return buf.getvalue() + + +# ── list ────────────────────────────────────────────────────────────────── + + +class TestHooksList: + def test_empty_config(self, tmp_path): + with patch("hermes_cli.config.load_config", return_value={}): + out = _run(SimpleNamespace(hooks_action="list")) + assert "No shell hooks configured" in out + + def test_shows_configured_and_consent_status(self, tmp_path): + script = _hook_script( + tmp_path, "#!/usr/bin/env bash\nprintf '{}\\n'\n", + ) + cfg = { + "hooks": { + "pre_tool_call": [ + {"matcher": "terminal", "command": str(script), "timeout": 30}, + ], + "on_session_start": [ + {"command": str(script)}, + ], + } + } + + # Approve one of the two so we can see both states in the output + shell_hooks._record_approval("pre_tool_call", str(script)) + + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace(hooks_action="list")) + + assert "[pre_tool_call]" in out + assert "[on_session_start]" in out + assert "✓ allowed" in out + assert "✗ not allowlisted" in out + assert str(script) in out + + +# ── test ────────────────────────────────────────────────────────────────── + + +class TestHooksTest: + def test_synthetic_payload_matches_production_shape(self, tmp_path): + """`hermes hooks test` must feed the script stdin in the same + shape invoke_hook() would at runtime. Prior to this fix, + run_once bypassed _serialize_payload and the two paths diverged — + scripts tested with `hermes hooks test` saw different top-level + keys than at runtime, silently breaking in production.""" + capture = tmp_path / "captured.json" + script = _hook_script( + tmp_path, + f"#!/usr/bin/env bash\ncat - > {capture}\nprintf '{{}}\\n'\n", + ) + cfg = {"hooks": {"subagent_stop": [{"command": str(script)}]}} + with patch("hermes_cli.config.load_config", return_value=cfg): + _run(SimpleNamespace( + hooks_action="test", event="subagent_stop", + for_tool=None, payload_file=None, + )) + + seen = json.loads(capture.read_text()) + # Same top-level keys _serialize_payload produces at runtime + assert set(seen.keys()) == { + "hook_event_name", "tool_name", "tool_input", + "session_id", "cwd", "extra", + } + # parent_session_id was routed to top-level session_id (matches runtime) + assert seen["session_id"] == "parent-sess" + assert "parent_session_id" not in seen["extra"] + # subagent_stop has no tool, so tool_name / tool_input are null + assert seen["tool_name"] is None + assert seen["tool_input"] is None + + def test_fires_real_subprocess_and_parses_block(self, tmp_path): + block_script = _hook_script( + tmp_path, + "#!/usr/bin/env bash\n" + 'printf \'{"decision": "block", "reason": "nope"}\\n\'\n', + name="block.sh", + ) + cfg = { + "hooks": { + "pre_tool_call": [ + {"matcher": "terminal", "command": str(block_script)}, + ], + }, + } + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace( + hooks_action="test", event="pre_tool_call", + for_tool="terminal", payload_file=None, + )) + + # Parsed block appears in output + assert '"action": "block"' in out + assert '"message": "nope"' in out + + def test_for_tool_matcher_filters(self, tmp_path): + script = _hook_script(tmp_path, "#!/usr/bin/env bash\nprintf '{}\\n'\n") + cfg = { + "hooks": { + "pre_tool_call": [ + {"matcher": "terminal", "command": str(script)}, + ], + } + } + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace( + hooks_action="test", event="pre_tool_call", + for_tool="web_search", payload_file=None, + )) + assert "No shell hooks" in out + + def test_unknown_event(self): + with patch("hermes_cli.config.load_config", return_value={}): + out = _run(SimpleNamespace( + hooks_action="test", event="bogus_event", + for_tool=None, payload_file=None, + )) + assert "Unknown event" in out + + +# ── revoke ──────────────────────────────────────────────────────────────── + + +class TestHooksRevoke: + def test_revoke_removes_entry(self, tmp_path): + script = _hook_script(tmp_path, "#!/usr/bin/env bash\n") + shell_hooks._record_approval("on_session_start", str(script)) + + out = _run(SimpleNamespace(hooks_action="revoke", command=str(script))) + assert "Removed 1" in out + assert shell_hooks.allowlist_entry_for( + "on_session_start", str(script), + ) is None + + def test_revoke_unknown(self, tmp_path): + out = _run(SimpleNamespace( + hooks_action="revoke", command=str(tmp_path / "never.sh"), + )) + assert "No allowlist entry" in out + + +# ── doctor ──────────────────────────────────────────────────────────────── + + +class TestHooksDoctor: + def test_flags_missing_exec_bit(self, tmp_path): + script = tmp_path / "hook.sh" + script.write_text("#!/usr/bin/env bash\nprintf '{}\\n'\n") + # No chmod — intentionally not executable + cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace(hooks_action="doctor")) + assert "not executable" in out.lower() + + def test_flags_unallowlisted(self, tmp_path): + script = _hook_script(tmp_path, "#!/usr/bin/env bash\nprintf '{}\\n'\n") + cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace(hooks_action="doctor")) + assert "not allowlisted" in out.lower() + + def test_flags_invalid_json(self, tmp_path): + script = _hook_script( + tmp_path, + "#!/usr/bin/env bash\necho 'not json!'\n", + ) + shell_hooks._record_approval("on_session_start", str(script)) + cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace(hooks_action="doctor")) + assert "not valid JSON" in out + + def test_flags_mtime_drift(self, tmp_path, monkeypatch): + """Allowlist with older mtime than current -> drift warning.""" + script = _hook_script(tmp_path, "#!/usr/bin/env bash\nprintf '{}\\n'\n") + + # Manually stash an allowlist entry with an old mtime + from agent.shell_hooks import allowlist_path + allowlist_path().parent.mkdir(parents=True, exist_ok=True) + allowlist_path().write_text(json.dumps({ + "approvals": [ + { + "event": "on_session_start", + "command": str(script), + "approved_at": "2000-01-01T00:00:00Z", + "script_mtime_at_approval": "2000-01-01T00:00:00Z", + } + ] + })) + + cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace(hooks_action="doctor")) + assert "modified since approval" in out + + def test_clean_script_runs(self, tmp_path): + script = _hook_script(tmp_path, "#!/usr/bin/env bash\nprintf '{}\\n'\n") + shell_hooks._record_approval("on_session_start", str(script)) + cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace(hooks_action="doctor")) + assert "All shell hooks look healthy" in out + + def test_unallowlisted_script_is_not_executed(self, tmp_path): + """Regression for M4: `hermes hooks doctor` used to run every + listed script against a synthetic payload as part of its JSON + smoke test, which contradicted the documented workflow of + "spot newly-added hooks *before they register*". An un-allowlisted + script must not be executed during `doctor`.""" + sentinel = tmp_path / "executed" + # Script would touch the sentinel if executed; we assert it wasn't. + script = _hook_script( + tmp_path, + f"#!/usr/bin/env bash\ntouch {sentinel}\nprintf '{{}}\\n'\n", + ) + cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}} + with patch("hermes_cli.config.load_config", return_value=cfg): + out = _run(SimpleNamespace(hooks_action="doctor")) + + assert not sentinel.exists(), ( + "doctor executed an un-allowlisted script — " + "M4 gate regressed" + ) + assert "not allowlisted" in out.lower() + assert "skipped JSON smoke test" in out diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index 1db6c0862..2e6065245 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -593,6 +593,10 @@ def _run_single_child( "output": _output_tokens if isinstance(_output_tokens, (int, float)) else 0, }, "tool_trace": tool_trace, + # Captured before the finally block calls child.close() so the + # parent thread can fire subagent_stop with the correct role. + # Stripped before the dict is serialised back to the model. + "_child_role": getattr(child, "_delegate_role", None), } if status == "failed": entry["error"] = result.get("error", "Subagent did not produce a response.") @@ -632,6 +636,7 @@ def _run_single_child( "error": str(exc), "api_calls": 0, "duration_seconds": duration, + "_child_role": getattr(child, "_delegate_role", None), } finally: @@ -815,6 +820,10 @@ def delegate_task( # the parent blocks forever even after interrupt propagation. # Instead, use wait() with a short timeout so we can bail # when the parent is interrupted. + # Map task_index -> child agent, so fabricated entries for + # still-pending futures can carry the correct _delegate_role. + _child_by_index = {i: child for (i, _, child) in children} + pending = set(futures.keys()) while pending: if getattr(parent_agent, "_interrupt_requested", False) is True: @@ -834,6 +843,9 @@ def delegate_task( "error": str(exc), "api_calls": 0, "duration_seconds": 0, + "_child_role": getattr( + _child_by_index.get(idx), "_delegate_role", None + ), } else: entry = { @@ -843,6 +855,9 @@ def delegate_task( "error": "Parent agent interrupted — child did not finish in time", "api_calls": 0, "duration_seconds": 0, + "_child_role": getattr( + _child_by_index.get(idx), "_delegate_role", None + ), } results.append(entry) completed_count += 1 @@ -862,6 +877,9 @@ def delegate_task( "error": str(exc), "api_calls": 0, "duration_seconds": 0, + "_child_role": getattr( + _child_by_index.get(idx), "_delegate_role", None + ), } results.append(entry) completed_count += 1 @@ -905,6 +923,33 @@ def delegate_task( except Exception: pass + # Fire subagent_stop hooks once per child, serialised on the parent thread. + # This keeps Python-plugin and shell-hook callbacks off of the worker threads + # that ran the children, so hook authors don't need to reason about + # concurrent invocation. Role was captured into the entry dict in + # _run_single_child (or the fabricated-entry branches above) before the + # child was closed. + _parent_session_id = getattr(parent_agent, "session_id", None) + try: + from hermes_cli.plugins import invoke_hook as _invoke_hook + except Exception: + _invoke_hook = None + for entry in results: + child_role = entry.pop("_child_role", None) + if _invoke_hook is None: + continue + try: + _invoke_hook( + "subagent_stop", + parent_session_id=_parent_session_id, + child_role=child_role, + child_summary=entry.get("summary"), + child_status=entry.get("status"), + duration_ms=int((entry.get("duration_seconds") or 0) * 1000), + ) + except Exception: + logger.debug("subagent_stop hook invocation failed", exc_info=True) + total_duration = round(time.monotonic() - overall_start, 2) return json.dumps({ diff --git a/website/docs/user-guide/features/hooks.md b/website/docs/user-guide/features/hooks.md index a64f32209..3dd07bc1c 100644 --- a/website/docs/user-guide/features/hooks.md +++ b/website/docs/user-guide/features/hooks.md @@ -6,14 +6,15 @@ description: "Run custom code at key lifecycle points — log activity, send ale # Event Hooks -Hermes has two hook systems that run custom code at key lifecycle points: +Hermes has three hook systems that run custom code at key lifecycle points: | System | Registered via | Runs in | Use case | |--------|---------------|---------|----------| | **[Gateway hooks](#gateway-event-hooks)** | `HOOK.yaml` + `handler.py` in `~/.hermes/hooks/` | Gateway only | Logging, alerts, webhooks | | **[Plugin hooks](#plugin-hooks)** | `ctx.register_hook()` in a [plugin](/docs/user-guide/features/plugins) | CLI + Gateway | Tool interception, metrics, guardrails | +| **[Shell hooks](#shell-hooks)** | `hooks:` block in `~/.hermes/config.yaml` pointing at shell scripts | CLI + Gateway | Drop-in scripts for blocking, auto-formatting, context injection | -Both systems are non-blocking — errors in any hook are caught and logged, never crashing the agent. +All three systems are non-blocking — errors in any hook are caught and logged, never crashing the agent. ## Gateway Event Hooks @@ -231,20 +232,21 @@ def register(ctx): - Callbacks receive **keyword arguments**. Always accept `**kwargs` for forward compatibility — new parameters may be added in future versions without breaking your plugin. - If a callback **crashes**, it's logged and skipped. Other hooks and the agent continue normally. A misbehaving plugin can never break the agent. -- All hooks are **fire-and-forget observers** whose return values are ignored — except `pre_llm_call`, which can [inject context](#pre_llm_call). +- Two hooks' return values affect behavior: [`pre_tool_call`](#pre_tool_call) can **block** the tool, and [`pre_llm_call`](#pre_llm_call) can **inject context** into the LLM call. All other hooks are fire-and-forget observers. ### Quick reference | Hook | Fires when | Returns | |------|-----------|---------| -| [`pre_tool_call`](#pre_tool_call) | Before any tool executes | ignored | +| [`pre_tool_call`](#pre_tool_call) | Before any tool executes | `{"action": "block", "message": str}` to veto the call | | [`post_tool_call`](#post_tool_call) | After any tool returns | ignored | -| [`pre_llm_call`](#pre_llm_call) | Once per turn, before the tool-calling loop | context injection | +| [`pre_llm_call`](#pre_llm_call) | Once per turn, before the tool-calling loop | `{"context": str}` to prepend context to the user message | | [`post_llm_call`](#post_llm_call) | Once per turn, after the tool-calling loop | ignored | | [`on_session_start`](#on_session_start) | New session created (first turn only) | ignored | | [`on_session_end`](#on_session_end) | Session ends | ignored | | [`on_session_finalize`](#on_session_finalize) | CLI/gateway tears down an active session (flush, save, stats) | ignored | | [`on_session_reset`](#on_session_reset) | Gateway swaps in a fresh session key (e.g. `/new`, `/reset`) | ignored | +| [`subagent_stop`](#subagent_stop) | A `delegate_task` child has exited | ignored | --- @@ -266,9 +268,15 @@ def my_callback(tool_name: str, args: dict, task_id: str, **kwargs): **Fires:** In `model_tools.py`, inside `handle_function_call()`, before the tool's handler runs. Fires once per tool call — if the model calls 3 tools in parallel, this fires 3 times. -**Return value:** Ignored. +**Return value — veto the call:** -**Use cases:** Logging, audit trails, tool call counters, blocking dangerous operations (print a warning), rate limiting. +```python +return {"action": "block", "message": "Reason the tool call was blocked"} +``` + +The agent short-circuits the tool with `message` as the error returned to the model. The first matching block directive wins (Python plugins registered first, then shell hooks). Any other return value is ignored, so existing observer-only callbacks keep working unchanged. + +**Use cases:** Logging, audit trails, tool call counters, blocking dangerous operations, rate limiting, per-user policy enforcement. **Example — tool call audit log:** @@ -649,3 +657,247 @@ def my_callback(session_id: str, platform: str, **kwargs): --- See the **[Build a Plugin guide](/docs/guides/build-a-hermes-plugin)** for the full walkthrough including tool schemas, handlers, and advanced hook patterns. + +--- + +### `subagent_stop` + +Fires **once per child agent** after `delegate_task` finishes. Whether you delegated a single task or a batch of three, this hook fires once for each child, serialised on the parent thread. + +**Callback signature:** + +```python +def my_callback(parent_session_id: str, child_role: str | None, + child_summary: str | None, child_status: str, + duration_ms: int, **kwargs): +``` + +| Parameter | Type | Description | +|-----------|------|-------------| +| `parent_session_id` | `str` | Session ID of the delegating parent agent | +| `child_role` | `str \| None` | Orchestrator role tag set on the child (`None` if the feature isn't enabled) | +| `child_summary` | `str \| None` | The final response the child returned to the parent | +| `child_status` | `str` | `"completed"`, `"failed"`, `"interrupted"`, or `"error"` | +| `duration_ms` | `int` | Wall-clock time spent running the child, in milliseconds | + +**Fires:** In `tools/delegate_tool.py`, after `ThreadPoolExecutor.as_completed()` drains all child futures. Firing is marshalled to the parent thread so hook authors don't have to reason about concurrent callback execution. + +**Return value:** Ignored. + +**Use cases:** Logging orchestration activity, accumulating child durations for billing, writing post-delegation audit records. + +**Example — log orchestrator activity:** + +```python +import logging +logger = logging.getLogger(__name__) + +def log_subagent(parent_session_id, child_role, child_status, duration_ms, **kwargs): + logger.info( + "SUBAGENT parent=%s role=%s status=%s duration_ms=%d", + parent_session_id, child_role, child_status, duration_ms, + ) + +def register(ctx): + ctx.register_hook("subagent_stop", log_subagent) +``` + +:::info +With heavy delegation (e.g. orchestrator roles × 5 leaves × nested depth), `subagent_stop` fires many times per turn. Keep your callback fast; push expensive work to a background queue. +::: + +--- + +## Shell Hooks + +Declare shell-script hooks in your `cli-config.yaml` and Hermes will run them as subprocesses whenever the corresponding plugin-hook event fires — in both CLI and gateway sessions. No Python plugin authoring required. + +Use shell hooks when you want a drop-in, single-file script (Bash, Python, anything with a shebang) to: + +- **Block a tool call** — reject dangerous `terminal` commands, enforce per-directory policies, require approval for destructive `write_file` / `patch` operations. +- **Run after a tool call** — auto-format Python or TypeScript files that the agent just wrote, log API calls, trigger a CI workflow. +- **Inject context into the next LLM turn** — prepend `git status` output, the current weekday, or retrieved documents to the user message (see [`pre_llm_call`](#pre_llm_call)). +- **Observe lifecycle events** — write a log line when a subagent completes (`subagent_stop`) or a session starts (`on_session_start`). + +Shell hooks are registered by calling `agent.shell_hooks.register_from_config(cfg)` at both CLI startup (`hermes_cli/main.py`) and gateway startup (`gateway/run.py`). They compose naturally with Python plugin hooks — both flow through the same dispatcher. + +### Comparison at a glance + +| Dimension | Shell hooks | [Plugin hooks](#plugin-hooks) | [Gateway hooks](#gateway-event-hooks) | +|-----------|-------------|-------------------------------|---------------------------------------| +| Declared in | `hooks:` block in `~/.hermes/config.yaml` | `register()` in a `plugin.yaml` plugin | `HOOK.yaml` + `handler.py` directory | +| Lives under | `~/.hermes/agent-hooks/` (by convention) | `~/.hermes/plugins//` | `~/.hermes/hooks//` | +| Language | Any (Bash, Python, Go binary, …) | Python only | Python only | +| Runs in | CLI + Gateway | CLI + Gateway | Gateway only | +| Events | `VALID_HOOKS` (incl. `subagent_stop`) | `VALID_HOOKS` | Gateway lifecycle (`gateway:startup`, `agent:*`, `command:*`) | +| Can block a tool call | Yes (`pre_tool_call`) | Yes (`pre_tool_call`) | No | +| Can inject LLM context | Yes (`pre_llm_call`) | Yes (`pre_llm_call`) | No | +| Consent | First-use prompt per `(event, command)` pair | Implicit (Python plugin trust) | Implicit (dir trust) | +| Inter-process isolation | Yes (subprocess) | No (in-process) | No (in-process) | + +### Configuration schema + +```yaml +hooks: + : # Must be in VALID_HOOKS + - matcher: "" # Optional; used for pre/post_tool_call only + command: "" # Required; runs via shlex.split, shell=False + timeout: # Optional; default 60, capped at 300 + +hooks_auto_accept: false # See "Consent model" below +``` + +Event names must be one of the [plugin hook events](#plugin-hooks); typos produce a "Did you mean X?" warning and are skipped. Unknown keys inside a single entry are ignored; missing `command` is a skip-with-warning. `timeout > 300` is clamped with a warning. + +### JSON wire protocol + +Each time the event fires, Hermes spawns a subprocess for every matching hook (matcher permitting), pipes a JSON payload to **stdin**, and reads **stdout** back as JSON. + +**stdin — payload the script receives:** + +```json +{ + "hook_event_name": "pre_tool_call", + "tool_name": "terminal", + "tool_input": {"command": "rm -rf /"}, + "session_id": "sess_abc123", + "cwd": "/home/user/project", + "extra": {"task_id": "...", "tool_call_id": "..."} +} +``` + +`tool_name` and `tool_input` are `null` for non-tool events (`pre_llm_call`, `subagent_stop`, session lifecycle). The `extra` dict carries all event-specific kwargs (`user_message`, `conversation_history`, `child_role`, `duration_ms`, …). Unserialisable values are stringified rather than omitted. + +**stdout — optional response:** + +```jsonc +// Block a pre_tool_call (both shapes accepted; normalised internally): +{"decision": "block", "reason": "Forbidden: rm -rf"} // Claude-Code style +{"action": "block", "message": "Forbidden: rm -rf"} // Hermes-canonical + +// Inject context for pre_llm_call: +{"context": "Today is Friday, 2026-04-17"} + +// Silent no-op — any empty / non-matching output is fine: +``` + +Malformed JSON, non-zero exit codes, and timeouts log a warning but never abort the agent loop. + +### Worked examples + +#### 1. Auto-format Python files after every write + +```yaml +# ~/.hermes/config.yaml +hooks: + post_tool_call: + - matcher: "write_file|patch" + command: "~/.hermes/agent-hooks/auto-format.sh" +``` + +```bash +#!/usr/bin/env bash +# ~/.hermes/agent-hooks/auto-format.sh +payload="$(cat -)" +path=$(echo "$payload" | jq -r '.tool_input.path // empty') +[[ "$path" == *.py ]] && command -v black >/dev/null && black "$path" 2>/dev/null +printf '{}\n' +``` + +The agent's in-context view of the file is **not** re-read automatically — the reformat only affects the file on disk. Subsequent `read_file` calls pick up the formatted version. + +#### 2. Block destructive `terminal` commands + +```yaml +hooks: + pre_tool_call: + - matcher: "terminal" + command: "~/.hermes/agent-hooks/block-rm-rf.sh" + timeout: 5 +``` + +```bash +#!/usr/bin/env bash +# ~/.hermes/agent-hooks/block-rm-rf.sh +payload="$(cat -)" +cmd=$(echo "$payload" | jq -r '.tool_input.command // empty') +if echo "$cmd" | grep -qE 'rm[[:space:]]+-rf?[[:space:]]+/'; then + printf '{"decision": "block", "reason": "blocked: rm -rf / is not permitted"}\n' +else + printf '{}\n' +fi +``` + +#### 3. Inject `git status` into every turn (Claude-Code `UserPromptSubmit` equivalent) + +```yaml +hooks: + pre_llm_call: + - command: "~/.hermes/agent-hooks/inject-cwd-context.sh" +``` + +```bash +#!/usr/bin/env bash +# ~/.hermes/agent-hooks/inject-cwd-context.sh +cat - >/dev/null # discard stdin payload +if status=$(git status --porcelain 2>/dev/null) && [[ -n "$status" ]]; then + jq --null-input --arg s "$status" \ + '{context: ("Uncommitted changes in cwd:\n" + $s)}' +else + printf '{}\n' +fi +``` + +Claude Code's `UserPromptSubmit` event is intentionally not a separate Hermes event — `pre_llm_call` fires at the same place and already supports context injection. Use it here. + +#### 4. Log every subagent completion + +```yaml +hooks: + subagent_stop: + - command: "~/.hermes/agent-hooks/log-orchestration.sh" +``` + +```bash +#!/usr/bin/env bash +# ~/.hermes/agent-hooks/log-orchestration.sh +log=~/.hermes/logs/orchestration.log +jq -c '{ts: now, parent: .session_id, extra: .extra}' < /dev/stdin >> "$log" +printf '{}\n' +``` + +### Consent model + +Each unique `(event, command)` pair prompts the user for approval the first time Hermes sees it, then persists the decision to `~/.hermes/shell-hooks-allowlist.json`. Subsequent runs (CLI or gateway) skip the prompt. + +Three escape hatches bypass the interactive prompt — any one is sufficient: + +1. `--accept-hooks` flag on the CLI (e.g. `hermes --accept-hooks chat`) +2. `HERMES_ACCEPT_HOOKS=1` environment variable +3. `hooks_auto_accept: true` in `cli-config.yaml` + +Non-TTY runs (gateway, cron, CI) need one of these three — otherwise any newly-added hook silently stays un-registered and logs a warning. + +**Script edits are silently trusted.** The allowlist keys on the exact command string, not the script's hash, so editing the script on disk does not invalidate consent. `hermes hooks doctor` flags mtime drift so you can spot edits and decide whether to re-approve. + +### The `hermes hooks` CLI + +| Command | What it does | +|---------|--------------| +| `hermes hooks list` | Dump configured hooks with matcher, timeout, and consent status | +| `hermes hooks test [--for-tool X] [--payload-file F]` | Fire every matching hook against a synthetic payload and print the parsed response | +| `hermes hooks revoke ` | Remove every allowlist entry matching `` (takes effect on next restart) | +| `hermes hooks doctor` | For every configured hook: check exec bit, allowlist status, mtime drift, JSON output validity, and rough execution time | + +### Security + +Shell hooks run with **your full user credentials** — same trust boundary as a cron entry or a shell alias. Treat the `hooks:` block in `config.yaml` as privileged configuration: + +- Only reference scripts you wrote or fully reviewed. +- Keep scripts inside `~/.hermes/agent-hooks/` so the path is easy to audit. +- Re-run `hermes hooks doctor` after you pull a shared config to spot newly-added hooks before they register. +- If your config.yaml is version-controlled across a team, review PRs that change the `hooks:` section the same way you'd review CI config. + +### Ordering and precedence + +Both Python plugin hooks and shell hooks flow through the same `invoke_hook()` dispatcher. Python plugins are registered first (`discover_and_load()`), shell hooks second (`register_from_config()`), so Python `pre_tool_call` block decisions take precedence in tie cases. The first valid block wins — the aggregator returns as soon as any callback produces `{"action": "block", "message": str}` with a non-empty message.