mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-10 08:32:09 +00:00
The shipped tri-state write_mode (on|off|approve) conflated two concepts —
whether writes are enabled and whether they're gated — so 'on' (writes flow
freely, gate inactive) read like 'gating is on'. Replace it with a single
clear boolean gate that defaults off.
memory.write_approval / skills.write_approval:
false (default) — write freely; the approval gate is off (pre-gate behaviour)
true — require approval: memory foreground prompts inline, memory
background-review + all skill writes stage for review
The old 'off = block all writes' mode is dropped; memory_enabled: false already
disables memory entirely, so a third 'block' state was redundant.
- tools/write_approval.py: get_write_mode/MODE_* → write_approval_enabled() bool;
evaluate_gate() loses the config-driven 'blocked' path (blocked now only comes
from an interactive user denial).
- tools/memory_tool.py, tools/skill_manager_tool.py: comment + behaviour follow.
- hermes_cli/config.py: memory/skills write_mode → write_approval (False);
_config_version 28→29 with a 28→29 migration that renames any persisted
write_mode (approve→true, on/off/unset→false) and drops the old key.
- slash commands: '/memory|/skills mode <on|off|approve>' → 'approval <on|off>'
('mode' kept as a back-compat alias); set_mode_fn callback now takes a bool.
- write_approval_commands.py, cli_commands_mixin.py, gateway/slash_commands.py,
commands.py: handlers + registry args/subcommands updated.
- docs + tests rewritten for the boolean model; added migration tests.
481 lines
18 KiB
Python
481 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""Write-approval gate + pending store for memory and skill writes.
|
|
|
|
Background
|
|
----------
|
|
The agent writes to two persistent stores that survive across sessions:
|
|
|
|
* **memory** — MEMORY.md / USER.md, small (~200 char) declarative entries
|
|
* **skills** — SKILL.md + supporting files, potentially huge (10-100 KB)
|
|
|
|
Both stores are written from two origins:
|
|
|
|
* **foreground** — a normal agent turn (user is present / chatting)
|
|
* **background_review** — the self-improvement review fork that runs after a
|
|
turn and autonomously decides what to save (the source of the
|
|
"wrong assumptions" users complained about)
|
|
|
|
This module lets the user gate those writes per-subsystem with a single boolean
``write_approval`` flag:

* ``false`` — write freely; the approval gate is off (pre-gate behaviour, default)
* ``true`` — do not commit the write directly; prompt for it inline (foreground
  memory writes) or **stage** it to a pending store and surface it for the
  user to approve or reject out-of-band

There is no third "block all writes" state — to disable a subsystem entirely
use its own enable flag (e.g. ``memory.memory_enabled: false``).
|
|
|
|
The size asymmetry between memory and skills is real and unavoidable: a memory
|
|
entry can be reviewed inline in a chat bubble; a 100 KB SKILL.md cannot. So
|
|
enabling the gate stages BOTH to disk, but review affordances differ by subsystem
|
|
(see ``hermes_cli`` slash handlers): memory shows full content, skills show
|
|
metadata + a one-line gist + a ``diff`` escape hatch (CLI/dashboard/file).
|
|
|
|
Staging is mandatory for background-origin writes when the gate is on (a daemon
|
|
thread cannot block on an interactive prompt). Foreground memory writes may
|
|
additionally block inline via the dangerous-command approval gate; foreground
|
|
skill writes always stage (too big to eyeball mid-loop).
|
|
|
|
Pending records live under ``<HERMES_HOME>/pending/{memory,skills}/<id>.json``
|
|
so they survive process restarts and can be reviewed from CLI, gateway, or the
|
|
web dashboard.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from hermes_constants import get_hermes_home
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Subsystem identifiers
MEMORY = "memory"
SKILLS = "skills"
# Every subsystem the gate knows about; write_approval_enabled() reports the
# gate as off for any name outside this tuple.
_SUBSYSTEMS = (MEMORY, SKILLS)

# Config key (per subsystem). A single boolean: the approval gate is OFF by
# default (writes flow freely, the pre-gate behaviour), and ON means stage /
# prompt every write for the user's approval. There is intentionally no third
# "block all writes" state — to disable a subsystem entirely use its own
# enable flag (e.g. ``memory.memory_enabled: false``).
CONFIG_KEY = "write_approval"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def write_approval_enabled(subsystem: str) -> bool:
    """Tell whether writes to ``subsystem`` must go through the approval gate.

    The answer comes from ``<subsystem>.write_approval`` in the loaded config.
    An unknown subsystem, a config that cannot be loaded, or a value that does
    not normalise to "enabled" all yield ``False`` (gate off — writes flow
    freely), so nothing changes for an install until the user opts in.
    """
    if subsystem in _SUBSYSTEMS:
        try:
            # Imported lazily, inside the guarded region, so an import failure
            # is treated the same as an unreadable config.
            from hermes_cli.config import load_config, cfg_get

            configured = cfg_get(load_config(), subsystem, CONFIG_KEY, default=False)
        except Exception:
            return False
        return _normalize_enabled(configured)
    return False
|
|
|
|
|
|
def _normalize_enabled(value: Any) -> bool:
|
|
"""Coerce a config value to a bool. Default (unknown) is False (gate off).
|
|
|
|
Accepts real bools and the usual truthy/falsey strings. YAML 1.1 parses
|
|
bare ``on``/``off``/``yes``/``no`` as bools already, so the string branch
|
|
is mostly for hand-edited configs.
|
|
"""
|
|
if isinstance(value, bool):
|
|
return value
|
|
if isinstance(value, str):
|
|
return value.strip().lower() in {"on", "true", "yes", "1", "approve", "enabled"}
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pending store (file-backed)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _pending_dir(subsystem: str) -> Path:
    """Return ``<hermes home>/pending/<subsystem>``; the directory is not created here."""
    pending_root = get_hermes_home() / "pending"
    return pending_root / subsystem
|
|
|
|
|
|
def stage_write(subsystem: str, payload: Dict[str, Any],
                *, summary: str, origin: str) -> Dict[str, Any]:
    """Write a pending-approval record to disk and hand the record back.

    Args:
        subsystem: ``memory`` or ``skills``.
        payload: the exact kwargs needed to replay the write once approved
            (e.g. ``{"action": "add", "target": "user", "content": "..."}``
            for memory, or the full ``skill_manage`` kwargs for skills).
        summary: one-line human-readable description shown in pending lists
            (the gist for skills; may be the entry text itself for memory).
        origin: ``foreground`` or ``background_review`` — stored for audit;
            an empty value is recorded as ``foreground``.

    Returns:
        The record dict (``id``, ``subsystem``, ``action``, ``summary``,
        ``origin``, ``created_at``, ``payload``). Best-effort: if anything in
        the disk write fails the error is logged and the record is returned
        anyway — the write is simply lost, which is the safe failure for an
        approval gate because nothing is committed silently.
    """
    pending_id = uuid.uuid4().hex[:8]
    action = payload.get("action", "")
    clean_summary = (summary or "").strip()
    record = {
        "id": pending_id,
        "subsystem": subsystem,
        "action": action,
        "summary": clean_summary,
        "origin": origin or "foreground",
        "created_at": time.time(),
        "payload": payload,
    }
    try:
        target_dir = _pending_dir(subsystem)
        target_dir.mkdir(parents=True, exist_ok=True)
        final_path = target_dir / f"{pending_id}.json"
        # Write to a sibling temp file first, then rename over the final name
        # so readers never observe a half-written record.
        scratch_path = final_path.with_suffix(".json.tmp")
        serialized = json.dumps(record, ensure_ascii=False, indent=2)
        scratch_path.write_text(serialized, encoding="utf-8")
        os.replace(scratch_path, final_path)
    except Exception as exc:  # pragma: no cover - disk failure path
        logger.error("Failed to stage pending %s write: %s", subsystem, exc, exc_info=True)
    return record
|
|
|
|
|
|
def list_pending(subsystem: str) -> List[Dict[str, Any]]:
    """Return all pending records for ``subsystem``, oldest first.

    Every ``*.json`` file in the subsystem's pending directory is loaded. A
    file is skipped with a warning when it cannot be read or parsed, or when
    it parses to something other than a JSON object — a hand-edited or
    truncated file must not break the listing for every other record.

    Records are ordered by their ``created_at`` value; a missing or
    non-numeric ``created_at`` sorts as ``0`` (i.e. first) instead of raising
    during comparison.

    Returns an empty list when the pending directory does not exist.
    """
    d = _pending_dir(subsystem)
    if not d.exists():
        return []

    records: List[Dict[str, Any]] = []
    for p in d.glob("*.json"):
        try:
            loaded = json.loads(p.read_text(encoding="utf-8"))
        except Exception:
            logger.warning("Skipping unreadable pending record: %s", p)
            continue
        if not isinstance(loaded, dict):
            # Valid JSON but not a record (list, string, number, null).
            logger.warning("Skipping unreadable pending record: %s", p)
            continue
        records.append(loaded)

    def _created_at(rec: Dict[str, Any]) -> float:
        stamp = rec.get("created_at", 0)
        # bool is an int subclass; treat it, like any non-number, as "unknown".
        if isinstance(stamp, bool) or not isinstance(stamp, (int, float)):
            return 0
        return stamp

    records.sort(key=_created_at)
    return records
|
|
|
|
|
|
def get_pending(subsystem: str, pending_id: str) -> Optional[Dict[str, Any]]:
    """Load the pending record ``pending_id`` for ``subsystem``.

    Returns the parsed JSON content, or ``None`` when the file does not exist
    or cannot be read/parsed.
    """
    record_path = _pending_dir(subsystem) / f"{pending_id}.json"
    if record_path.exists():
        try:
            raw_text = record_path.read_text(encoding="utf-8")
            return json.loads(raw_text)
        except Exception:
            return None
    return None
|
|
|
|
|
|
def discard_pending(subsystem: str, pending_id: str) -> bool:
    """Remove the pending record ``pending_id`` for ``subsystem``.

    Returns ``True`` only when the file existed and was deleted. A missing
    file returns ``False``; a failure while checking or deleting is logged and
    also returns ``False``.
    """
    record_path = _pending_dir(subsystem) / f"{pending_id}.json"
    removed = False
    try:
        if record_path.exists():
            record_path.unlink()
            removed = True
    except Exception as exc:  # pragma: no cover
        logger.error("Failed to discard pending %s/%s: %s", subsystem, pending_id, exc)
    return removed
|
|
|
|
|
|
def pending_count(subsystem: str) -> int:
    """Count the ``*.json`` files in the pending directory without parsing them.

    Intended for notification badges. Returns ``0`` when the directory is
    missing or the directory scan fails.
    """
    folder = _pending_dir(subsystem)
    if not folder.exists():
        return 0
    try:
        matches = list(folder.glob("*.json"))
    except Exception:
        return 0
    return len(matches)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Write origin
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def current_origin() -> str:
    """Report where the current write comes from.

    Delegates to ``tools.skill_provenance.get_current_write_origin`` (the
    skill-provenance ContextVar that the background review fork sets). If that
    module cannot be imported or the lookup raises, the answer falls back to
    ``foreground``.
    """
    try:
        from tools.skill_provenance import get_current_write_origin

        origin = get_current_write_origin()
    except Exception:
        origin = "foreground"
    return origin
|
|
|
|
|
|
def is_background() -> bool:
    """True when the active write origin is ``background_review``."""
    origin = current_origin()
    return origin == "background_review"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gate decision
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class GateDecision:
    """Result of evaluating the write gate for a single write attempt.

    By convention exactly one of the boolean flags is True (this class does
    not enforce it; ``evaluate_gate`` always sets exactly one):

    * ``allow`` — proceed with the real write (the approval gate is off, or an
      inline approval was granted).
    * ``blocked`` — refuse the write because the user denied an inline
      approval prompt. ``message`` explains why; surface it to the agent.
    * ``stage`` — do not write; the caller should stage the payload via
      ``stage_write`` (gate on for a skill write, a background write, or a
      foreground memory write with no usable interactive prompt).
      ``message`` is the user-facing "staged for approval" note.
    """

    __slots__ = ("allow", "blocked", "stage", "message")

    def __init__(self, *, allow=False, blocked=False, stage=False, message=""):
        self.allow = allow
        self.blocked = blocked
        self.stage = stage
        self.message = message

    def __repr__(self) -> str:
        # Debug aid only: shows which outcome was chosen and the attached note.
        return (
            f"GateDecision(allow={self.allow!r}, blocked={self.blocked!r}, "
            f"stage={self.stage!r}, message={self.message!r})"
        )
|
|
|
|
|
|
def evaluate_gate(subsystem: str, *, inline_summary: str = "",
                  inline_detail: str = "") -> GateDecision:
    """Pick the outcome (allow / stage / blocked) for one write attempt.

    Args:
        subsystem: ``memory`` or ``skills``.
        inline_summary: short text used as the header of the inline approval
            prompt (only the foreground memory path prompts).
        inline_detail: full content shown in the inline prompt.

    Outcomes:
        gate off (default)            → allow
        gate on, skills (any origin)  → stage
        gate on, memory + background  → stage
        gate on, memory + foreground  → inline prompt when an interactive
                                        channel exists: approve → allow,
                                        deny → blocked, no decision → stage;
                                        no channel → stage

    The config alone never produces ``blocked``; that outcome only follows an
    explicit user denial of the inline prompt.
    """
    gate_on = write_approval_enabled(subsystem)
    if not gate_on:
        return GateDecision(allow=True)

    from_background = is_background()

    # Skill writes and every background write are staged without prompting.
    must_stage = subsystem == SKILLS or from_background
    if must_stage:
        review_cmd = "/memory pending" if subsystem != SKILLS else "/skills pending"
        note = (
            f"Staged for approval ({subsystem}.write_approval is on). "
            f"Not yet saved — review with {review_cmd}."
        )
        return GateDecision(stage=True, message=note)

    # Foreground memory write: ask inline only when a channel is available;
    # ``verdict`` stays None when there is no channel or the prompt gave no
    # usable answer, which sends the write to staging below.
    verdict = None
    if _interactive_approval_available():
        verdict = _prompt_inline_memory_approval(inline_summary, inline_detail)

    if verdict is True:
        return GateDecision(allow=True)
    if verdict is False:
        return GateDecision(
            blocked=True,
            message="Memory write denied by user. The change was not saved.",
        )

    return GateDecision(
        stage=True,
        message=(
            "Staged for approval (memory.write_approval is on). "
            "Not yet saved — review with /memory pending."
        ),
    )
|
|
|
|
|
|
def _interactive_approval_available() -> bool:
|
|
"""True when a foreground memory write can be approved inline.
|
|
|
|
Either a per-thread approval callback is registered (interactive CLI), or
|
|
the call is inside a gateway/API session that supports the /approve //deny
|
|
round-trip. Scripts, cron, and background threads have neither → stage.
|
|
"""
|
|
try:
|
|
from tools.terminal_tool import _get_approval_callback
|
|
if _get_approval_callback() is not None:
|
|
return True
|
|
except Exception:
|
|
pass
|
|
try:
|
|
from tools.approval import _is_gateway_approval_context
|
|
return bool(_is_gateway_approval_context())
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _prompt_inline_memory_approval(summary: str, detail: str) -> Optional[bool]:
|
|
"""Prompt the user inline to approve a memory write.
|
|
|
|
Returns True (approved), False (denied), or None (no interactive prompt
|
|
available on this thread → caller should stage instead).
|
|
|
|
Reuses the dangerous-command approval machinery so the CLI prompt_toolkit
|
|
callback and the gateway ``/approve`` ``/deny`` round-trip both work without
|
|
duplicating that plumbing.
|
|
"""
|
|
try:
|
|
from tools.approval import prompt_dangerous_approval
|
|
except Exception:
|
|
return None
|
|
|
|
header = summary.strip() or "Save to memory?"
|
|
body = detail.strip()
|
|
description = f"Save to memory: {header}"
|
|
command = body if body else header
|
|
try:
|
|
choice = prompt_dangerous_approval(
|
|
command,
|
|
description,
|
|
allow_permanent=False,
|
|
)
|
|
except Exception as e: # pragma: no cover
|
|
logger.error("Inline memory approval prompt failed: %s", e)
|
|
return None
|
|
|
|
if choice in {"once", "session"}:
|
|
return True
|
|
if choice == "deny":
|
|
return False
|
|
# Any other outcome (e.g. timeout that returns "deny" already handled) →
|
|
# treat unknown as no-decision so we stage rather than silently drop.
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Skill-specific helpers (gist + diff for the review affordances)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def skill_gist(action: str, name: str, *, content: str = "",
               file_path: str = "", old_string: str = "",
               new_string: str = "") -> str:
    """Build a one-line human gist for a pending skill write.

    Heuristic, no model call — the gist should be enough to decide
    approve/reject in a chat bubble, while the full diff stays behind
    ``/skills diff``.

    Args:
        action: the skill action (``create``, ``edit``, ``patch``,
            ``write_file``, ``remove_file``, ``delete``, or anything else).
        name: the skill name, quoted in the gist.
        content: full SKILL.md text for ``create`` / ``edit``.
        file_path: target file for ``patch`` / ``write_file`` / ``remove_file``;
            ``patch`` falls back to ``SKILL.md`` when empty.
        old_string: text being replaced by a ``patch``.
        new_string: replacement text of a ``patch``.

    Returns:
        * create/edit with content — verb, name, the frontmatter description
          when one is found, and the size: character count below 1024 chars,
          otherwise KB rounded **up** (1024 chars is ``1 KB``, 1025 is ``2 KB``).
        * patch — target file plus added/removed line counts, counted the way
          ``str.splitlines`` does, so a trailing newline does not add a
          phantom line.
        * write_file / remove_file / delete — a fixed phrase.
        * anything else (including create/edit without content) —
          ``"<action> '<name>'"``.
    """
    if action in {"create", "edit"} and content:
        desc = _frontmatter_description(content)
        n_chars = len(content)
        if n_chars >= 1024:
            # Ceiling division: exact multiples of 1024 must not round up a
            # whole extra KB.
            size = f"{(n_chars + 1023) // 1024} KB"
        else:
            size = f"{n_chars} chars"
        verb = "create" if action == "create" else "rewrite"
        if desc:
            return f"{verb} '{name}' — {desc} ({size})"
        return f"{verb} '{name}' ({size})"
    if action == "patch":
        target = file_path or "SKILL.md"
        removed = len(old_string.splitlines()) if old_string else 0
        added = len(new_string.splitlines()) if new_string else 0
        return f"patch '{name}' {target} (+{added}/-{removed} lines)"
    if action == "write_file":
        return f"write {file_path} in '{name}'"
    if action == "remove_file":
        return f"remove {file_path} from '{name}'"
    if action == "delete":
        return f"delete skill '{name}'"
    return f"{action} '{name}'"
|
|
|
|
|
|
def _frontmatter_description(content: str) -> str:
|
|
"""Extract the ``description:`` value from SKILL.md YAML frontmatter."""
|
|
import re
|
|
m = re.search(r"^description:\s*(.+)$", content, re.MULTILINE)
|
|
if not m:
|
|
return ""
|
|
desc = m.group(1).strip().strip("'\"")
|
|
return desc[:140]
|
|
|
|
|
|
def skill_pending_diff(record: Dict[str, Any]) -> str:
    """Build a full unified diff (or full content) for a staged skill write.

    Args:
        record: a pending record; its ``payload`` holds the staged
            ``action``, ``name`` and action-specific fields. A missing or
            null ``payload`` is treated as empty.

    Returns:
        * ``create`` — the new file content as-is.
        * ``remove_file`` / ``delete`` / unknown action — a one-line
          description; the skill on disk is not looked up for these.
        * ``edit`` / ``patch`` / ``write_file`` — a unified diff between the
          current on-disk file (empty when the skill or file cannot be
          found/read) and the staged result, labelled with the target file
          (``file_path`` for patch/write_file, else ``SKILL.md``), or
          ``"(no textual change)"`` when the diff is empty.
    """
    import difflib

    payload = record.get("payload") or {}
    action = payload.get("action", "")
    name = payload.get("name", "")

    # Outcomes that need nothing from disk are answered first, so they skip
    # the skill lookup and file read entirely.
    if action == "create":
        return (payload.get("content") or "")
    if action == "remove_file":
        return f"remove file: {payload.get('file_path')} from skill '{name}'"
    if action == "delete":
        return f"delete skill '{name}'"
    if action not in ("edit", "patch", "write_file"):
        return f"({action} on '{name}')"

    # The label follows the staged target even when the skill is not on disk
    # yet, so a write_file diff is not mislabelled as SKILL.md.
    target_label = "SKILL.md"
    if action in ("patch", "write_file"):
        target_label = payload.get("file_path") or "SKILL.md"

    # Resolve current on-disk content for diffable actions.
    try:
        from tools.skill_manager_tool import _find_skill
    except Exception:
        _find_skill = None  # type: ignore

    current = ""
    if _find_skill is not None:
        found = _find_skill(name)
        if found:
            # assumes found["path"] is a pathlib.Path to the skill directory
            # — TODO confirm against tools.skill_manager_tool
            p = found["path"] / target_label
            try:
                if p.exists():
                    current = p.read_text(encoding="utf-8")
            except Exception:
                current = ""

    if action == "edit":
        new = payload.get("content") or ""
    elif action == "patch":
        old_s = payload.get("old_string") or ""
        new_s = payload.get("new_string") or ""
        # Without on-disk text there is nothing to apply the patch to; show
        # the requested replacement itself instead.
        new = current.replace(old_s, new_s) if current else f"(patch {old_s!r} → {new_s!r})"
    else:  # write_file
        new = payload.get("file_content") or ""

    diff = difflib.unified_diff(
        current.splitlines(keepends=True),
        new.splitlines(keepends=True),
        fromfile=f"a/{target_label}",
        tofile=f"b/{target_label}",
    )
    text = "".join(diff)
    return text or "(no textual change)"
|