hermes-agent/tools/write_approval.py
Teknium 095f526b11
refactor(memory,skills): replace tri-state write_mode with boolean write_approval (default off) (#43354)
The shipped tri-state write_mode (on|off|approve) conflated two concepts —
whether writes are enabled and whether they're gated — so 'on' (writes flow
freely, gate inactive) read like 'gating is on'. Replace it with a single
clear boolean gate that defaults off.

  memory.write_approval / skills.write_approval:
    false (default) — write freely; the approval gate is off (pre-gate behaviour)
    true            — require approval: memory foreground prompts inline, memory
                      background-review + all skill writes stage for review

The old 'off = block all writes' mode is dropped; memory_enabled: false already
disables memory entirely, so a third 'block' state was redundant.

- tools/write_approval.py: get_write_mode/MODE_* → write_approval_enabled() bool;
  evaluate_gate() loses the config-driven 'blocked' path (blocked now only comes
  from an interactive user denial).
- tools/memory_tool.py, tools/skill_manager_tool.py: comment + behaviour follow.
- hermes_cli/config.py: memory/skills write_mode → write_approval (False);
  _config_version 28→29 with a 28→29 migration that renames any persisted
  write_mode (approve→true, on/off/unset→false) and drops the old key.
- slash commands: '/memory|/skills mode <on|off|approve>' → 'approval <on|off>'
  ('mode' kept as a back-compat alias); set_mode_fn callback now takes a bool.
- write_approval_commands.py, cli_commands_mixin.py, gateway/slash_commands.py,
  commands.py: handlers + registry args/subcommands updated.
- docs + tests rewritten for the boolean model; added migration tests.
2026-06-09 23:21:14 -07:00

481 lines
18 KiB
Python

#!/usr/bin/env python3
"""Write-approval gate + pending store for memory and skill writes.
Background
----------
The agent writes to two persistent stores that survive across sessions:
* **memory** — MEMORY.md / USER.md, small (~200 char) declarative entries
* **skills** — SKILL.md + supporting files, potentially huge (10-100 KB)
Both stores are written from two origins:
* **foreground** — a normal agent turn (user is present / chatting)
* **background_review** — the self-improvement review fork that runs after a
turn and autonomously decides what to save (the source of the
"wrong assumptions" users complained about)
This module lets the user gate those writes per-subsystem with a tri-state
``write_mode``:
* ``on`` — write freely (current behaviour, default)
* ``off`` — never write; the tool returns a clean "disabled" result
* ``approve`` — do not commit the write; **stage** it to a pending store and
surface it for the user to approve or reject out-of-band
The size asymmetry between memory and skills is real and unavoidable: a memory
entry can be reviewed inline in a chat bubble; a 100 KB SKILL.md cannot. So
``approve`` mode stages BOTH to disk, but review affordances differ by subsystem
(see ``hermes_cli`` slash handlers): memory shows full content, skills show
metadata + a one-line gist + a ``diff`` escape hatch (CLI/dashboard/file).
Staging is mandatory for background-origin writes under ``approve`` (a daemon
thread cannot block on an interactive prompt). Foreground memory writes may
additionally block inline via the dangerous-command approval gate; foreground
skill writes always stage (too big to eyeball mid-loop).
Pending records live under ``<HERMES_HOME>/pending/{memory,skills}/<id>.json``
so they survive process restarts and can be reviewed from CLI, gateway, or the
web dashboard.
"""
from __future__ import annotations
import json
import logging
import os
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Subsystem identifiers
MEMORY = "memory"
SKILLS = "skills"
_SUBSYSTEMS = (MEMORY, SKILLS)
# Config key (per subsystem). A single boolean: the approval gate is OFF by
# default (writes flow freely, the pre-gate behaviour), and ON means stage /
# prompt every write for the user's approval. There is intentionally no third
# "block all writes" state — to disable a subsystem entirely use its own
# enable flag (e.g. ``memory.memory_enabled: false``).
CONFIG_KEY = "write_approval"
# ---------------------------------------------------------------------------
# Config resolution
# ---------------------------------------------------------------------------
def write_approval_enabled(subsystem: str) -> bool:
"""Return whether the approval gate is enabled for ``subsystem``.
Reads ``<subsystem>.write_approval`` from config.yaml. Defaults to
``False`` (gate off — writes flow freely) for any unset / invalid value so
existing installs keep their current behaviour until the user opts in.
"""
if subsystem not in _SUBSYSTEMS:
return False
try:
from hermes_cli.config import load_config, cfg_get
cfg = load_config()
raw = cfg_get(cfg, subsystem, CONFIG_KEY, default=False)
except Exception:
return False
return _normalize_enabled(raw)
def _normalize_enabled(value: Any) -> bool:
"""Coerce a config value to a bool. Default (unknown) is False (gate off).
Accepts real bools and the usual truthy/falsey strings. YAML 1.1 parses
bare ``on``/``off``/``yes``/``no`` as bools already, so the string branch
is mostly for hand-edited configs.
"""
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"on", "true", "yes", "1", "approve", "enabled"}
return False
# ---------------------------------------------------------------------------
# Pending store (file-backed)
# ---------------------------------------------------------------------------
def _pending_dir(subsystem: str) -> Path:
return get_hermes_home() / "pending" / subsystem
def stage_write(subsystem: str, payload: Dict[str, Any],
*, summary: str, origin: str) -> Dict[str, Any]:
"""Persist a pending write and return a short record describing it.
Args:
subsystem: ``memory`` or ``skills``.
payload: the exact kwargs needed to replay the write when approved
(e.g. ``{"action": "add", "target": "user", "content": "..."}``
for memory, or the full ``skill_manage`` kwargs for skills).
summary: a one-line human-readable description shown in pending lists.
For skills this is the LLM/heuristic gist; for memory it can be the
entry text itself.
origin: ``foreground`` or ``background_review`` — recorded for audit.
Returns a dict with ``id`` and metadata. Best-effort: on disk failure it
logs and still returns a record (the write is simply lost, which is the
safe failure for an approval gate — nothing is silently committed).
"""
pid = uuid.uuid4().hex[:8]
record = {
"id": pid,
"subsystem": subsystem,
"action": payload.get("action", ""),
"summary": (summary or "").strip(),
"origin": origin or "foreground",
"created_at": time.time(),
"payload": payload,
}
try:
d = _pending_dir(subsystem)
d.mkdir(parents=True, exist_ok=True)
path = d / f"{pid}.json"
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(record, ensure_ascii=False, indent=2), encoding="utf-8")
os.replace(tmp, path)
except Exception as e: # pragma: no cover - disk failure path
logger.error("Failed to stage pending %s write: %s", subsystem, e, exc_info=True)
return record
def list_pending(subsystem: str) -> List[Dict[str, Any]]:
"""Return all pending records for ``subsystem``, oldest first."""
d = _pending_dir(subsystem)
if not d.exists():
return []
records: List[Dict[str, Any]] = []
for p in d.glob("*.json"):
try:
records.append(json.loads(p.read_text(encoding="utf-8")))
except Exception:
logger.warning("Skipping unreadable pending record: %s", p)
records.sort(key=lambda r: r.get("created_at", 0))
return records
def get_pending(subsystem: str, pending_id: str) -> Optional[Dict[str, Any]]:
"""Return a single pending record by id, or None."""
path = _pending_dir(subsystem) / f"{pending_id}.json"
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def discard_pending(subsystem: str, pending_id: str) -> bool:
"""Delete a pending record. Returns True if it existed."""
path = _pending_dir(subsystem) / f"{pending_id}.json"
try:
if path.exists():
path.unlink()
return True
except Exception as e: # pragma: no cover
logger.error("Failed to discard pending %s/%s: %s", subsystem, pending_id, e)
return False
def pending_count(subsystem: str) -> int:
"""Cheap count of pending records (for notification badges)."""
d = _pending_dir(subsystem)
if not d.exists():
return 0
try:
return sum(1 for _ in d.glob("*.json"))
except Exception:
return 0
# ---------------------------------------------------------------------------
# Write origin
# ---------------------------------------------------------------------------
def current_origin() -> str:
"""Return the active write origin: ``foreground`` or ``background_review``.
Reuses the skill-provenance ContextVar, which the background review fork
already sets (see ``agent.background_review`` /
``AIAgent._spawn_background_review``). Foreground agent turns leave it at
the default ``foreground``.
"""
try:
from tools.skill_provenance import get_current_write_origin
return get_current_write_origin()
except Exception:
return "foreground"
def is_background() -> bool:
return current_origin() == "background_review"
# ---------------------------------------------------------------------------
# Gate decision
# ---------------------------------------------------------------------------
class GateDecision:
"""Result of evaluating the write gate for a single write attempt.
Exactly one of the boolean flags is True:
* ``allow`` — proceed with the real write (mode ``on``, or an inline
approval was granted).
* ``blocked`` — refuse the write (mode ``off``, or an inline approval was
denied). ``message`` explains why; surface it to the agent.
* ``stage`` — do not write; the caller should stage the payload via
``stage_write`` (mode ``approve`` for a background write, or a
foreground write with no interactive prompt available). ``message`` is
the user-facing "staged for approval" note.
"""
__slots__ = ("allow", "blocked", "stage", "message")
def __init__(self, *, allow=False, blocked=False, stage=False, message=""):
self.allow = allow
self.blocked = blocked
self.stage = stage
self.message = message
def evaluate_gate(subsystem: str, *, inline_summary: str = "",
inline_detail: str = "") -> GateDecision:
"""Decide what to do with a pending write for ``subsystem``.
Args:
subsystem: ``memory`` or ``skills``.
inline_summary: short description used as the inline approval prompt
header (memory foreground path only).
inline_detail: full content shown in the inline prompt (memory entries
are small; skills never take the inline path).
Decision matrix:
gate off (default) → allow (writes flow freely)
gate on, memory + foreground → inline approve/deny prompt
gate on, memory + background → stage
gate on, skills (any origin) → stage (too big to review inline)
Note: there is no config-driven "blocked" outcome — the gate only ever
delays a write for approval, never silently refuses it. ``blocked`` is
still produced when the user *actively denies* an inline prompt.
"""
if not write_approval_enabled(subsystem):
return GateDecision(allow=True)
background = is_background()
# Skills always stage — a SKILL.md is too large to review inline, and a
# background skill write happens in a daemon thread with no user present.
if subsystem == SKILLS or background:
where = "/skills pending" if subsystem == SKILLS else "/memory pending"
return GateDecision(
stage=True,
message=(
f"Staged for approval ({subsystem}.write_approval is on). "
f"Not yet saved — review with {where}."
),
)
# Memory + foreground: if an interactive approval channel exists (CLI
# prompt_toolkit callback, or a gateway approve/deny round-trip), prompt
# inline — entries are small enough to show in full. Otherwise (script,
# batch, no listener) stage instead of forcing a blind deny.
if _interactive_approval_available():
granted = _prompt_inline_memory_approval(inline_summary, inline_detail)
if granted is True:
return GateDecision(allow=True)
if granted is False:
return GateDecision(
blocked=True,
message="Memory write denied by user. The change was not saved.",
)
# granted is None → prompt failed; fall through to staging.
return GateDecision(
stage=True,
message=(
"Staged for approval (memory.write_approval is on). "
"Not yet saved — review with /memory pending."
),
)
def _interactive_approval_available() -> bool:
"""True when a foreground memory write can be approved inline.
Either a per-thread approval callback is registered (interactive CLI), or
the call is inside a gateway/API session that supports the /approve //deny
round-trip. Scripts, cron, and background threads have neither → stage.
"""
try:
from tools.terminal_tool import _get_approval_callback
if _get_approval_callback() is not None:
return True
except Exception:
pass
try:
from tools.approval import _is_gateway_approval_context
return bool(_is_gateway_approval_context())
except Exception:
return False
def _prompt_inline_memory_approval(summary: str, detail: str) -> Optional[bool]:
"""Prompt the user inline to approve a memory write.
Returns True (approved), False (denied), or None (no interactive prompt
available on this thread → caller should stage instead).
Reuses the dangerous-command approval machinery so the CLI prompt_toolkit
callback and the gateway ``/approve`` ``/deny`` round-trip both work without
duplicating that plumbing.
"""
try:
from tools.approval import prompt_dangerous_approval
except Exception:
return None
header = summary.strip() or "Save to memory?"
body = detail.strip()
description = f"Save to memory: {header}"
command = body if body else header
try:
choice = prompt_dangerous_approval(
command,
description,
allow_permanent=False,
)
except Exception as e: # pragma: no cover
logger.error("Inline memory approval prompt failed: %s", e)
return None
if choice in {"once", "session"}:
return True
if choice == "deny":
return False
# Any other outcome (e.g. timeout that returns "deny" already handled) →
# treat unknown as no-decision so we stage rather than silently drop.
return None
# ---------------------------------------------------------------------------
# Skill-specific helpers (gist + diff for the review affordances)
# ---------------------------------------------------------------------------
def skill_gist(action: str, name: str, *, content: str = "",
file_path: str = "", old_string: str = "",
new_string: str = "") -> str:
"""Build a one-line human gist for a pending skill write.
Heuristic, no model call — the gist surfaces enough to decide approve/reject
in a chat bubble, while the full diff stays behind /skills diff (CLI/
dashboard/file). For create/edit it pulls the frontmatter ``description:``;
for patch/write_file it describes the size of the change.
"""
if action in {"create", "edit"} and content:
desc = _frontmatter_description(content)
size = f"{len(content) // 1024 + 1} KB" if len(content) >= 1024 else f"{len(content)} chars"
verb = "create" if action == "create" else "rewrite"
if desc:
return f"{verb} '{name}'{desc} ({size})"
return f"{verb} '{name}' ({size})"
if action == "patch":
target = file_path or "SKILL.md"
removed = old_string.count("\n") + 1 if old_string else 0
added = new_string.count("\n") + 1 if new_string else 0
return f"patch '{name}' {target} (+{added}/-{removed} lines)"
if action == "write_file":
return f"write {file_path} in '{name}'"
if action == "remove_file":
return f"remove {file_path} from '{name}'"
if action == "delete":
return f"delete skill '{name}'"
return f"{action} '{name}'"
def _frontmatter_description(content: str) -> str:
"""Extract the ``description:`` value from SKILL.md YAML frontmatter."""
import re
m = re.search(r"^description:\s*(.+)$", content, re.MULTILINE)
if not m:
return ""
desc = m.group(1).strip().strip("'\"")
return desc[:140]
def skill_pending_diff(record: Dict[str, Any]) -> str:
"""Build a full unified diff (or full content) for a staged skill write.
Used by /skills diff <id> on a surface that can render it (CLI pager, web
dashboard, or by opening the pending JSON file). For create this is the new
file content; for edit/patch it is a unified diff against the current
on-disk skill.
"""
import difflib
payload = record.get("payload", {})
action = payload.get("action", "")
name = payload.get("name", "")
if action == "create":
return (payload.get("content") or "")
# Resolve current on-disk content for diffable actions.
try:
from tools.skill_manager_tool import _find_skill
except Exception:
_find_skill = None # type: ignore
current = ""
target_label = "SKILL.md"
if _find_skill is not None:
found = _find_skill(name)
if found:
base = found["path"]
if action == "edit":
p = base / "SKILL.md"
elif action in {"patch", "write_file"}:
rel = payload.get("file_path") or "SKILL.md"
p = base / rel
target_label = rel
else:
p = base / "SKILL.md"
try:
if p.exists():
current = p.read_text(encoding="utf-8")
except Exception:
current = ""
if action == "edit":
new = payload.get("content") or ""
elif action == "patch":
old_s = payload.get("old_string") or ""
new_s = payload.get("new_string") or ""
new = current.replace(old_s, new_s) if current else f"(patch {old_s!r}{new_s!r})"
elif action == "write_file":
new = payload.get("file_content") or ""
elif action == "remove_file":
return f"remove file: {payload.get('file_path')} from skill '{name}'"
elif action == "delete":
return f"delete skill '{name}'"
else:
return f"({action} on '{name}')"
diff = difflib.unified_diff(
current.splitlines(keepends=True),
new.splitlines(keepends=True),
fromfile=f"a/{target_label}",
tofile=f"b/{target_label}",
)
text = "".join(diff)
return text or "(no textual change)"