feat(memory,skills): approve/deny gate for memory + skill writes (#38199)

Adds memory.write_mode and skills.write_mode (on|off|approve), applied to
both foreground turns and the background self-improvement review fork — the
source of the unprompted 'wrong assumption' saves users reported.

- on (default): write freely, unchanged behaviour
- off: never write; the tool returns a clean disabled result
- approve: don't commit. Memory foreground writes prompt inline (small,
  reviewable in a chat bubble); background memory writes and ALL skill writes
  stage to a pending store instead (a SKILL.md is too large to review inline,
  and a daemon thread can't block on a prompt)

Review staged writes from CLI or any messaging platform:
  /memory pending|approve|reject|mode
  /skills pending|approve|reject|diff|mode

Skill review respects the size asymmetry: inline you see a one-line gist;
the full unified diff stays out-of-band (/skills diff, dashboard, or the
staged JSON file).

New: tools/write_approval.py (gate + pending store), hermes_cli/
write_approval_commands.py (shared CLI+gateway handlers). Gates wired at the
single entry points memory_tool() and skill_manage(), using the existing
write-origin ContextVar to distinguish foreground from background_review.
This commit is contained in:
Teknium 2026-06-09 21:51:43 -07:00 committed by GitHub
parent fdc90346ea
commit 96af61b6ef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1270 additions and 11 deletions

3
cli.py
View file

@ -6932,7 +6932,6 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
def _show_gateway_status(self):
"""Show status of the gateway and connected messaging platforms."""
from gateway.config import load_gateway_config, Platform
@ -7245,6 +7244,8 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
elif canonical == "skills":
with self._busy_command(self._slow_command_status(cmd_original)):
self._handle_skills_command(cmd_original)
elif canonical == "memory":
self._handle_memory_command(cmd_original)
elif canonical == "platforms":
self._show_gateway_status()
elif canonical == "status":

View file

@ -7056,6 +7056,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if canonical == "reasoning":
return await self._handle_reasoning_command(event)
if canonical == "memory":
return await self._handle_memory_command(event)
if canonical == "fast":
return await self._handle_fast_command(event)

View file

@ -1954,6 +1954,48 @@ class GatewaySlashCommandsMixin:
self._evict_cached_agent(session_key)
return t("gateway.reasoning.set_session", effort=effort)
async def _handle_memory_command(self, event: MessageEvent) -> str:
"""Handle /memory — review pending memory writes + set write mode.
Memory entries are small enough to review inline in a chat bubble, so
the full pending/approve/reject/mode flow works on every platform.
Mode changes persist to config.yaml and evict the cached agent so the
new write_mode takes effect on the next message.
"""
from gateway.run import _hermes_home
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
from tools.memory_tool import MemoryStore
raw_args = event.get_command_args().strip()
args = raw_args.split() if raw_args else []
session_key = self._session_key_for_source(event.source)
config_path = _hermes_home / "config.yaml"
def _set_mode(mode: str):
import yaml
user_config = {}
if config_path.exists():
with open(config_path, encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
user_config.setdefault("memory", {})["write_mode"] = mode
atomic_yaml_write(config_path, user_config)
# New write_mode must take effect next message → drop cached agent.
self._evict_cached_agent(session_key)
# Apply approved writes against a fresh on-disk store (the gateway has
# no long-lived agent; the store persists to the same MEMORY/USER.md).
store = MemoryStore()
store.load_from_disk()
out = handle_pending_subcommand(
wa.MEMORY, args, memory_store=store, set_mode_fn=_set_mode,
)
if out is None:
out = ("Unknown /memory subcommand. Use: pending, approve <id>, "
"reject <id>, mode <on|off|approve>.")
return out
async def _handle_fast_command(self, event: MessageEvent) -> str:
"""Handle /fast — mirror the CLI Priority Processing toggle in gateway chats."""
from gateway.run import _hermes_home, _load_gateway_config, _resolve_gateway_model

View file

@ -1301,9 +1301,46 @@ class CLICommandsMixin:
def _handle_skills_command(self, cmd: str):
"""Handle /skills slash command — delegates to hermes_cli.skills_hub."""
from cli import ChatConsole
# Intercept write-approval review subcommands first (pending/approve/
# reject/diff/mode); everything else goes to the skills hub.
parts = cmd.strip().split()
args = parts[1:] if len(parts) > 1 else []
if args and args[0].lower() in {"pending", "approve", "apply", "reject",
"deny", "drop", "diff", "mode"}:
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
out = handle_pending_subcommand(
wa.SKILLS, args,
set_mode_fn=lambda m: self._save_write_mode("skills", m),
)
if out is not None:
print(out)
return
from hermes_cli.skills_hub import handle_skills_slash
handle_skills_slash(cmd, ChatConsole())
def _handle_memory_command(self, cmd: str):
"""Handle /memory slash command — pending review + write-mode control."""
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
parts = cmd.strip().split()
args = parts[1:] if len(parts) > 1 else []
store = getattr(self.agent, "_memory_store", None) if getattr(self, "agent", None) else None
out = handle_pending_subcommand(
wa.MEMORY, args,
memory_store=store,
set_mode_fn=lambda m: self._save_write_mode("memory", m),
)
if out is None:
out = ("Unknown /memory subcommand. "
"Use: pending, approve <id>, reject <id>, mode <on|off|approve>.")
print(out)
def _save_write_mode(self, subsystem: str, mode: str):
"""Persist <subsystem>.write_mode to config (for /memory|/skills mode)."""
from cli import save_config_value
save_config_value(f"{subsystem}.write_mode", mode)
def _handle_background_command(self, cmd: str):
"""Handle /background <prompt> — run a prompt in a separate background session.

View file

@ -167,7 +167,12 @@ COMMAND_REGISTRY: list[CommandDef] = [
cli_only=True),
CommandDef("skills", "Search, install, inspect, or manage skills",
"Tools & Skills", cli_only=True,
subcommands=("search", "browse", "inspect", "install", "audit")),
subcommands=("search", "browse", "inspect", "install", "audit",
"pending", "approve", "reject", "diff", "mode")),
CommandDef("memory", "Review pending memory writes / set write mode",
"Tools & Skills",
args_hint="[pending|approve|reject|mode] [id|on|off|approve]",
subcommands=("pending", "approve", "reject", "mode")),
CommandDef("bundles", "List skill bundles (aliases /<name> for multiple skills)",
"Tools & Skills"),
CommandDef("cron", "Manage scheduled tasks", "Tools & Skills",

View file

@ -1653,6 +1653,18 @@ DEFAULT_CONFIG = {
"memory": {
"memory_enabled": True,
"user_profile_enabled": True,
# Write gate for the memory tool (add/replace/remove), applied to BOTH
# foreground agent turns and the background self-improvement review fork
# (the source of unprompted "wrong assumption" saves users reported):
# on — write freely (default, current behaviour)
# off — never write; the memory tool returns a clean disabled result
# approve — foreground writes block on an inline approve/deny prompt
# (entries are small enough to review in a chat bubble);
# background-review writes are staged for review instead of
# committed (a daemon thread cannot block on a prompt).
# Pending entries: /memory pending, /memory approve <id>,
# /memory reject <id>.
"write_mode": "on",
"memory_char_limit": 2200, # ~800 tokens at 2.75 chars/token
"user_char_limit": 1375, # ~500 tokens at 2.75 chars/token
# External memory provider plugin (empty = built-in only).
@ -1757,6 +1769,17 @@ DEFAULT_CONFIG = {
# External hub installs (trusted/community sources) are always
# scanned regardless of this setting.
"guard_agent_created": False,
# Write gate for skill_manage (create/edit/patch/write_file/delete/
# remove_file), applied to BOTH foreground agent turns and the
# background self-improvement review fork:
# on — write freely (default, current behaviour)
# off — never write; skill_manage returns a clean disabled result
# approve — stage the write for review instead of committing.
# Pending skills are listed with /skills pending, reviewed
# with /skills diff <id> (full diff — CLI/dashboard/file,
# never crammed into a chat bubble), and applied with
# /skills approve <id> or dropped with /skills reject <id>.
"write_mode": "on",
},
# Curator — background skill maintenance.

View file

@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""Shared handlers for the /memory and /skills write-approval subcommands.
Both the interactive CLI (``cli.py``) and the gateway (``gateway/run.py``) call
into this module so the pending-review UX (list / approve / reject / diff /
mode) lives in one place. Each caller owns only its surface concerns:
formatting the returned text and, for the gateway, persisting config + evicting
the cached agent on a mode change.
Every public handler returns a plain text string suitable for both a terminal
and a chat message. Skill diffs are intentionally NOT inlined here the
``diff`` handler returns the full diff for the CLI pager, but on a messaging
platform the gateway truncates it and points the user at the dashboard / file.
"""
from __future__ import annotations
import json
from typing import List, Optional
from tools import write_approval as wa
_VALID_MODES = (wa.MODE_ON, wa.MODE_OFF, wa.MODE_APPROVE)
# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------
def _fmt_pending_list(subsystem: str) -> str:
records = wa.list_pending(subsystem)
if not records:
return f"No pending {subsystem} writes."
lines = [f"Pending {subsystem} writes ({len(records)}):"]
for r in records:
origin = r.get("origin", "foreground")
tag = " [auto]" if origin == "background_review" else ""
lines.append(f" {r['id']}{tag} {r.get('summary', '')}")
where = "/{s} approve <id>".format(s=subsystem)
lines.append("")
lines.append(f"Apply: {where} Reject: /{subsystem} reject <id>")
if subsystem == wa.SKILLS:
lines.append("Review full diff: /skills diff <id>")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Subcommand dispatch
# ---------------------------------------------------------------------------
def handle_pending_subcommand(
subsystem: str,
args: List[str],
*,
memory_store=None,
set_mode_fn=None,
) -> Optional[str]:
"""Dispatch a /memory or /skills subcommand.
Args:
subsystem: ``memory`` or ``skills``.
args: tokens after the slash command (e.g. ``["approve", "a1b2"]``).
memory_store: live MemoryStore for applying approved memory writes
(CLI passes ``self.agent._memory_store``; gateway applies against a
freshly loaded store).
set_mode_fn: optional callable ``(mode: str) -> None`` that persists the
new write_mode to config (gateway provides this; CLI uses its own
``save_config_value`` and passes a closure).
Returns a text string to show the user. Returns None when the args are not
a write-approval subcommand (caller falls through to its other handling,
e.g. /skills search).
"""
if not args:
# Bare /memory or /skills with no sub → show pending + current mode.
mode = wa.get_write_mode(subsystem)
return f"{subsystem}.write_mode = {mode}\n\n" + _fmt_pending_list(subsystem)
sub = args[0].lower()
rest = args[1:]
if sub == "pending":
return _fmt_pending_list(subsystem)
if sub in {"approve", "apply"}:
return _approve(subsystem, rest, memory_store)
if sub in {"reject", "deny", "drop"}:
return _reject(subsystem, rest)
if sub == "diff" and subsystem == wa.SKILLS:
return _diff(rest)
if sub == "mode":
return _set_mode(subsystem, rest, set_mode_fn)
return None # not ours — caller handles
def _resolve_one(subsystem: str, rest: List[str]):
if not rest:
return None, f"Usage: /{subsystem} approve|reject <id> (or 'all')"
return rest[0], None
def _approve(subsystem: str, rest: List[str], memory_store) -> str:
target, err = _resolve_one(subsystem, rest)
if err or target is None:
return err or f"Usage: /{subsystem} approve <id>"
records = wa.list_pending(subsystem)
if not records:
return f"No pending {subsystem} writes."
if target.lower() == "all":
targets = list(records)
else:
rec = wa.get_pending(subsystem, target)
if not rec:
return f"No pending {subsystem} write with id '{target}'."
targets = [rec]
applied, failed = 0, []
for rec in targets:
ok, msg = _apply_one(subsystem, rec, memory_store)
if ok:
wa.discard_pending(subsystem, rec["id"])
applied += 1
else:
failed.append(f"{rec['id']}: {msg}")
out = [f"Approved {applied} {subsystem} write(s)."]
if failed:
out.append("Failed:")
out.extend(f" {f}" for f in failed)
return "\n".join(out)
def _apply_one(subsystem: str, rec, memory_store):
payload = rec.get("payload", {})
try:
if subsystem == wa.MEMORY:
if memory_store is None:
return False, "memory store unavailable"
from tools.memory_tool import apply_memory_pending
result = apply_memory_pending(payload, memory_store)
return bool(result.get("success")), result.get("error", "")
else:
from tools.skill_manager_tool import apply_skill_pending
result = json.loads(apply_skill_pending(payload))
return bool(result.get("success")), result.get("error", "")
except Exception as e:
return False, str(e)
def _reject(subsystem: str, rest: List[str]) -> str:
target, err = _resolve_one(subsystem, rest)
if err or target is None:
return err or f"Usage: /{subsystem} reject <id>"
if target.lower() == "all":
n = 0
for rec in wa.list_pending(subsystem):
if wa.discard_pending(subsystem, rec["id"]):
n += 1
return f"Rejected {n} pending {subsystem} write(s)."
if wa.discard_pending(subsystem, target):
return f"Rejected pending {subsystem} write '{target}'."
return f"No pending {subsystem} write with id '{target}'."
def _diff(rest: List[str]) -> str:
if not rest:
return "Usage: /skills diff <id>"
rec = wa.get_pending(wa.SKILLS, rest[0])
if not rec:
return f"No pending skill write with id '{rest[0]}'."
diff = wa.skill_pending_diff(rec)
header = f"# Pending skill write {rec['id']}: {rec.get('summary', '')}\n"
return header + "\n" + diff
def _set_mode(subsystem: str, rest: List[str], set_mode_fn) -> str:
if not rest:
cur = wa.get_write_mode(subsystem)
return (f"{subsystem}.write_mode = {cur}\n"
f"Set with: /{subsystem} mode <on|off|approve>")
mode = rest[0].lower()
if mode not in _VALID_MODES:
return f"Invalid mode '{mode}'. Use: on, off, approve."
if set_mode_fn is None:
return (f"To change {subsystem} write mode, run:\n"
f" hermes config set {subsystem}.write_mode {mode}")
try:
set_mode_fn(mode)
except Exception as e:
return f"Failed to set {subsystem}.write_mode: {e}"
return f"{subsystem}.write_mode set to '{mode}'."

View file

@ -336,19 +336,23 @@ class TestSlackNativeSlashes:
)
def test_includes_aliases_as_first_class_slashes(self):
"""Aliases (/btw, /bg, /reset) must be registered as standalone
"""Aliases (/btw, /bg, /reset, …) must be registered as standalone
slashes this is the whole point of native-slashes parity.
Note: Slack's manifest hard-caps slash commands at 50
(``_SLACK_MAX_SLASH_COMMANDS``). Canonical names win slots first,
then aliases, so the lowest-priority aliases can be clamped off
once the registry fills the cap (e.g. ``/q`` once ``/version``
landed). The surviving aliases below still prove alias parity;
anything dropped remains reachable via ``/hermes <command>``."""
names = {n for n, _d, _h in slack_native_slashes()}
Asserts the contract (aliases are surfaced as first-class slashes),
not a specific alias's survival of Slack's 50-slash clamp which alias
lands last shifts whenever a canonical command is added, so pinning one
name (previously ``q``) made this a change-detector.
"""
slashes = slack_native_slashes()
names = {n for n, _d, _h in slashes}
# Aliases that sort early in the registry always fit under the cap.
assert "btw" in names
assert "bg" in names
assert "reset" in names
# And at least one alias is surfaced as an alias entry (description
# carries the "Alias for /…" marker), proving the alias pass ran.
assert any(d.startswith("Alias for /") for _n, d, _h in slashes)
def test_telegram_parity(self):
"""Every Telegram bot command must be registerable on Slack too.

View file

@ -0,0 +1,241 @@
"""Tests for the memory/skill write-approval gate (tools/write_approval.py)
and the shared slash-command handlers (hermes_cli/write_approval_commands.py).
Covers the tri-state write_mode (on/off/approve) for both subsystems, the
foreground-vs-background staging split, pending store CRUD, and the
list/approve/reject/diff/mode subcommand dispatch.
"""
import json
import os
import tempfile
import shutil
import pytest
@pytest.fixture
def hermes_home(monkeypatch):
d = tempfile.mkdtemp(prefix="hermes_wa_test_")
home = os.path.join(d, ".hermes")
os.makedirs(home)
monkeypatch.setenv("HERMES_HOME", home)
yield home
shutil.rmtree(d, ignore_errors=True)
def _set_mode(subsystem, mode):
import hermes_cli.config as cfg
c = cfg.load_config()
c.setdefault(subsystem, {})["write_mode"] = mode
cfg.save_config(c)
# ---------------------------------------------------------------------------
# Mode resolution
# ---------------------------------------------------------------------------
def test_default_write_mode_is_on(hermes_home):
from tools import write_approval as wa
assert wa.get_write_mode("memory") == "on"
assert wa.get_write_mode("skills") == "on"
def test_invalid_subsystem_returns_on(hermes_home):
from tools import write_approval as wa
assert wa.get_write_mode("bogus") == "on"
def test_normalize_mode_handles_yaml_bool():
from tools import write_approval as wa
assert wa._normalize_mode(False) == "off"
assert wa._normalize_mode(True) == "on"
assert wa._normalize_mode("approve") == "approve"
assert wa._normalize_mode("garbage") == "on"
# ---------------------------------------------------------------------------
# Memory gate
# ---------------------------------------------------------------------------
def test_memory_off_blocks_write(hermes_home):
from tools.memory_tool import memory_tool, MemoryStore
_set_mode("memory", "off")
store = MemoryStore(); store.load_from_disk()
r = json.loads(memory_tool("add", "user", "should not save", store=store))
assert r["success"] is False
assert "disabled" in r["error"].lower()
assert store.user_entries == []
def test_memory_on_allows_write(hermes_home):
from tools.memory_tool import memory_tool, MemoryStore
_set_mode("memory", "on")
store = MemoryStore(); store.load_from_disk()
r = json.loads(memory_tool("add", "user", "save me", store=store))
assert r["success"] is True
assert r["entry_count"] == 1
def test_memory_approve_no_interactive_stages(hermes_home):
# No approval callback registered and not a gateway context → stage.
from tools.memory_tool import memory_tool, MemoryStore
from tools import write_approval as wa
_set_mode("memory", "approve")
store = MemoryStore(); store.load_from_disk()
r = json.loads(memory_tool("add", "memory", "stage me", store=store))
assert r.get("staged") is True
assert r.get("pending_id")
# Not written to the live store yet.
assert store.memory_entries == []
pend = wa.list_pending("memory")
assert len(pend) == 1
assert pend[0]["id"] == r["pending_id"]
def test_memory_approve_then_apply(hermes_home):
from tools.memory_tool import memory_tool, MemoryStore, apply_memory_pending
from tools import write_approval as wa
_set_mode("memory", "approve")
store = MemoryStore(); store.load_from_disk()
r = json.loads(memory_tool("add", "user", "approved entry", store=store))
pid = r["pending_id"]
rec = wa.get_pending("memory", pid)
result = apply_memory_pending(rec["payload"], store)
assert result["success"] is True
assert "approved entry" in store.user_entries[0]
# ---------------------------------------------------------------------------
# Skill gate
# ---------------------------------------------------------------------------
_SKILL = (
"---\nname: test-skill\ndescription: A test skill\nversion: 1.0.0\n---\n"
"# Test\nbody\n"
)
def test_skill_off_blocks_create(hermes_home):
from tools.skill_manager_tool import skill_manage
_set_mode("skills", "off")
r = json.loads(skill_manage("create", "blocked-skill", content=_SKILL))
assert r["success"] is False
assert "disabled" in r["error"].lower()
def test_skill_approve_always_stages(hermes_home):
# Skills stage even in the foreground (too big to review inline).
from tools.skill_manager_tool import skill_manage
from tools import write_approval as wa
_set_mode("skills", "approve")
r = json.loads(skill_manage("create", "staged-skill", content=_SKILL))
assert r.get("staged") is True
assert "staged-skill" in r.get("gist", "")
assert wa.pending_count("skills") == 1
def test_skill_approve_then_apply_writes_file(hermes_home):
# SKILLS_DIR is resolved at import time, so reload the skill module under
# this test's HERMES_HOME to exercise the real on-disk write path.
import importlib
import tools.skill_manager_tool as smt
importlib.reload(smt)
from tools import write_approval as wa
_set_mode("skills", "approve")
r = json.loads(smt.skill_manage("create", "applied-skill", content=_SKILL))
rec = wa.get_pending("skills", r["pending_id"])
res = json.loads(smt.apply_skill_pending(rec["payload"]))
assert res["success"] is True
assert smt._find_skill("applied-skill") is not None
def test_skill_create_diff_is_full_content(hermes_home):
from tools.skill_manager_tool import skill_manage
from tools import write_approval as wa
_set_mode("skills", "approve")
r = json.loads(skill_manage("create", "diff-skill", content=_SKILL))
rec = wa.get_pending("skills", r["pending_id"])
diff = wa.skill_pending_diff(rec)
assert "name: test-skill" in diff
# ---------------------------------------------------------------------------
# Pending store CRUD
# ---------------------------------------------------------------------------
def test_pending_store_roundtrip(hermes_home):
from tools import write_approval as wa
rec = wa.stage_write("memory", {"action": "add", "target": "user", "content": "x"},
summary="add x", origin="foreground")
assert wa.pending_count("memory") == 1
got = wa.get_pending("memory", rec["id"])
assert got["payload"]["content"] == "x"
assert wa.discard_pending("memory", rec["id"]) is True
assert wa.pending_count("memory") == 0
assert wa.get_pending("memory", rec["id"]) is None
# ---------------------------------------------------------------------------
# Shared command handler
# ---------------------------------------------------------------------------
def test_handle_pending_list_empty(hermes_home):
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
out = handle_pending_subcommand(wa.MEMORY, ["pending"])
assert "No pending memory" in out
def test_handle_approve_all(hermes_home):
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools.memory_tool import MemoryStore
from tools import write_approval as wa
store = MemoryStore(); store.load_from_disk()
wa.stage_write("memory", {"action": "add", "target": "user", "content": "a"},
summary="a", origin="foreground")
wa.stage_write("memory", {"action": "add", "target": "user", "content": "b"},
summary="b", origin="foreground")
out = handle_pending_subcommand(wa.MEMORY, ["approve", "all"], memory_store=store)
assert "Approved 2" in out
assert wa.pending_count("memory") == 0
assert len(store.user_entries) == 2
def test_handle_reject(hermes_home):
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
rec = wa.stage_write("skills", {"action": "create", "name": "s"},
summary="create s", origin="background_review")
out = handle_pending_subcommand(wa.SKILLS, ["reject", rec["id"]])
assert "Rejected" in out
assert wa.pending_count("skills") == 0
def test_handle_mode_set(hermes_home):
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
captured = {}
out = handle_pending_subcommand(
wa.MEMORY, ["mode", "approve"],
set_mode_fn=lambda m: captured.update(mode=m),
)
assert captured["mode"] == "approve"
assert "approve" in out
def test_handle_mode_invalid(hermes_home):
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
out = handle_pending_subcommand(wa.MEMORY, ["mode", "bogus"],
set_mode_fn=lambda m: None)
assert "Invalid mode" in out
def test_handle_unknown_subcommand_returns_none(hermes_home):
from hermes_cli.write_approval_commands import handle_pending_subcommand
from tools import write_approval as wa
# An unrecognized /skills subcommand (e.g. 'search') must return None so
# the CLI falls through to the skills hub.
out = handle_pending_subcommand(wa.SKILLS, ["search", "foo"])
assert out is None

View file

@ -606,6 +606,63 @@ class MemoryStore:
raise RuntimeError(f"Failed to write memory file {path}: {e}")
def _apply_write_gate(action: str, target: str, content: Optional[str],
old_text: Optional[str]) -> Optional[str]:
"""Evaluate the memory write gate. Returns a JSON tool-result string when
the write should NOT proceed normally (blocked or staged), or None when the
caller should perform the real write.
Only the mutating actions (add/replace/remove) are gated.
"""
if action not in {"add", "replace", "remove"}:
return None
try:
from tools import write_approval as wa
except Exception:
# If the gate module can't load, fail open (current behaviour) rather
# than blocking all memory writes.
return None
# Build a small inline summary/detail for the foreground approval prompt.
label = "user profile" if target == "user" else "memory"
if action == "add":
summary = f"add to {label}"
detail = content or ""
elif action == "replace":
summary = f"replace in {label}"
detail = f"old: {old_text}\nnew: {content}"
else: # remove
summary = f"remove from {label}"
detail = old_text or ""
decision = wa.evaluate_gate(wa.MEMORY, inline_summary=summary, inline_detail=detail)
if decision.allow:
return None
if decision.blocked:
return tool_error(decision.message, success=False)
# stage
payload = {
"action": action,
"target": target,
"content": content,
"old_text": old_text,
}
record = wa.stage_write(
wa.MEMORY, payload,
summary=f"{summary}: {detail[:120]}",
origin=wa.current_origin(),
)
return json.dumps(
{"success": True, "staged": True, "pending_id": record["id"],
"message": decision.message},
ensure_ascii=False,
)
def memory_tool(
action: str,
target: str = "memory",
@ -624,6 +681,12 @@ def memory_tool(
if target not in {"memory", "user"}:
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
# Write gate: off blocks the write; approve stages it (background) or
# prompts inline (foreground). on (default) passes straight through.
gate_result = _apply_write_gate(action, target, content, old_text)
if gate_result is not None:
return gate_result
if action == "add":
if not content:
return tool_error("Content is required for 'add' action.", success=False)
@ -652,7 +715,23 @@ def check_memory_requirements() -> bool:
return True
# =============================================================================
def apply_memory_pending(payload: Dict[str, Any], store: "MemoryStore") -> Dict[str, Any]:
"""Replay a staged memory write directly against the store, bypassing the
write gate. Called by the /memory approve handler.
Returns the store's result dict.
"""
action = payload.get("action")
target = payload.get("target", "memory")
content = payload.get("content") or ""
old_text = payload.get("old_text") or ""
if action == "add":
return store.add(target, content)
if action == "replace":
return store.replace(target, old_text, content)
if action == "remove":
return store.remove(target, old_text)
return {"success": False, "error": f"Unknown staged action '{action}'."}
# OpenAI Function-Calling Schema
# =============================================================================

View file

@ -822,6 +822,75 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
# Main entry point
# =============================================================================
# ContextVar bypass: set while replaying an already-approved staged skill write
# so skill_manage() does not re-gate (and re-stage) it.
import contextvars as _ctxvars
_skill_gate_bypass: "_ctxvars.ContextVar[bool]" = _ctxvars.ContextVar(
"skill_gate_bypass", default=False
)
def _apply_skill_write_gate(action, name, **payload_kwargs):
"""Evaluate the skill write gate. Returns a JSON tool-result string when the
write should NOT proceed (blocked or staged), or None to perform the real
write. Bypassed during approved-pending replay.
"""
if action not in {"create", "edit", "patch", "delete", "write_file", "remove_file"}:
return None
if _skill_gate_bypass.get():
return None
try:
from tools import write_approval as wa
except Exception:
return None # fail open
decision = wa.evaluate_gate(wa.SKILLS)
if decision.allow:
return None
if decision.blocked:
return tool_error(decision.message, success=False)
# stage — record the full skill_manage kwargs so approval can replay it.
payload = {"action": action, "name": name}
payload.update({k: v for k, v in payload_kwargs.items() if v is not None})
gist = wa.skill_gist(
action, name,
content=payload_kwargs.get("content") or "",
file_path=payload_kwargs.get("file_path") or "",
old_string=payload_kwargs.get("old_string") or "",
new_string=payload_kwargs.get("new_string") or "",
)
record = wa.stage_write(wa.SKILLS, payload, summary=gist, origin=wa.current_origin())
return json.dumps(
{"success": True, "staged": True, "pending_id": record["id"],
"gist": gist, "message": decision.message},
ensure_ascii=False,
)
def apply_skill_pending(payload: Dict[str, Any]) -> str:
"""Replay a staged skill write, bypassing the gate. Returns the tool result
JSON string. Called by the /skills approve handler.
"""
token = _skill_gate_bypass.set(True)
try:
return skill_manage(
action=payload.get("action", ""),
name=payload.get("name", ""),
content=payload.get("content"),
category=payload.get("category"),
file_path=payload.get("file_path"),
file_content=payload.get("file_content"),
old_string=payload.get("old_string"),
new_string=payload.get("new_string"),
replace_all=payload.get("replace_all", False),
absorbed_into=payload.get("absorbed_into"),
)
finally:
_skill_gate_bypass.reset(token)
def skill_manage(
action: str,
name: str,
@ -839,6 +908,19 @@ def skill_manage(
Returns JSON string with results.
"""
# Write gate: off blocks the write; approve stages it for review (skills are
# too large to review inline, so they always stage regardless of origin).
# on (default) passes straight through. The gate is bypassed when this call
# is itself replaying an already-approved staged write (_skill_apply_pending).
gate_result = _apply_skill_write_gate(
action, name, content=content, category=category,
file_path=file_path, file_content=file_content,
old_string=old_string, new_string=new_string,
replace_all=replace_all, absorbed_into=absorbed_into,
)
if gate_result is not None:
return gate_result
if action == "create":
if not content:
return tool_error("content is required for 'create'. Provide the full SKILL.md text (frontmatter + body).", success=False)

491
tools/write_approval.py Normal file
View file

@ -0,0 +1,491 @@
#!/usr/bin/env python3
"""Write-approval gate + pending store for memory and skill writes.
Background
----------
The agent writes to two persistent stores that survive across sessions:
* **memory** MEMORY.md / USER.md, small (~200 char) declarative entries
* **skills** SKILL.md + supporting files, potentially huge (10-100 KB)
Both stores are written from two origins:
* **foreground** a normal agent turn (user is present / chatting)
* **background_review** the self-improvement review fork that runs after a
turn and autonomously decides what to save (the source of the
"wrong assumptions" users complained about)
This module lets the user gate those writes per-subsystem with a tri-state
``write_mode``:
* ``on`` write freely (current behaviour, default)
* ``off`` never write; the tool returns a clean "disabled" result
* ``approve`` do not commit the write; **stage** it to a pending store and
surface it for the user to approve or reject out-of-band
The size asymmetry between memory and skills is real and unavoidable: a memory
entry can be reviewed inline in a chat bubble; a 100 KB SKILL.md cannot. So
``approve`` mode stages BOTH to disk, but review affordances differ by subsystem
(see ``hermes_cli`` slash handlers): memory shows full content, skills show
metadata + a one-line gist + a ``diff`` escape hatch (CLI/dashboard/file).
Staging is mandatory for background-origin writes under ``approve`` (a daemon
thread cannot block on an interactive prompt). Foreground memory writes may
additionally block inline via the dangerous-command approval gate; foreground
skill writes always stage (too big to eyeball mid-loop).
Pending records live under ``<HERMES_HOME>/pending/{memory,skills}/<id>.json``
so they survive process restarts and can be reviewed from CLI, gateway, or the
web dashboard.
"""
from __future__ import annotations
import json
import logging
import os
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Subsystem identifiers
MEMORY = "memory"
SKILLS = "skills"
_SUBSYSTEMS = (MEMORY, SKILLS)
# Tri-state write modes
MODE_ON = "on"
MODE_OFF = "off"
MODE_APPROVE = "approve"
_VALID_MODES = (MODE_ON, MODE_OFF, MODE_APPROVE)
# ---------------------------------------------------------------------------
# Config resolution
# ---------------------------------------------------------------------------
def get_write_mode(subsystem: str) -> str:
"""Return the configured write_mode for ``subsystem`` (memory|skills).
Reads ``<subsystem>.write_mode`` from config.yaml. Falls back to ``on``
(current behaviour) for any unset / invalid value so existing installs are
unaffected until the user opts in.
"""
if subsystem not in _SUBSYSTEMS:
return MODE_ON
try:
from hermes_cli.config import load_config, cfg_get
cfg = load_config()
raw = cfg_get(cfg, subsystem, "write_mode", default=MODE_ON)
except Exception:
return MODE_ON
return _normalize_mode(raw)
def _normalize_mode(value: Any) -> str:
"""Coerce a config value to a valid mode string.
YAML 1.1 parses bare ``off`` / ``on`` as booleans, so handle bools the way
the approval-mode normalizer does.
"""
if isinstance(value, bool):
return MODE_OFF if value is False else MODE_ON
if isinstance(value, str):
v = value.strip().lower()
if v in _VALID_MODES:
return v
return MODE_ON
# ---------------------------------------------------------------------------
# Pending store (file-backed)
# ---------------------------------------------------------------------------
def _pending_dir(subsystem: str) -> Path:
return get_hermes_home() / "pending" / subsystem
def stage_write(subsystem: str, payload: Dict[str, Any],
*, summary: str, origin: str) -> Dict[str, Any]:
"""Persist a pending write and return a short record describing it.
Args:
subsystem: ``memory`` or ``skills``.
payload: the exact kwargs needed to replay the write when approved
(e.g. ``{"action": "add", "target": "user", "content": "..."}``
for memory, or the full ``skill_manage`` kwargs for skills).
summary: a one-line human-readable description shown in pending lists.
For skills this is the LLM/heuristic gist; for memory it can be the
entry text itself.
origin: ``foreground`` or ``background_review`` recorded for audit.
Returns a dict with ``id`` and metadata. Best-effort: on disk failure it
logs and still returns a record (the write is simply lost, which is the
safe failure for an approval gate nothing is silently committed).
"""
pid = uuid.uuid4().hex[:8]
record = {
"id": pid,
"subsystem": subsystem,
"action": payload.get("action", ""),
"summary": (summary or "").strip(),
"origin": origin or "foreground",
"created_at": time.time(),
"payload": payload,
}
try:
d = _pending_dir(subsystem)
d.mkdir(parents=True, exist_ok=True)
path = d / f"{pid}.json"
tmp = path.with_suffix(".json.tmp")
tmp.write_text(json.dumps(record, ensure_ascii=False, indent=2), encoding="utf-8")
os.replace(tmp, path)
except Exception as e: # pragma: no cover - disk failure path
logger.error("Failed to stage pending %s write: %s", subsystem, e, exc_info=True)
return record
def list_pending(subsystem: str) -> List[Dict[str, Any]]:
"""Return all pending records for ``subsystem``, oldest first."""
d = _pending_dir(subsystem)
if not d.exists():
return []
records: List[Dict[str, Any]] = []
for p in d.glob("*.json"):
try:
records.append(json.loads(p.read_text(encoding="utf-8")))
except Exception:
logger.warning("Skipping unreadable pending record: %s", p)
records.sort(key=lambda r: r.get("created_at", 0))
return records
def get_pending(subsystem: str, pending_id: str) -> Optional[Dict[str, Any]]:
"""Return a single pending record by id, or None."""
path = _pending_dir(subsystem) / f"{pending_id}.json"
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def discard_pending(subsystem: str, pending_id: str) -> bool:
"""Delete a pending record. Returns True if it existed."""
path = _pending_dir(subsystem) / f"{pending_id}.json"
try:
if path.exists():
path.unlink()
return True
except Exception as e: # pragma: no cover
logger.error("Failed to discard pending %s/%s: %s", subsystem, pending_id, e)
return False
def pending_count(subsystem: str) -> int:
"""Cheap count of pending records (for notification badges)."""
d = _pending_dir(subsystem)
if not d.exists():
return 0
try:
return sum(1 for _ in d.glob("*.json"))
except Exception:
return 0
# ---------------------------------------------------------------------------
# Write origin
# ---------------------------------------------------------------------------
def current_origin() -> str:
"""Return the active write origin: ``foreground`` or ``background_review``.
Reuses the skill-provenance ContextVar, which the background review fork
already sets (see ``agent.background_review`` /
``AIAgent._spawn_background_review``). Foreground agent turns leave it at
the default ``foreground``.
"""
try:
from tools.skill_provenance import get_current_write_origin
return get_current_write_origin()
except Exception:
return "foreground"
def is_background() -> bool:
return current_origin() == "background_review"
# ---------------------------------------------------------------------------
# Gate decision
# ---------------------------------------------------------------------------
class GateDecision:
"""Result of evaluating the write gate for a single write attempt.
Exactly one of the boolean flags is True:
* ``allow`` proceed with the real write (mode ``on``, or an inline
approval was granted).
* ``blocked`` refuse the write (mode ``off``, or an inline approval was
denied). ``message`` explains why; surface it to the agent.
* ``stage`` do not write; the caller should stage the payload via
``stage_write`` (mode ``approve`` for a background write, or a
foreground write with no interactive prompt available). ``message`` is
the user-facing "staged for approval" note.
"""
__slots__ = ("allow", "blocked", "stage", "message")
def __init__(self, *, allow=False, blocked=False, stage=False, message=""):
self.allow = allow
self.blocked = blocked
self.stage = stage
self.message = message
def evaluate_gate(subsystem: str, *, inline_summary: str = "",
inline_detail: str = "") -> GateDecision:
"""Decide what to do with a pending write for ``subsystem``.
Args:
subsystem: ``memory`` or ``skills``.
inline_summary: short description used as the inline approval prompt
header (memory foreground path only).
inline_detail: full content shown in the inline prompt (memory entries
are small; skills never take the inline path).
Mode matrix:
on allow
off blocked
approve memory + foreground inline approve/deny prompt
memory + background stage
skills (any origin) stage (too big to review inline)
"""
mode = get_write_mode(subsystem)
if mode == MODE_ON:
return GateDecision(allow=True)
if mode == MODE_OFF:
return GateDecision(
blocked=True,
message=(
f"{subsystem.capitalize()} writes are disabled "
f"({subsystem}.write_mode = off). The change was not saved. "
f"Set {subsystem}.write_mode to 'on' or 'approve' to allow writes."
),
)
# mode == approve
background = is_background()
# Skills always stage — a SKILL.md is too large to review inline, and a
# background skill write happens in a daemon thread with no user present.
if subsystem == SKILLS or background:
where = "/skills pending" if subsystem == SKILLS else "/memory pending"
return GateDecision(
stage=True,
message=(
f"Staged for approval ({subsystem}.write_mode = approve). "
f"Not yet saved — review with {where}."
),
)
# Memory + foreground: if an interactive approval channel exists (CLI
# prompt_toolkit callback, or a gateway approve/deny round-trip), prompt
# inline — entries are small enough to show in full. Otherwise (script,
# batch, no listener) stage instead of forcing a blind deny.
if _interactive_approval_available():
granted = _prompt_inline_memory_approval(inline_summary, inline_detail)
if granted is True:
return GateDecision(allow=True)
if granted is False:
return GateDecision(
blocked=True,
message="Memory write denied by user. The change was not saved.",
)
# granted is None → prompt failed; fall through to staging.
return GateDecision(
stage=True,
message=(
"Staged for approval (memory.write_mode = approve). "
"Not yet saved — review with /memory pending."
),
)
def _interactive_approval_available() -> bool:
"""True when a foreground memory write can be approved inline.
Either a per-thread approval callback is registered (interactive CLI), or
the call is inside a gateway/API session that supports the /approve //deny
round-trip. Scripts, cron, and background threads have neither stage.
"""
try:
from tools.terminal_tool import _get_approval_callback
if _get_approval_callback() is not None:
return True
except Exception:
pass
try:
from tools.approval import _is_gateway_approval_context
return bool(_is_gateway_approval_context())
except Exception:
return False
def _prompt_inline_memory_approval(summary: str, detail: str) -> Optional[bool]:
"""Prompt the user inline to approve a memory write.
Returns True (approved), False (denied), or None (no interactive prompt
available on this thread caller should stage instead).
Reuses the dangerous-command approval machinery so the CLI prompt_toolkit
callback and the gateway ``/approve`` ``/deny`` round-trip both work without
duplicating that plumbing.
"""
try:
from tools.approval import prompt_dangerous_approval
except Exception:
return None
header = summary.strip() or "Save to memory?"
body = detail.strip()
description = f"Save to memory: {header}"
command = body if body else header
try:
choice = prompt_dangerous_approval(
command,
description,
allow_permanent=False,
)
except Exception as e: # pragma: no cover
logger.error("Inline memory approval prompt failed: %s", e)
return None
if choice in {"once", "session"}:
return True
if choice == "deny":
return False
# Any other outcome (e.g. timeout that returns "deny" already handled) →
# treat unknown as no-decision so we stage rather than silently drop.
return None
# ---------------------------------------------------------------------------
# Skill-specific helpers (gist + diff for the review affordances)
# ---------------------------------------------------------------------------
def skill_gist(action: str, name: str, *, content: str = "",
file_path: str = "", old_string: str = "",
new_string: str = "") -> str:
"""Build a one-line human gist for a pending skill write.
Heuristic, no model call the gist surfaces enough to decide approve/reject
in a chat bubble, while the full diff stays behind /skills diff (CLI/
dashboard/file). For create/edit it pulls the frontmatter ``description:``;
for patch/write_file it describes the size of the change.
"""
if action in {"create", "edit"} and content:
desc = _frontmatter_description(content)
size = f"{len(content) // 1024 + 1} KB" if len(content) >= 1024 else f"{len(content)} chars"
verb = "create" if action == "create" else "rewrite"
if desc:
return f"{verb} '{name}'{desc} ({size})"
return f"{verb} '{name}' ({size})"
if action == "patch":
target = file_path or "SKILL.md"
removed = old_string.count("\n") + 1 if old_string else 0
added = new_string.count("\n") + 1 if new_string else 0
return f"patch '{name}' {target} (+{added}/-{removed} lines)"
if action == "write_file":
return f"write {file_path} in '{name}'"
if action == "remove_file":
return f"remove {file_path} from '{name}'"
if action == "delete":
return f"delete skill '{name}'"
return f"{action} '{name}'"
def _frontmatter_description(content: str) -> str:
"""Extract the ``description:`` value from SKILL.md YAML frontmatter."""
import re
m = re.search(r"^description:\s*(.+)$", content, re.MULTILINE)
if not m:
return ""
desc = m.group(1).strip().strip("'\"")
return desc[:140]
def skill_pending_diff(record: Dict[str, Any]) -> str:
"""Build a full unified diff (or full content) for a staged skill write.
Used by /skills diff <id> on a surface that can render it (CLI pager, web
dashboard, or by opening the pending JSON file). For create this is the new
file content; for edit/patch it is a unified diff against the current
on-disk skill.
"""
import difflib
payload = record.get("payload", {})
action = payload.get("action", "")
name = payload.get("name", "")
if action == "create":
return (payload.get("content") or "")
# Resolve current on-disk content for diffable actions.
try:
from tools.skill_manager_tool import _find_skill
except Exception:
_find_skill = None # type: ignore
current = ""
target_label = "SKILL.md"
if _find_skill is not None:
found = _find_skill(name)
if found:
base = found["path"]
if action == "edit":
p = base / "SKILL.md"
elif action in {"patch", "write_file"}:
rel = payload.get("file_path") or "SKILL.md"
p = base / rel
target_label = rel
else:
p = base / "SKILL.md"
try:
if p.exists():
current = p.read_text(encoding="utf-8")
except Exception:
current = ""
if action == "edit":
new = payload.get("content") or ""
elif action == "patch":
old_s = payload.get("old_string") or ""
new_s = payload.get("new_string") or ""
new = current.replace(old_s, new_s) if current else f"(patch {old_s!r}{new_s!r})"
elif action == "write_file":
new = payload.get("file_content") or ""
elif action == "remove_file":
return f"remove file: {payload.get('file_path')} from skill '{name}'"
elif action == "delete":
return f"delete skill '{name}'"
else:
return f"({action} on '{name}')"
diff = difflib.unified_diff(
current.splitlines(keepends=True),
new.splitlines(keepends=True),
fromfile=f"a/{target_label}",
tofile=f"b/{target_label}",
)
text = "".join(diff)
return text or "(no textual change)"

View file

@ -209,8 +209,62 @@ memory:
user_profile_enabled: true
memory_char_limit: 2200 # ~800 tokens
user_char_limit: 1375 # ~500 tokens
write_mode: on # on | off | approve
```
## Controlling memory writes (`write_mode`)
By default the agent saves memory freely — including from the background
self-improvement review that runs after a turn. If you'd rather not have
memory written behind your back, `memory.write_mode` gives you three options
(applied to **both** foreground turns and the background review):
| Mode | Behaviour |
|------|-----------|
| `on` | Write freely (default — current behaviour). |
| `off` | Never write. The memory tool returns a clean "disabled" result; nothing is saved. |
| `approve` | Don't commit writes — review them first. Foreground writes prompt you inline (entries are small enough to read in a chat bubble). Background-review writes are **staged** instead of committed (a background thread can't block on a prompt). |
Review staged writes from the CLI or any messaging platform:
```
/memory pending # list staged memory writes (auto ones tagged [auto])
/memory approve <id> # apply one (or 'all')
/memory reject <id> # drop one (or 'all')
/memory mode approve # change write_mode and persist it
```
This is the answer to "the agent saved a wrong assumption about me": set
`write_mode: approve`, and every save — especially the unprompted background
ones — waits for your yes/no before it ever enters your profile.
## Controlling skill writes (`skills.write_mode`)
Skills use the same three-state gate, but the review UX differs because a
`SKILL.md` is far too large to read in a chat bubble:
```yaml
skills:
write_mode: on # on | off | approve
```
In `approve` mode, skill writes (create / edit / patch / write_file / delete)
always **stage** regardless of origin. You review the one-line gist inline, but
the full diff stays out-of-band:
```
/skills pending # list staged skill writes + a one-line gist each
/skills diff <id> # full unified diff (best viewed in CLI or dashboard)
/skills approve <id> # apply it (or 'all')
/skills reject <id> # drop it (or 'all')
/skills mode approve # change write_mode and persist it
```
On a messaging platform, approve a skill from its gist + metadata, or open
`/skills diff` on the CLI / dashboard / the staged file under
`~/.hermes/pending/skills/<id>.json` when you want to read the whole change.
## External Memory Providers
For deeper, persistent memory that goes beyond MEMORY.md and USER.md, Hermes ships with 8 external memory provider plugins — including Honcho, OpenViking, Mem0, Hindsight, Holographic, RetainDB, ByteRover, and Supermemory.