#!/usr/bin/env python3
"""Write-approval gate + pending store for memory and skill writes.

Background
----------
The agent writes to two persistent stores that survive across sessions:

  * **memory**  — MEMORY.md / USER.md, small (~200 char) declarative entries
  * **skills**  — SKILL.md + supporting files, potentially huge (10-100 KB)

Both stores are written from two origins:

  * **foreground**        — a normal agent turn (user is present / chatting)
  * **background_review** — the self-improvement review fork that runs after
    a turn and autonomously decides what to save (the source of the "wrong
    assumptions" users complained about)

This module lets the user gate those writes per-subsystem with a tri-state
``write_mode``:

  * ``on``      — write freely (current behaviour, default)
  * ``off``     — never write; the tool returns a clean "disabled" result
  * ``approve`` — do not commit the write; **stage** it to a pending store and
    surface it for the user to approve or reject out-of-band

The size asymmetry between memory and skills is real and unavoidable: a
memory entry can be reviewed inline in a chat bubble; a 100 KB SKILL.md
cannot. So ``approve`` mode stages BOTH to disk, but review affordances
differ by subsystem (see ``hermes_cli`` slash handlers): memory shows full
content, skills show metadata + a one-line gist + a ``diff`` escape hatch
(CLI/dashboard/file).

Staging is mandatory for background-origin writes under ``approve`` (a daemon
thread cannot block on an interactive prompt). Foreground memory writes may
additionally block inline via the dangerous-command approval gate; foreground
skill writes always stage (too big to eyeball mid-loop).

Pending records live under ``<HERMES_HOME>/pending/{memory,skills}/<id>.json``
so they survive process restarts and can be reviewed from CLI, gateway, or
the web dashboard.
"""

from __future__ import annotations

import json
import logging
import os
import re
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional

from hermes_constants import get_hermes_home

logger = logging.getLogger(__name__)

# Subsystem identifiers
MEMORY = "memory"
SKILLS = "skills"
_SUBSYSTEMS = (MEMORY, SKILLS)

# Tri-state write modes
MODE_ON = "on"
MODE_OFF = "off"
MODE_APPROVE = "approve"
_VALID_MODES = (MODE_ON, MODE_OFF, MODE_APPROVE)

# A pending id becomes a file name, so only plain tokens are accepted. This
# keeps ids such as "../../config" from reaching files outside the pending
# directory. Generated ids are 8 hex characters; the wider alphabet is
# tolerated for hand-made records.
_PENDING_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")

# YAML frontmatter: a leading ``---`` line up to the next ``---`` line.
_FRONTMATTER_RE = re.compile(
    r"\A\ufeff?\s*---[ \t]*\r?\n(.*?)^---[ \t]*\r?$",
    re.DOTALL | re.MULTILINE,
)
# ``[ \t]*`` rather than ``\s*`` so an empty value cannot swallow the next line.
_DESCRIPTION_RE = re.compile(r"^description:[ \t]*(.*)$", re.MULTILINE)
_BLOCK_SCALAR_RE = re.compile(r"[>|][+-]?")


# ---------------------------------------------------------------------------
# Config resolution
# ---------------------------------------------------------------------------

def get_write_mode(subsystem: str) -> str:
    """Return the configured write_mode for ``subsystem`` (memory|skills).

    Reads ``<subsystem>.write_mode`` from the loaded config. Falls back to
    ``on`` (current behaviour) for an unknown subsystem, for any unset or
    invalid value, and when the config cannot be loaded, so existing installs
    are unaffected until the user opts in.
    """
    if subsystem not in _SUBSYSTEMS:
        return MODE_ON
    try:
        from hermes_cli.config import load_config, cfg_get

        cfg = load_config()
        raw = cfg_get(cfg, subsystem, "write_mode", default=MODE_ON)
    except Exception:
        # Deliberate best-effort fallback; leave a trace so a broken config is
        # diagnosable instead of silently disabling the gate.
        logger.debug(
            "Could not read %s.write_mode; defaulting to %r",
            subsystem, MODE_ON, exc_info=True,
        )
        return MODE_ON
    return _normalize_mode(raw)


def _normalize_mode(value: Any) -> str:
    """Coerce a config value to a valid mode string.

    YAML 1.1 parses bare ``off`` / ``on`` as booleans, so ``False`` maps to
    ``off`` and ``True`` to ``on``. Strings are matched case-insensitively
    after trimming. Anything else yields ``on``.
    """
    if isinstance(value, bool):
        return MODE_ON if value else MODE_OFF
    if isinstance(value, str):
        candidate = value.strip().lower()
        if candidate in _VALID_MODES:
            return candidate
    return MODE_ON


# ---------------------------------------------------------------------------
# Pending store (file-backed)
# ---------------------------------------------------------------------------

def _pending_dir(subsystem: str) -> Path:
    """Return the directory holding pending records for ``subsystem``."""
    return get_hermes_home() / "pending" / subsystem


def _pending_path(subsystem: str, pending_id: str) -> Optional[Path]:
    """Return the record path for ``pending_id``, or None if the id is unsafe.

    Ids arrive from user-typed commands, so anything that is not a plain
    token (path separators, dots, empty, non-string) is rejected rather than
    joined into a filesystem path.
    """
    if not isinstance(pending_id, str) or not _PENDING_ID_RE.fullmatch(pending_id):
        return None
    return _pending_dir(subsystem) / f"{pending_id}.json"


def _created_at(record: Dict[str, Any]) -> float:
    """Sort key for pending records; non-numeric timestamps sort as 0."""
    value = record.get("created_at", 0)
    return value if isinstance(value, (int, float)) else 0


def stage_write(subsystem: str, payload: Dict[str, Any], *,
                summary: str, origin: str) -> Dict[str, Any]:
    """Persist a pending write and return a short record describing it.

    Args:
        subsystem: ``memory`` or ``skills``.
        payload: the exact kwargs needed to replay the write when approved
            (e.g. ``{"action": "add", "target": "user", "content": "..."}``
            for memory, or the full ``skill_manage`` kwargs for skills).
        summary: a one-line human-readable description shown in pending
            lists. For skills this is the LLM/heuristic gist; for memory it
            can be the entry text itself.
        origin: ``foreground`` or ``background_review`` — recorded for audit.
            An empty value is recorded as ``foreground``.

    Returns a dict with ``id`` and metadata. Best-effort: on disk failure it
    logs and still returns a record (the write is simply lost, which is the
    safe failure for an approval gate — nothing is silently committed).
    """
    pid = uuid.uuid4().hex[:8]
    record = {
        "id": pid,
        "subsystem": subsystem,
        "action": payload.get("action", ""),
        "summary": (summary or "").strip(),
        "origin": origin or "foreground",
        "created_at": time.time(),
        "payload": payload,
    }
    tmp: Optional[Path] = None
    try:
        directory = _pending_dir(subsystem)
        directory.mkdir(parents=True, exist_ok=True)
        path = directory / f"{pid}.json"
        # Write to a sibling temp file and rename, so readers globbing
        # ``*.json`` never observe a half-written record.
        tmp = path.with_suffix(".json.tmp")
        tmp.write_text(
            json.dumps(record, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        os.replace(tmp, path)
    except Exception as e:  # pragma: no cover - disk failure path
        logger.error("Failed to stage pending %s write: %s", subsystem, e, exc_info=True)
        if tmp is not None:
            # Best-effort cleanup of the partial temp file; nothing useful
            # can be done if this fails too.
            try:
                tmp.unlink()
            except OSError:
                pass
    return record


def list_pending(subsystem: str) -> List[Dict[str, Any]]:
    """Return all pending records for ``subsystem``, oldest first.

    Files that cannot be read or parsed, or whose JSON is not an object, are
    skipped with a warning instead of aborting the whole listing.
    """
    directory = _pending_dir(subsystem)
    if not directory.exists():
        return []
    records: List[Dict[str, Any]] = []
    for path in directory.glob("*.json"):
        try:
            record = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            logger.warning("Skipping unreadable pending record: %s", path)
            continue
        if not isinstance(record, dict):
            logger.warning("Skipping malformed pending record: %s", path)
            continue
        records.append(record)
    records.sort(key=_created_at)
    return records


def get_pending(subsystem: str, pending_id: str) -> Optional[Dict[str, Any]]:
    """Return a single pending record by id, or None.

    None is returned for an unsafe id, a missing file, an unreadable or
    unparsable file, and JSON that is not an object.
    """
    path = _pending_path(subsystem, pending_id)
    if path is None:
        return None
    try:
        record = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return None
    return record if isinstance(record, dict) else None


def discard_pending(subsystem: str, pending_id: str) -> bool:
    """Delete a pending record. Returns True if it existed and was removed."""
    path = _pending_path(subsystem, pending_id)
    if path is None:
        return False
    try:
        # EAFP: unlink directly rather than exists()+unlink(), which races
        # with a concurrent approve/reject of the same record.
        path.unlink()
    except FileNotFoundError:
        return False
    except Exception as e:  # pragma: no cover
        logger.error("Failed to discard pending %s/%s: %s", subsystem, pending_id, e)
        return False
    return True


def pending_count(subsystem: str) -> int:
    """Cheap count of pending records (for notification badges).

    Counts ``*.json`` files without parsing them; returns 0 on any error.
    """
    directory = _pending_dir(subsystem)
    if not directory.exists():
        return 0
    try:
        return sum(1 for _ in directory.glob("*.json"))
    except Exception:
        return 0


# ---------------------------------------------------------------------------
# Write origin
# ---------------------------------------------------------------------------

def current_origin() -> str:
    """Return the active write origin: ``foreground`` or ``background_review``.

    Reuses the skill-provenance ContextVar, which the background review fork
    already sets (see ``agent.background_review`` /
    ``AIAgent._spawn_background_review``). Foreground agent turns leave it at
    the default ``foreground``. If the provenance module cannot be imported or
    queried, ``foreground`` is returned.
    """
    try:
        from tools.skill_provenance import get_current_write_origin

        return get_current_write_origin()
    except Exception:
        return "foreground"


def is_background() -> bool:
    """Return True when the current write origin is ``background_review``."""
    return current_origin() == "background_review"


# ---------------------------------------------------------------------------
# Gate decision
# ---------------------------------------------------------------------------

class GateDecision:
    """Result of evaluating the write gate for a single write attempt.

    Exactly one of the boolean flags is True:

    * ``allow``   — proceed with the real write (mode ``on``, or an inline
      approval was granted).
    * ``blocked`` — refuse the write (mode ``off``, or an inline approval was
      denied). ``message`` explains why; surface it to the agent.
    * ``stage``   — do not write; the caller should stage the payload via
      ``stage_write`` (mode ``approve`` for a background write, or a
      foreground write with no interactive prompt available). ``message`` is
      the user-facing "staged for approval" note.
    """

    __slots__ = ("allow", "blocked", "stage", "message")

    def __init__(self, *, allow: bool = False, blocked: bool = False,
                 stage: bool = False, message: str = "") -> None:
        self.allow = allow
        self.blocked = blocked
        self.stage = stage
        self.message = message

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(allow={self.allow!r}, "
            f"blocked={self.blocked!r}, stage={self.stage!r}, "
            f"message={self.message!r})"
        )


def _staged_decision(subsystem: str) -> GateDecision:
    """Build the "staged for approval" decision for ``subsystem``."""
    where = "/skills pending" if subsystem == SKILLS else "/memory pending"
    return GateDecision(
        stage=True,
        message=(
            f"Staged for approval ({subsystem}.write_mode = approve). "
            f"Not yet saved — review with {where}."
        ),
    )


def evaluate_gate(subsystem: str, *, inline_summary: str = "",
                  inline_detail: str = "") -> GateDecision:
    """Decide what to do with a pending write for ``subsystem``.

    Args:
        subsystem: ``memory`` or ``skills``.
        inline_summary: short description used as the inline approval prompt
            header (memory foreground path only).
        inline_detail: full content shown in the inline prompt (memory
            entries are small; skills never take the inline path).

    Mode matrix:
        on       → allow
        off      → blocked
        approve  → memory + foreground → inline approve/deny prompt,
                                         staged if no prompt is available
                   memory + background → stage
                   skills (any origin) → stage (too big to review inline)
    """
    mode = get_write_mode(subsystem)
    if mode == MODE_ON:
        return GateDecision(allow=True)
    if mode == MODE_OFF:
        return GateDecision(
            blocked=True,
            message=(
                f"{subsystem.capitalize()} writes are disabled "
                f"({subsystem}.write_mode = off). The change was not saved. "
                f"Set {subsystem}.write_mode to 'on' or 'approve' to allow writes."
            ),
        )

    # mode == approve
    # Skills always stage — a SKILL.md is too large to review inline, and a
    # background write happens in a daemon thread with no user present.
    if subsystem == SKILLS or is_background():
        return _staged_decision(subsystem)

    # Memory + foreground: if an interactive approval channel exists (CLI
    # prompt_toolkit callback, or a gateway approve/deny round-trip), prompt
    # inline — entries are small enough to show in full. Otherwise (script,
    # batch, no listener) stage instead of forcing a blind deny.
    if _interactive_approval_available():
        granted = _prompt_inline_memory_approval(inline_summary, inline_detail)
        if granted is True:
            return GateDecision(allow=True)
        if granted is False:
            return GateDecision(
                blocked=True,
                message="Memory write denied by user. The change was not saved.",
            )
        # granted is None → prompt failed; fall through to staging.
    return _staged_decision(subsystem)


def _interactive_approval_available() -> bool:
    """True when a foreground memory write can be approved inline.

    Either a per-thread approval callback is registered (interactive CLI), or
    the call is inside a gateway/API session that supports the ``/approve`` /
    ``/deny`` round-trip. Scripts, cron, and background threads have neither →
    stage.
    """
    try:
        from tools.terminal_tool import _get_approval_callback

        if _get_approval_callback() is not None:
            return True
    except Exception:
        # No CLI callback machinery available; try the gateway path instead.
        pass
    try:
        from tools.approval import _is_gateway_approval_context

        return bool(_is_gateway_approval_context())
    except Exception:
        return False


def _prompt_inline_memory_approval(summary: str, detail: str) -> Optional[bool]:
    """Prompt the user inline to approve a memory write.

    Returns True (approved), False (denied), or None (no interactive prompt
    available on this thread → caller should stage instead).

    Reuses the dangerous-command approval machinery so the CLI prompt_toolkit
    callback and the gateway ``/approve`` / ``/deny`` round-trip both work
    without duplicating that plumbing.
    """
    try:
        from tools.approval import prompt_dangerous_approval
    except Exception:
        return None

    # ``or ""`` keeps a None summary/detail from raising on ``.strip()``.
    header = (summary or "").strip() or "Save to memory?"
    body = (detail or "").strip()
    description = f"Save to memory: {header}"
    command = body if body else header
    try:
        choice = prompt_dangerous_approval(
            command, description, allow_permanent=False,
        )
    except Exception as e:  # pragma: no cover
        logger.error("Inline memory approval prompt failed: %s", e)
        return None
    if choice in {"once", "session"}:
        return True
    if choice == "deny":
        return False
    # Any other outcome is treated as no-decision so the write is staged
    # rather than silently dropped.
    return None


# ---------------------------------------------------------------------------
# Skill-specific helpers (gist + diff for the review affordances)
# ---------------------------------------------------------------------------

def _line_count(text: str) -> int:
    """Number of lines in ``text``; a trailing newline does not add a line."""
    if not text:
        return 0
    return text.count("\n") + (0 if text.endswith("\n") else 1)


def skill_gist(action: str, name: str, *, content: str = "",
               file_path: str = "", old_string: str = "",
               new_string: str = "") -> str:
    """Build a one-line human gist for a pending skill write.

    Heuristic, no model call — the gist surfaces enough to decide
    approve/reject in a chat bubble, while the full diff stays behind
    /skills diff (CLI/dashboard/file). For create/edit it pulls the
    frontmatter ``description:``; for patch it reports added/removed line
    counts; for the file actions it names the file.
    """
    if action in {"create", "edit"} and content:
        desc = _frontmatter_description(content)
        length = len(content)
        if length >= 1024:
            # True ceiling: exactly 1024 chars is "1 KB", not "2 KB".
            size = f"{(length + 1023) // 1024} KB"
        else:
            size = f"{length} chars"
        verb = "create" if action == "create" else "rewrite"
        if desc:
            return f"{verb} '{name}' — {desc} ({size})"
        return f"{verb} '{name}' ({size})"
    if action == "patch":
        target = file_path or "SKILL.md"
        removed = _line_count(old_string)
        added = _line_count(new_string)
        return f"patch '{name}' {target} (+{added}/-{removed} lines)"
    if action == "write_file":
        return f"write {file_path} in '{name}'"
    if action == "remove_file":
        return f"remove {file_path} from '{name}'"
    if action == "delete":
        return f"delete skill '{name}'"
    return f"{action} '{name}'"


def _frontmatter_description(content: str) -> str:
    """Extract the ``description:`` value from SKILL.md YAML frontmatter.

    When ``content`` opens with a ``---`` delimited frontmatter block, only
    that block is searched, so a ``description:`` line in the markdown body is
    not mistaken for metadata. Content without frontmatter delimiters is
    searched as a whole. Block scalars (``>`` / ``|``) are folded into a single
    line. Returns at most 140 characters, or "" when no description is found.
    """
    front = _FRONTMATTER_RE.match(content)
    scope = front.group(1) if front else content
    m = _DESCRIPTION_RE.search(scope)
    if not m:
        return ""
    desc = m.group(1).strip()
    if _BLOCK_SCALAR_RE.fullmatch(desc):
        # Block scalar: the value is the run of indented lines that follows.
        folded: List[str] = []
        for line in scope[m.end():].splitlines()[1:]:
            stripped = line.strip()
            if stripped and not line[0].isspace():
                break  # next top-level key ends the scalar
            if stripped:
                folded.append(stripped)
        desc = " ".join(folded)
    else:
        desc = desc.strip("'\"")
    return desc[:140]


def _read_current_skill_file(name: str, rel: str) -> str:
    """Return the on-disk text of ``rel`` inside skill ``name``, or "".

    "" is returned when the skill manager is unavailable, the skill or file
    does not exist, the file cannot be read, or ``rel`` would resolve outside
    the skill directory.
    """
    try:
        from tools.skill_manager_tool import _find_skill
    except Exception:
        return ""
    found = _find_skill(name)
    if not found:
        return ""
    base = Path(found["path"])
    target = base / rel
    # The payload is agent-authored; never follow a file_path such as
    # "../../secrets" out of the skill directory just to render a diff.
    try:
        target.resolve().relative_to(base.resolve())
    except ValueError:
        logger.warning("Refusing to read %r outside skill directory %s", rel, base)
        return ""
    except Exception:
        return ""
    try:
        return target.read_text(encoding="utf-8") if target.exists() else ""
    except Exception:
        return ""


def _render_unified_diff(current: str, new: str, label: str) -> str:
    """Render a unified diff between ``current`` and ``new`` as one string."""
    import difflib

    diff = difflib.unified_diff(
        current.splitlines(keepends=True),
        new.splitlines(keepends=True),
        fromfile=f"a/{label}",
        tofile=f"b/{label}",
    )
    parts: List[str] = []
    for line in diff:
        if line.endswith("\n"):
            parts.append(line)
        else:
            # A final line without a newline would otherwise be glued to the
            # next diff line ("-old+new"); mark it the way diff/git do.
            parts.append(line + "\n\\ No newline at end of file\n")
    return "".join(parts)


def skill_pending_diff(record: Dict[str, Any]) -> str:
    """Build a full unified diff (or full content) for a staged skill write.

    Used by /skills diff on a surface that can render it (CLI pager, web
    dashboard, or by opening the pending JSON file). For create this is the
    new file content; for edit/patch/write_file it is a unified diff against
    the current on-disk file; for remove_file/delete and unknown actions it is
    a one-line description. Returns "(no textual change)" when the diff is
    empty.
    """
    payload = record.get("payload") or {}
    action = payload.get("action", "")
    name = payload.get("name", "")

    if action == "create":
        return payload.get("content") or ""
    if action == "remove_file":
        return f"remove file: {payload.get('file_path')} from skill '{name}'"
    if action == "delete":
        return f"delete skill '{name}'"
    if action not in {"edit", "patch", "write_file"}:
        return f"({action} on '{name}')"

    # edit always targets SKILL.md; patch/write_file may name another file.
    target_label = "SKILL.md"
    if action in {"patch", "write_file"}:
        target_label = payload.get("file_path") or "SKILL.md"
    current = _read_current_skill_file(name, target_label)

    if action == "edit":
        new = payload.get("content") or ""
    elif action == "write_file":
        new = payload.get("file_content") or ""
    else:  # patch
        old_s = payload.get("old_string") or ""
        new_s = payload.get("new_string") or ""
        if not current:
            new = f"(patch {old_s!r} → {new_s!r})"
        elif not old_s:
            # str.replace("", x) would insert x between every character.
            new = current
        else:
            # NOTE(review): this replaces every occurrence — confirm it
            # matches how skill_manage applies a patch.
            new = current.replace(old_s, new_s)

    text = _render_unified_diff(current, new, target_label)
    return text or "(no textual change)"