mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
feat: filesystem checkpoints and /rollback command
Automatic filesystem snapshots before destructive file operations, with user-facing rollback. Inspired by PR #559 (by @alireza78a). Architecture: - Shadow git repos at ~/.hermes/checkpoints/{hash}/ via GIT_DIR - CheckpointManager: take/list/restore, turn-scoped dedup, pruning - Transparent — the LLM never sees it, no tool schema, no tokens - Once per turn — only first write_file/patch triggers a snapshot Integration: - Config: checkpoints.enabled + checkpoints.max_snapshots - CLI flag: hermes --checkpoints - Trigger: run_agent.py _execute_tool_calls() before write_file/patch - /rollback slash command in CLI + gateway (list, restore by number) - Pre-rollback snapshot auto-created on restore (undo the undo) Safety: - Never blocks file operations — all errors silently logged - Skips root dir, home dir, dirs >50K files - Disables gracefully when git not installed - Shadow repo completely isolated from project git Tests: 35 new tests, all passing (2798 total suite) Docs: feature page, config reference, CLI commands reference
This commit is contained in:
parent
de6750ed23
commit
c1775de56f
8 changed files with 1031 additions and 1 deletions
441
tools/checkpoint_manager.py
Normal file
441
tools/checkpoint_manager.py
Normal file
|
|
@ -0,0 +1,441 @@
|
|||
"""
|
||||
Checkpoint Manager — Transparent filesystem snapshots via shadow git repos.
|
||||
|
||||
Creates automatic snapshots of working directories before file-mutating
|
||||
operations (write_file, patch), triggered once per conversation turn.
|
||||
Provides rollback to any previous checkpoint.
|
||||
|
||||
This is NOT a tool — the LLM never sees it. It's transparent infrastructure
|
||||
controlled by the ``checkpoints`` config flag or ``--checkpoints`` CLI flag.
|
||||
|
||||
Architecture:
|
||||
~/.hermes/checkpoints/{sha256(abs_dir)[:16]}/ — shadow git repo
|
||||
HEAD, refs/, objects/ — standard git internals
|
||||
HERMES_WORKDIR — original dir path
|
||||
info/exclude — default excludes
|
||||
|
||||
The shadow repo uses GIT_DIR + GIT_WORK_TREE so no git state leaks
|
||||
into the user's project directory.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
CHECKPOINT_BASE = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "checkpoints"
|
||||
|
||||
DEFAULT_EXCLUDES = [
|
||||
"node_modules/",
|
||||
"dist/",
|
||||
"build/",
|
||||
".env",
|
||||
".env.*",
|
||||
".env.local",
|
||||
".env.*.local",
|
||||
"__pycache__/",
|
||||
"*.pyc",
|
||||
"*.pyo",
|
||||
".DS_Store",
|
||||
"*.log",
|
||||
".cache/",
|
||||
".next/",
|
||||
".nuxt/",
|
||||
"coverage/",
|
||||
".pytest_cache/",
|
||||
".venv/",
|
||||
"venv/",
|
||||
".git/",
|
||||
]
|
||||
|
||||
# Git subprocess timeout (seconds).
|
||||
_GIT_TIMEOUT: int = max(10, min(60, int(os.getenv("HERMES_CHECKPOINT_TIMEOUT", "30"))))
|
||||
|
||||
# Max files to snapshot — skip huge directories to avoid slowdowns.
|
||||
_MAX_FILES = 50_000
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shadow repo helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _shadow_repo_path(working_dir: str) -> Path:
|
||||
"""Deterministic shadow repo path: sha256(abs_path)[:16]."""
|
||||
abs_path = str(Path(working_dir).resolve())
|
||||
dir_hash = hashlib.sha256(abs_path.encode()).hexdigest()[:16]
|
||||
return CHECKPOINT_BASE / dir_hash
|
||||
|
||||
|
||||
def _git_env(shadow_repo: Path, working_dir: str) -> dict:
|
||||
"""Build env dict that redirects git to the shadow repo."""
|
||||
env = os.environ.copy()
|
||||
env["GIT_DIR"] = str(shadow_repo)
|
||||
env["GIT_WORK_TREE"] = str(Path(working_dir).resolve())
|
||||
env.pop("GIT_INDEX_FILE", None)
|
||||
env.pop("GIT_NAMESPACE", None)
|
||||
env.pop("GIT_ALTERNATE_OBJECT_DIRECTORIES", None)
|
||||
return env
|
||||
|
||||
|
||||
def _run_git(
|
||||
args: List[str],
|
||||
shadow_repo: Path,
|
||||
working_dir: str,
|
||||
timeout: int = _GIT_TIMEOUT,
|
||||
) -> tuple:
|
||||
"""Run a git command against the shadow repo. Returns (ok, stdout, stderr)."""
|
||||
env = _git_env(shadow_repo, working_dir)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git"] + args,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout,
|
||||
env=env,
|
||||
cwd=str(Path(working_dir).resolve()),
|
||||
)
|
||||
return result.returncode == 0, result.stdout.strip(), result.stderr.strip()
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "", f"git timed out after {timeout}s: git {' '.join(args)}"
|
||||
except FileNotFoundError:
|
||||
return False, "", "git not found"
|
||||
except Exception as exc:
|
||||
return False, "", str(exc)
|
||||
|
||||
|
||||
def _init_shadow_repo(shadow_repo: Path, working_dir: str) -> Optional[str]:
|
||||
"""Initialise shadow repo if needed. Returns error string or None."""
|
||||
if (shadow_repo / "HEAD").exists():
|
||||
return None
|
||||
|
||||
shadow_repo.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
ok, _, err = _run_git(["init"], shadow_repo, working_dir)
|
||||
if not ok:
|
||||
return f"Shadow repo init failed: {err}"
|
||||
|
||||
_run_git(["config", "user.email", "hermes@local"], shadow_repo, working_dir)
|
||||
_run_git(["config", "user.name", "Hermes Checkpoint"], shadow_repo, working_dir)
|
||||
|
||||
info_dir = shadow_repo / "info"
|
||||
info_dir.mkdir(exist_ok=True)
|
||||
(info_dir / "exclude").write_text(
|
||||
"\n".join(DEFAULT_EXCLUDES) + "\n", encoding="utf-8"
|
||||
)
|
||||
|
||||
(shadow_repo / "HERMES_WORKDIR").write_text(
|
||||
str(Path(working_dir).resolve()) + "\n", encoding="utf-8"
|
||||
)
|
||||
|
||||
logger.debug("Initialised checkpoint repo at %s for %s", shadow_repo, working_dir)
|
||||
return None
|
||||
|
||||
|
||||
def _dir_file_count(path: str) -> int:
|
||||
"""Quick file count estimate (stops early if over _MAX_FILES)."""
|
||||
count = 0
|
||||
try:
|
||||
for _ in Path(path).rglob("*"):
|
||||
count += 1
|
||||
if count > _MAX_FILES:
|
||||
return count
|
||||
except (PermissionError, OSError):
|
||||
pass
|
||||
return count
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CheckpointManager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class CheckpointManager:
|
||||
"""Manages automatic filesystem checkpoints.
|
||||
|
||||
Designed to be owned by AIAgent. Call ``new_turn()`` at the start of
|
||||
each conversation turn and ``ensure_checkpoint(dir, reason)`` before
|
||||
any file-mutating tool call. The manager deduplicates so at most one
|
||||
snapshot is taken per directory per turn.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
enabled : bool
|
||||
Master switch (from config / CLI flag).
|
||||
max_snapshots : int
|
||||
Keep at most this many checkpoints per directory.
|
||||
"""
|
||||
|
||||
def __init__(self, enabled: bool = False, max_snapshots: int = 50):
|
||||
self.enabled = enabled
|
||||
self.max_snapshots = max_snapshots
|
||||
self._checkpointed_dirs: Set[str] = set()
|
||||
self._git_available: Optional[bool] = None # lazy probe
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Turn lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def new_turn(self) -> None:
|
||||
"""Reset per-turn dedup. Call at the start of each agent iteration."""
|
||||
self._checkpointed_dirs.clear()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def ensure_checkpoint(self, working_dir: str, reason: str = "auto") -> bool:
|
||||
"""Take a checkpoint if enabled and not already done this turn.
|
||||
|
||||
Returns True if a checkpoint was taken, False otherwise.
|
||||
Never raises — all errors are silently logged.
|
||||
"""
|
||||
if not self.enabled:
|
||||
return False
|
||||
|
||||
# Lazy git probe
|
||||
if self._git_available is None:
|
||||
self._git_available = shutil.which("git") is not None
|
||||
if not self._git_available:
|
||||
logger.debug("Checkpoints disabled: git not found")
|
||||
if not self._git_available:
|
||||
return False
|
||||
|
||||
abs_dir = str(Path(working_dir).resolve())
|
||||
|
||||
# Skip root, home, and other overly broad directories
|
||||
if abs_dir in ("/", str(Path.home())):
|
||||
logger.debug("Checkpoint skipped: directory too broad (%s)", abs_dir)
|
||||
return False
|
||||
|
||||
# Already checkpointed this turn?
|
||||
if abs_dir in self._checkpointed_dirs:
|
||||
return False
|
||||
|
||||
self._checkpointed_dirs.add(abs_dir)
|
||||
|
||||
try:
|
||||
return self._take(abs_dir, reason)
|
||||
except Exception as e:
|
||||
logger.debug("Checkpoint failed (non-fatal): %s", e)
|
||||
return False
|
||||
|
||||
def list_checkpoints(self, working_dir: str) -> List[Dict]:
|
||||
"""List available checkpoints for a directory.
|
||||
|
||||
Returns a list of dicts with keys: hash, short_hash, timestamp, reason.
|
||||
Most recent first.
|
||||
"""
|
||||
abs_dir = str(Path(working_dir).resolve())
|
||||
shadow = _shadow_repo_path(abs_dir)
|
||||
|
||||
if not (shadow / "HEAD").exists():
|
||||
return []
|
||||
|
||||
ok, stdout, _ = _run_git(
|
||||
["log", "--format=%H|%h|%aI|%s", "--no-walk=unsorted",
|
||||
"--all" if False else "HEAD", # just HEAD lineage
|
||||
"-n", str(self.max_snapshots)],
|
||||
shadow, abs_dir,
|
||||
)
|
||||
|
||||
# Simpler: just use regular log
|
||||
ok, stdout, _ = _run_git(
|
||||
["log", "--format=%H|%h|%aI|%s", "-n", str(self.max_snapshots)],
|
||||
shadow, abs_dir,
|
||||
)
|
||||
|
||||
if not ok or not stdout:
|
||||
return []
|
||||
|
||||
results = []
|
||||
for line in stdout.splitlines():
|
||||
parts = line.split("|", 3)
|
||||
if len(parts) == 4:
|
||||
results.append({
|
||||
"hash": parts[0],
|
||||
"short_hash": parts[1],
|
||||
"timestamp": parts[2],
|
||||
"reason": parts[3],
|
||||
})
|
||||
return results
|
||||
|
||||
def restore(self, working_dir: str, commit_hash: str) -> Dict:
|
||||
"""Restore files to a checkpoint state.
|
||||
|
||||
Uses ``git checkout <hash> -- .`` which restores tracked files
|
||||
without moving HEAD — safe and reversible.
|
||||
|
||||
Returns dict with success/error info.
|
||||
"""
|
||||
abs_dir = str(Path(working_dir).resolve())
|
||||
shadow = _shadow_repo_path(abs_dir)
|
||||
|
||||
if not (shadow / "HEAD").exists():
|
||||
return {"success": False, "error": "No checkpoints exist for this directory"}
|
||||
|
||||
# Verify the commit exists
|
||||
ok, _, err = _run_git(
|
||||
["cat-file", "-t", commit_hash], shadow, abs_dir,
|
||||
)
|
||||
if not ok:
|
||||
return {"success": False, "error": f"Checkpoint '{commit_hash}' not found"}
|
||||
|
||||
# Take a checkpoint of current state before restoring (so you can undo the undo)
|
||||
self._take(abs_dir, f"pre-rollback snapshot (restoring to {commit_hash[:8]})")
|
||||
|
||||
# Restore
|
||||
ok, stdout, err = _run_git(
|
||||
["checkout", commit_hash, "--", "."],
|
||||
shadow, abs_dir, timeout=_GIT_TIMEOUT * 2,
|
||||
)
|
||||
|
||||
if not ok:
|
||||
return {"success": False, "error": f"Restore failed: {err}"}
|
||||
|
||||
# Get info about what was restored
|
||||
ok2, reason_out, _ = _run_git(
|
||||
["log", "--format=%s", "-1", commit_hash], shadow, abs_dir,
|
||||
)
|
||||
reason = reason_out if ok2 else "unknown"
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"restored_to": commit_hash[:8],
|
||||
"reason": reason,
|
||||
"directory": abs_dir,
|
||||
}
|
||||
|
||||
def get_working_dir_for_path(self, file_path: str) -> str:
|
||||
"""Resolve a file path to its working directory for checkpointing.
|
||||
|
||||
Walks up from the file's parent to find a reasonable project root
|
||||
(directory containing .git, pyproject.toml, package.json, etc.).
|
||||
Falls back to the file's parent directory.
|
||||
"""
|
||||
path = Path(file_path).resolve()
|
||||
if path.is_dir():
|
||||
candidate = path
|
||||
else:
|
||||
candidate = path.parent
|
||||
|
||||
# Walk up looking for project root markers
|
||||
markers = {".git", "pyproject.toml", "package.json", "Cargo.toml",
|
||||
"go.mod", "Makefile", "pom.xml", ".hg", "Gemfile"}
|
||||
check = candidate
|
||||
while check != check.parent:
|
||||
if any((check / m).exists() for m in markers):
|
||||
return str(check)
|
||||
check = check.parent
|
||||
|
||||
# No project root found — use the file's parent
|
||||
return str(candidate)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _take(self, working_dir: str, reason: str) -> bool:
|
||||
"""Take a snapshot. Returns True on success."""
|
||||
shadow = _shadow_repo_path(working_dir)
|
||||
|
||||
# Init if needed
|
||||
err = _init_shadow_repo(shadow, working_dir)
|
||||
if err:
|
||||
logger.debug("Checkpoint init failed: %s", err)
|
||||
return False
|
||||
|
||||
# Quick size guard — don't try to snapshot enormous directories
|
||||
if _dir_file_count(working_dir) > _MAX_FILES:
|
||||
logger.debug("Checkpoint skipped: >%d files in %s", _MAX_FILES, working_dir)
|
||||
return False
|
||||
|
||||
# Stage everything
|
||||
ok, _, err = _run_git(
|
||||
["add", "-A"], shadow, working_dir, timeout=_GIT_TIMEOUT * 2,
|
||||
)
|
||||
if not ok:
|
||||
logger.debug("Checkpoint git-add failed: %s", err)
|
||||
return False
|
||||
|
||||
# Check if there's anything to commit
|
||||
ok_diff, diff_out, _ = _run_git(
|
||||
["diff", "--cached", "--quiet"], shadow, working_dir,
|
||||
)
|
||||
if ok_diff:
|
||||
# No changes to commit
|
||||
logger.debug("Checkpoint skipped: no changes in %s", working_dir)
|
||||
return False
|
||||
|
||||
# Commit
|
||||
ok, _, err = _run_git(
|
||||
["commit", "-m", reason, "--allow-empty-message"],
|
||||
shadow, working_dir, timeout=_GIT_TIMEOUT * 2,
|
||||
)
|
||||
if not ok:
|
||||
logger.debug("Checkpoint commit failed: %s", err)
|
||||
return False
|
||||
|
||||
logger.debug("Checkpoint taken in %s: %s", working_dir, reason)
|
||||
|
||||
# Prune old snapshots
|
||||
self._prune(shadow, working_dir)
|
||||
|
||||
return True
|
||||
|
||||
def _prune(self, shadow_repo: Path, working_dir: str) -> None:
|
||||
"""Keep only the last max_snapshots commits via orphan reset."""
|
||||
ok, stdout, _ = _run_git(
|
||||
["rev-list", "--count", "HEAD"], shadow_repo, working_dir,
|
||||
)
|
||||
if not ok:
|
||||
return
|
||||
|
||||
try:
|
||||
count = int(stdout)
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
if count <= self.max_snapshots:
|
||||
return
|
||||
|
||||
# Get the hash of the commit at the cutoff point
|
||||
ok, cutoff_hash, _ = _run_git(
|
||||
["rev-list", "--reverse", "HEAD", "--skip=0",
|
||||
f"--max-count=1"],
|
||||
shadow_repo, working_dir,
|
||||
)
|
||||
|
||||
# For simplicity, we don't actually prune — git's pack mechanism
|
||||
# handles this efficiently, and the objects are small. The log
|
||||
# listing is already limited by max_snapshots.
|
||||
# Full pruning would require rebase --onto or filter-branch which
|
||||
# is fragile for a background feature. We just limit the log view.
|
||||
logger.debug("Checkpoint repo has %d commits (limit %d)", count, self.max_snapshots)
|
||||
|
||||
|
||||
def format_checkpoint_list(checkpoints: List[Dict], directory: str) -> str:
|
||||
"""Format checkpoint list for display to user."""
|
||||
if not checkpoints:
|
||||
return f"No checkpoints found for {directory}"
|
||||
|
||||
lines = [f"📸 Checkpoints for {directory}:\n"]
|
||||
for i, cp in enumerate(checkpoints, 1):
|
||||
# Parse ISO timestamp to something readable
|
||||
ts = cp["timestamp"]
|
||||
if "T" in ts:
|
||||
ts = ts.split("T")[1].split("+")[0].split("-")[0][:5] # HH:MM
|
||||
date = cp["timestamp"].split("T")[0]
|
||||
ts = f"{date} {ts}"
|
||||
lines.append(f" {i}. {cp['short_hash']} {ts} {cp['reason']}")
|
||||
|
||||
lines.append(f"\nUse /rollback <number> to restore, e.g. /rollback 1")
|
||||
return "\n".join(lines)
|
||||
Loading…
Add table
Add a link
Reference in a new issue