mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Three changes consolidated into the existing backup system: 1. Fix: hermes backup now uses sqlite3.Connection.backup() for .db files instead of raw file copy. Raw copy of a WAL-mode database can produce a corrupted backup — the backup() API handles this correctly. 2. hermes backup --quick: fast snapshot of just critical state files (config.yaml, state.db, .env, auth.json, cron/jobs.json, etc.) stored in ~/.hermes/state-snapshots/. Auto-prunes to 20 snapshots. 3. /snapshot slash command (alias /snap): in-session interface for quick state snapshots. create/list/restore/prune subcommands. Restore by ID or number. Powered by the same backup module. No new modules — everything lives in hermes_cli/backup.py alongside the existing full backup/import code. No hooks in run_agent.py — purely on-demand, zero runtime overhead. Closes the use case from PRs #8406 and #7813 with ~200 lines of new logic instead of a 1090-line content-addressed storage engine.
655 lines
21 KiB
Python
655 lines
21 KiB
Python
"""
|
|
Backup and import commands for hermes CLI.
|
|
|
|
`hermes backup` creates a zip archive of the entire ~/.hermes/ directory
|
|
(excluding the hermes-agent repo and transient files).
|
|
|
|
`hermes import` restores from a backup zip, overlaying onto the current
|
|
HERMES_HOME root.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import zipfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from hermes_constants import get_default_hermes_root, get_hermes_home, display_hermes_home
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Exclusion rules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Directory names to skip entirely (matched against each path component)
|
|
_EXCLUDED_DIRS = {
|
|
"hermes-agent", # the codebase repo — re-clone instead
|
|
"__pycache__", # bytecode caches — regenerated on import
|
|
".git", # nested git dirs (profiles shouldn't have these, but safety)
|
|
"node_modules", # js deps if website/ somehow leaks in
|
|
}
|
|
|
|
# File-name suffixes to skip
|
|
_EXCLUDED_SUFFIXES = (
|
|
".pyc",
|
|
".pyo",
|
|
)
|
|
|
|
# File names to skip (runtime state that's meaningless on another machine)
|
|
_EXCLUDED_NAMES = {
|
|
"gateway.pid",
|
|
"cron.pid",
|
|
}
|
|
|
|
|
|
def _should_exclude(rel_path: Path) -> bool:
|
|
"""Return True if *rel_path* (relative to hermes root) should be skipped."""
|
|
parts = rel_path.parts
|
|
|
|
# Any path component matches an excluded dir name
|
|
for part in parts:
|
|
if part in _EXCLUDED_DIRS:
|
|
return True
|
|
|
|
name = rel_path.name
|
|
|
|
if name in _EXCLUDED_NAMES:
|
|
return True
|
|
|
|
if name.endswith(_EXCLUDED_SUFFIXES):
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQLite safe copy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _safe_copy_db(src: Path, dst: Path) -> bool:
|
|
"""Copy a SQLite database safely using the backup() API.
|
|
|
|
Handles WAL mode — produces a consistent snapshot even while
|
|
the DB is being written to. Falls back to raw copy on failure.
|
|
"""
|
|
try:
|
|
conn = sqlite3.connect(f"file:{src}?mode=ro", uri=True)
|
|
backup_conn = sqlite3.connect(str(dst))
|
|
conn.backup(backup_conn)
|
|
backup_conn.close()
|
|
conn.close()
|
|
return True
|
|
except Exception as exc:
|
|
logger.warning("SQLite safe copy failed for %s: %s", src, exc)
|
|
try:
|
|
shutil.copy2(src, dst)
|
|
return True
|
|
except Exception as exc2:
|
|
logger.error("Raw copy also failed for %s: %s", src, exc2)
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Backup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _format_size(nbytes: int) -> str:
|
|
"""Human-readable file size."""
|
|
for unit in ("B", "KB", "MB", "GB"):
|
|
if nbytes < 1024:
|
|
return f"{nbytes:.1f} {unit}" if unit != "B" else f"{nbytes} {unit}"
|
|
nbytes /= 1024
|
|
return f"{nbytes:.1f} TB"
|
|
|
|
|
|
def run_backup(args) -> None:
|
|
"""Create a zip backup of the Hermes home directory."""
|
|
hermes_root = get_default_hermes_root()
|
|
|
|
if not hermes_root.is_dir():
|
|
print(f"Error: Hermes home directory not found at {hermes_root}")
|
|
sys.exit(1)
|
|
|
|
# Determine output path
|
|
if args.output:
|
|
out_path = Path(args.output).expanduser().resolve()
|
|
# If user gave a directory, put the zip inside it
|
|
if out_path.is_dir():
|
|
stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
|
|
out_path = out_path / f"hermes-backup-{stamp}.zip"
|
|
else:
|
|
stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
|
|
out_path = Path.home() / f"hermes-backup-{stamp}.zip"
|
|
|
|
# Ensure the suffix is .zip
|
|
if out_path.suffix.lower() != ".zip":
|
|
out_path = out_path.with_suffix(out_path.suffix + ".zip")
|
|
|
|
# Ensure parent directory exists
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Collect files
|
|
print(f"Scanning {display_hermes_home()} ...")
|
|
files_to_add: list[tuple[Path, Path]] = [] # (absolute, relative)
|
|
skipped_dirs = set()
|
|
|
|
for dirpath, dirnames, filenames in os.walk(hermes_root, followlinks=False):
|
|
dp = Path(dirpath)
|
|
rel_dir = dp.relative_to(hermes_root)
|
|
|
|
# Prune excluded directories in-place so os.walk doesn't descend
|
|
orig_dirnames = dirnames[:]
|
|
dirnames[:] = [
|
|
d for d in dirnames
|
|
if d not in _EXCLUDED_DIRS
|
|
]
|
|
for removed in set(orig_dirnames) - set(dirnames):
|
|
skipped_dirs.add(str(rel_dir / removed))
|
|
|
|
for fname in filenames:
|
|
fpath = dp / fname
|
|
rel = fpath.relative_to(hermes_root)
|
|
|
|
if _should_exclude(rel):
|
|
continue
|
|
|
|
# Skip the output zip itself if it happens to be inside hermes root
|
|
try:
|
|
if fpath.resolve() == out_path.resolve():
|
|
continue
|
|
except (OSError, ValueError):
|
|
pass
|
|
|
|
files_to_add.append((fpath, rel))
|
|
|
|
if not files_to_add:
|
|
print("No files to back up.")
|
|
return
|
|
|
|
# Create the zip
|
|
file_count = len(files_to_add)
|
|
print(f"Backing up {file_count} files ...")
|
|
|
|
total_bytes = 0
|
|
errors = []
|
|
t0 = time.monotonic()
|
|
|
|
with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED, compresslevel=6) as zf:
|
|
for i, (abs_path, rel_path) in enumerate(files_to_add, 1):
|
|
try:
|
|
# Safe copy for SQLite databases (handles WAL mode)
|
|
if abs_path.suffix == ".db":
|
|
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
|
|
tmp_db = Path(tmp.name)
|
|
if _safe_copy_db(abs_path, tmp_db):
|
|
zf.write(tmp_db, arcname=str(rel_path))
|
|
total_bytes += tmp_db.stat().st_size
|
|
tmp_db.unlink(missing_ok=True)
|
|
else:
|
|
tmp_db.unlink(missing_ok=True)
|
|
errors.append(f" {rel_path}: SQLite safe copy failed")
|
|
continue
|
|
else:
|
|
zf.write(abs_path, arcname=str(rel_path))
|
|
total_bytes += abs_path.stat().st_size
|
|
except (PermissionError, OSError) as exc:
|
|
errors.append(f" {rel_path}: {exc}")
|
|
continue
|
|
|
|
# Progress every 500 files
|
|
if i % 500 == 0:
|
|
print(f" {i}/{file_count} files ...")
|
|
|
|
elapsed = time.monotonic() - t0
|
|
zip_size = out_path.stat().st_size
|
|
|
|
# Summary
|
|
print()
|
|
print(f"Backup complete: {out_path}")
|
|
print(f" Files: {file_count}")
|
|
print(f" Original: {_format_size(total_bytes)}")
|
|
print(f" Compressed: {_format_size(zip_size)}")
|
|
print(f" Time: {elapsed:.1f}s")
|
|
|
|
if skipped_dirs:
|
|
print(f"\n Excluded directories:")
|
|
for d in sorted(skipped_dirs):
|
|
print(f" {d}/")
|
|
|
|
if errors:
|
|
print(f"\n Warnings ({len(errors)} files skipped):")
|
|
for e in errors[:10]:
|
|
print(e)
|
|
if len(errors) > 10:
|
|
print(f" ... and {len(errors) - 10} more")
|
|
|
|
print(f"\nRestore with: hermes import {out_path.name}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _validate_backup_zip(zf: zipfile.ZipFile) -> tuple[bool, str]:
|
|
"""Check that a zip looks like a Hermes backup.
|
|
|
|
Returns (ok, reason).
|
|
"""
|
|
names = zf.namelist()
|
|
if not names:
|
|
return False, "zip archive is empty"
|
|
|
|
# Look for telltale files that a hermes home would have
|
|
markers = {"config.yaml", ".env", "state.db"}
|
|
found = set()
|
|
for n in names:
|
|
# Could be at the root or one level deep (if someone zipped the directory)
|
|
basename = Path(n).name
|
|
if basename in markers:
|
|
found.add(basename)
|
|
|
|
if not found:
|
|
return False, (
|
|
"zip does not appear to be a Hermes backup "
|
|
"(no config.yaml, .env, or state databases found)"
|
|
)
|
|
|
|
return True, ""
|
|
|
|
|
|
def _detect_prefix(zf: zipfile.ZipFile) -> str:
|
|
"""Detect if the zip has a common directory prefix wrapping all entries.
|
|
|
|
Some tools zip as `.hermes/config.yaml` instead of `config.yaml`.
|
|
Returns the prefix to strip (empty string if none).
|
|
"""
|
|
names = [n for n in zf.namelist() if not n.endswith("/")]
|
|
if not names:
|
|
return ""
|
|
|
|
# Find common prefix
|
|
parts_list = [Path(n).parts for n in names]
|
|
|
|
# Check if all entries share a common first directory
|
|
first_parts = {p[0] for p in parts_list if len(p) > 1}
|
|
if len(first_parts) == 1:
|
|
prefix = first_parts.pop()
|
|
# Only strip if it looks like a hermes dir name
|
|
if prefix in (".hermes", "hermes"):
|
|
return prefix + "/"
|
|
|
|
return ""
|
|
|
|
|
|
def run_import(args) -> None:
|
|
"""Restore a Hermes backup from a zip file."""
|
|
zip_path = Path(args.zipfile).expanduser().resolve()
|
|
|
|
if not zip_path.is_file():
|
|
print(f"Error: File not found: {zip_path}")
|
|
sys.exit(1)
|
|
|
|
if not zipfile.is_zipfile(zip_path):
|
|
print(f"Error: Not a valid zip file: {zip_path}")
|
|
sys.exit(1)
|
|
|
|
hermes_root = get_default_hermes_root()
|
|
|
|
with zipfile.ZipFile(zip_path, "r") as zf:
|
|
# Validate
|
|
ok, reason = _validate_backup_zip(zf)
|
|
if not ok:
|
|
print(f"Error: {reason}")
|
|
sys.exit(1)
|
|
|
|
prefix = _detect_prefix(zf)
|
|
members = [n for n in zf.namelist() if not n.endswith("/")]
|
|
file_count = len(members)
|
|
|
|
print(f"Backup contains {file_count} files")
|
|
print(f"Target: {display_hermes_home()}")
|
|
|
|
if prefix:
|
|
print(f"Detected archive prefix: {prefix!r} (will be stripped)")
|
|
|
|
# Check for existing installation
|
|
has_config = (hermes_root / "config.yaml").exists()
|
|
has_env = (hermes_root / ".env").exists()
|
|
|
|
if (has_config or has_env) and not args.force:
|
|
print()
|
|
print("Warning: Target directory already has Hermes configuration.")
|
|
print("Importing will overwrite existing files with backup contents.")
|
|
print()
|
|
try:
|
|
answer = input("Continue? [y/N] ").strip().lower()
|
|
except (EOFError, KeyboardInterrupt):
|
|
print("\nAborted.")
|
|
sys.exit(1)
|
|
if answer not in ("y", "yes"):
|
|
print("Aborted.")
|
|
return
|
|
|
|
# Extract
|
|
print(f"\nImporting {file_count} files ...")
|
|
hermes_root.mkdir(parents=True, exist_ok=True)
|
|
|
|
errors = []
|
|
restored = 0
|
|
t0 = time.monotonic()
|
|
|
|
for member in members:
|
|
# Strip prefix if detected
|
|
if prefix and member.startswith(prefix):
|
|
rel = member[len(prefix):]
|
|
else:
|
|
rel = member
|
|
|
|
if not rel:
|
|
continue
|
|
|
|
target = hermes_root / rel
|
|
|
|
# Security: reject absolute paths and traversals
|
|
try:
|
|
target.resolve().relative_to(hermes_root.resolve())
|
|
except ValueError:
|
|
errors.append(f" {rel}: path traversal blocked")
|
|
continue
|
|
|
|
try:
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
with zf.open(member) as src, open(target, "wb") as dst:
|
|
dst.write(src.read())
|
|
restored += 1
|
|
except (PermissionError, OSError) as exc:
|
|
errors.append(f" {rel}: {exc}")
|
|
|
|
if restored % 500 == 0:
|
|
print(f" {restored}/{file_count} files ...")
|
|
|
|
elapsed = time.monotonic() - t0
|
|
|
|
# Summary
|
|
print()
|
|
print(f"Import complete: {restored} files restored in {elapsed:.1f}s")
|
|
print(f" Target: {display_hermes_home()}")
|
|
|
|
if errors:
|
|
print(f"\n Warnings ({len(errors)} files skipped):")
|
|
for e in errors[:10]:
|
|
print(e)
|
|
if len(errors) > 10:
|
|
print(f" ... and {len(errors) - 10} more")
|
|
|
|
# Post-import: restore profile wrapper scripts
|
|
profiles_dir = hermes_root / "profiles"
|
|
restored_profiles = []
|
|
if profiles_dir.is_dir():
|
|
try:
|
|
from hermes_cli.profiles import (
|
|
create_wrapper_script, check_alias_collision,
|
|
_is_wrapper_dir_in_path, _get_wrapper_dir,
|
|
)
|
|
for entry in sorted(profiles_dir.iterdir()):
|
|
if not entry.is_dir():
|
|
continue
|
|
profile_name = entry.name
|
|
# Only create wrappers for directories with config
|
|
if not (entry / "config.yaml").exists() and not (entry / ".env").exists():
|
|
continue
|
|
collision = check_alias_collision(profile_name)
|
|
if collision:
|
|
print(f" Skipped alias '{profile_name}': {collision}")
|
|
restored_profiles.append((profile_name, False))
|
|
else:
|
|
wrapper = create_wrapper_script(profile_name)
|
|
restored_profiles.append((profile_name, wrapper is not None))
|
|
|
|
if restored_profiles:
|
|
created = [n for n, ok in restored_profiles if ok]
|
|
skipped = [n for n, ok in restored_profiles if not ok]
|
|
if created:
|
|
print(f"\n Profile aliases restored: {', '.join(created)}")
|
|
if skipped:
|
|
print(f" Profile aliases skipped: {', '.join(skipped)}")
|
|
if not _is_wrapper_dir_in_path():
|
|
print(f"\n Note: {_get_wrapper_dir()} is not in your PATH.")
|
|
print(' Add to your shell config (~/.bashrc or ~/.zshrc):')
|
|
print(' export PATH="$HOME/.local/bin:$PATH"')
|
|
except ImportError:
|
|
# hermes_cli.profiles might not be available (fresh install)
|
|
if any(profiles_dir.iterdir()):
|
|
print(f"\n Profiles detected but aliases could not be created.")
|
|
print(f" Run: hermes profile list (after installing hermes)")
|
|
|
|
# Guidance
|
|
print()
|
|
if not (hermes_root / "hermes-agent").is_dir():
|
|
print("Note: The hermes-agent codebase was not included in the backup.")
|
|
print(" If this is a fresh install, run: hermes update")
|
|
|
|
if restored_profiles:
|
|
gw_profiles = [n for n, _ in restored_profiles]
|
|
print("\nTo re-enable gateway services for profiles:")
|
|
for pname in gw_profiles:
|
|
print(f" hermes -p {pname} gateway install")
|
|
|
|
print("Done. Your Hermes configuration has been restored.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Quick state snapshots (used by /snapshot slash command and hermes backup --quick)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Critical state files to include in quick snapshots (relative to HERMES_HOME).
|
|
# Everything else is either regeneratable (logs, cache) or managed separately
|
|
# (skills, repo, sessions/).
|
|
_QUICK_STATE_FILES = (
|
|
"state.db",
|
|
"config.yaml",
|
|
".env",
|
|
"auth.json",
|
|
"cron/jobs.json",
|
|
"gateway_state.json",
|
|
"channel_directory.json",
|
|
"processes.json",
|
|
)
|
|
|
|
_QUICK_SNAPSHOTS_DIR = "state-snapshots"
|
|
_QUICK_DEFAULT_KEEP = 20
|
|
|
|
|
|
def _quick_snapshot_root(hermes_home: Optional[Path] = None) -> Path:
|
|
home = hermes_home or get_hermes_home()
|
|
return home / _QUICK_SNAPSHOTS_DIR
|
|
|
|
|
|
def create_quick_snapshot(
|
|
label: Optional[str] = None,
|
|
hermes_home: Optional[Path] = None,
|
|
) -> Optional[str]:
|
|
"""Create a quick state snapshot of critical files.
|
|
|
|
Copies STATE_FILES to a timestamped directory under state-snapshots/.
|
|
Auto-prunes old snapshots beyond the keep limit.
|
|
|
|
Returns:
|
|
Snapshot ID (timestamp-based), or None if no files found.
|
|
"""
|
|
home = hermes_home or get_hermes_home()
|
|
root = _quick_snapshot_root(home)
|
|
|
|
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
|
|
snap_id = f"{ts}-{label}" if label else ts
|
|
snap_dir = root / snap_id
|
|
snap_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
manifest: Dict[str, int] = {} # rel_path -> file size
|
|
|
|
for rel in _QUICK_STATE_FILES:
|
|
src = home / rel
|
|
if not src.exists() or not src.is_file():
|
|
continue
|
|
|
|
dst = snap_dir / rel
|
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
if src.suffix == ".db":
|
|
if not _safe_copy_db(src, dst):
|
|
continue
|
|
else:
|
|
shutil.copy2(src, dst)
|
|
manifest[rel] = dst.stat().st_size
|
|
except (OSError, PermissionError) as exc:
|
|
logger.warning("Could not snapshot %s: %s", rel, exc)
|
|
|
|
if not manifest:
|
|
shutil.rmtree(snap_dir, ignore_errors=True)
|
|
return None
|
|
|
|
# Write manifest
|
|
meta = {
|
|
"id": snap_id,
|
|
"timestamp": ts,
|
|
"label": label,
|
|
"file_count": len(manifest),
|
|
"total_size": sum(manifest.values()),
|
|
"files": manifest,
|
|
}
|
|
with open(snap_dir / "manifest.json", "w") as f:
|
|
json.dump(meta, f, indent=2)
|
|
|
|
# Auto-prune
|
|
_prune_quick_snapshots(root, keep=_QUICK_DEFAULT_KEEP)
|
|
|
|
logger.info("State snapshot created: %s (%d files)", snap_id, len(manifest))
|
|
return snap_id
|
|
|
|
|
|
def list_quick_snapshots(
|
|
limit: int = 20,
|
|
hermes_home: Optional[Path] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""List existing quick state snapshots, most recent first."""
|
|
root = _quick_snapshot_root(hermes_home)
|
|
if not root.exists():
|
|
return []
|
|
|
|
results = []
|
|
for d in sorted(root.iterdir(), reverse=True):
|
|
if not d.is_dir():
|
|
continue
|
|
manifest_path = d / "manifest.json"
|
|
if manifest_path.exists():
|
|
try:
|
|
with open(manifest_path) as f:
|
|
results.append(json.load(f))
|
|
except (json.JSONDecodeError, OSError):
|
|
results.append({"id": d.name, "file_count": 0, "total_size": 0})
|
|
if len(results) >= limit:
|
|
break
|
|
|
|
return results
|
|
|
|
|
|
def restore_quick_snapshot(
|
|
snapshot_id: str,
|
|
hermes_home: Optional[Path] = None,
|
|
) -> bool:
|
|
"""Restore state from a quick snapshot.
|
|
|
|
Overwrites current state files with the snapshot's copies.
|
|
Returns True if at least one file was restored.
|
|
"""
|
|
home = hermes_home or get_hermes_home()
|
|
root = _quick_snapshot_root(home)
|
|
snap_dir = root / snapshot_id
|
|
|
|
if not snap_dir.is_dir():
|
|
return False
|
|
|
|
manifest_path = snap_dir / "manifest.json"
|
|
if not manifest_path.exists():
|
|
return False
|
|
|
|
with open(manifest_path) as f:
|
|
meta = json.load(f)
|
|
|
|
restored = 0
|
|
for rel in meta.get("files", {}):
|
|
src = snap_dir / rel
|
|
if not src.exists():
|
|
continue
|
|
|
|
dst = home / rel
|
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
if dst.suffix == ".db":
|
|
# Atomic-ish replace for databases
|
|
tmp = dst.parent / f".{dst.name}.snap_restore"
|
|
shutil.copy2(src, tmp)
|
|
dst.unlink(missing_ok=True)
|
|
shutil.move(str(tmp), str(dst))
|
|
else:
|
|
shutil.copy2(src, dst)
|
|
restored += 1
|
|
except (OSError, PermissionError) as exc:
|
|
logger.error("Failed to restore %s: %s", rel, exc)
|
|
|
|
logger.info("Restored %d files from snapshot %s", restored, snapshot_id)
|
|
return restored > 0
|
|
|
|
|
|
def _prune_quick_snapshots(root: Path, keep: int = _QUICK_DEFAULT_KEEP) -> int:
|
|
"""Remove oldest quick snapshots beyond the keep limit. Returns count deleted."""
|
|
if not root.exists():
|
|
return 0
|
|
|
|
dirs = sorted(
|
|
(d for d in root.iterdir() if d.is_dir()),
|
|
key=lambda d: d.name,
|
|
reverse=True,
|
|
)
|
|
|
|
deleted = 0
|
|
for d in dirs[keep:]:
|
|
try:
|
|
shutil.rmtree(d)
|
|
deleted += 1
|
|
except OSError as exc:
|
|
logger.warning("Failed to prune snapshot %s: %s", d.name, exc)
|
|
|
|
return deleted
|
|
|
|
|
|
def prune_quick_snapshots(
|
|
keep: int = _QUICK_DEFAULT_KEEP,
|
|
hermes_home: Optional[Path] = None,
|
|
) -> int:
|
|
"""Manually prune quick snapshots. Returns count deleted."""
|
|
return _prune_quick_snapshots(_quick_snapshot_root(hermes_home), keep=keep)
|
|
|
|
|
|
def run_quick_backup(args) -> None:
|
|
"""CLI entry point for hermes backup --quick."""
|
|
label = getattr(args, "label", None)
|
|
snap_id = create_quick_snapshot(label=label)
|
|
if snap_id:
|
|
print(f"State snapshot created: {snap_id}")
|
|
snaps = list_quick_snapshots()
|
|
print(f" {len(snaps)} snapshot(s) stored in {display_hermes_home()}/state-snapshots/")
|
|
print(f" Restore with: /snapshot restore {snap_id}")
|
|
else:
|
|
print("No state files found to snapshot.")
|