mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
The original name was cute but non-obvious; disk-cleanup says what it does. Plugin directory, script, state path, log lines, slash command, and test module all renamed. No user-visible state exists yet, so no migration path is needed. New website page "Built-in Plugins" documents the <repo>/plugins/<name>/ source, how discovery interacts with user/project plugins, the HERMES_DISABLE_BUNDLED_PLUGINS escape hatch, disk-cleanup's hook behaviour and deletion rules, and guidance on when a plugin belongs bundled vs. user-installable. Added to the Features → Core sidebar next to the main Plugins page, with a cross-reference from plugins.md.
496 lines
16 KiB
Python
Executable file
496 lines
16 KiB
Python
Executable file
"""disk_cleanup — ephemeral file cleanup for Hermes Agent.
|
|
|
|
Library module wrapping the deterministic cleanup rules written by
|
|
@LVT382009 in PR #12212. The plugin ``__init__.py`` wires these
|
|
functions into ``post_tool_call`` and ``on_session_end`` hooks so
|
|
tracking and cleanup happen automatically — the agent never needs to
|
|
call a tool or remember a skill.
|
|
|
|
Rules:
|
|
- test files → delete immediately at task end (age >= 0)
|
|
- temp files → delete after 7 days
|
|
- cron-output → delete after 14 days
|
|
- empty dirs → always delete (under HERMES_HOME)
|
|
- research → keep 10 newest, prompt for older (deep only)
|
|
- chrome-profile→ prompt after 14 days (deep only)
|
|
- >500 MB files → prompt always (deep only)
|
|
|
|
Scope: strictly HERMES_HOME and /tmp/hermes-*
|
|
Never touches: ~/.hermes/logs/ or any system directory.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
import logging
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
try:
    from hermes_constants import get_hermes_home
except Exception:  # pragma: no cover — plugin may load before constants resolves
    import os

    def get_hermes_home() -> Path:  # type: ignore[no-redef]
        """Fallback resolver used when ``hermes_constants`` cannot be imported.

        Reads ``HERMES_HOME`` from the environment (surrounding whitespace
        stripped) and falls back to ``~/.hermes`` when it is unset or blank.
        The returned path is always resolved.
        """
        configured = (os.environ.get("HERMES_HOME") or "").strip()
        if not configured:
            return (Path.home() / ".hermes").resolve()
        return Path(configured).resolve()
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Paths
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_state_dir() -> Path:
    """Return the plugin state directory, ``<HERMES_HOME>/disk-cleanup``.

    Deliberately kept apart from ``$HERMES_HOME/logs/``.
    """
    home = get_hermes_home()
    return home / "disk-cleanup"
|
|
|
|
|
|
def get_tracked_file() -> Path:
    """Return the path of ``tracked.json`` inside the state directory."""
    state_dir = get_state_dir()
    return state_dir / "tracked.json"
|
|
|
|
|
|
def get_log_file() -> Path:
    """Return the audit-log path, ``cleanup.log`` inside the state directory.

    The audit log intentionally does NOT live under ``$HERMES_HOME/logs/``.
    """
    state_dir = get_state_dir()
    return state_dir / "cleanup.log"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Path safety
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_safe_path(path: Path) -> bool:
    """Accept only paths under HERMES_HOME or ``/tmp/hermes-*``.

    Rejects Windows mounts (``/mnt/c`` etc.) and any system directory.

    Both checks are made against the fully resolved path, so ``..`` segments
    and symlinks cannot be used to escape an allowed root (for example
    ``/tmp/hermes-x/../../etc/passwd`` is rejected).

    Args:
        path: Candidate path; it does not need to exist.

    Returns:
        True when the resolved path lies inside HERMES_HOME, or inside a
        ``hermes-*`` entry directly below ``/tmp``; False otherwise,
        including when the path cannot be resolved.
    """
    try:
        resolved = path.resolve()
    except (OSError, RuntimeError):
        # Unresolvable (e.g. a symlink loop) — refuse rather than guess.
        return False

    # Compare against HERMES_HOME both as returned and resolved, so a home
    # directory reached through a symlink still matches resolved candidates.
    home = get_hermes_home()
    home_roots = [home]
    try:
        home_roots.append(Path(home).resolve())
    except (OSError, RuntimeError):
        pass
    for root in home_roots:
        try:
            resolved.relative_to(root)
        except ValueError:
            continue
        return True

    # Allow /tmp/hermes-* explicitly. /tmp may itself be a symlink, so the
    # resolved location of /tmp is accepted as well as the literal one.
    tmp_roots = [Path("/tmp")]
    try:
        tmp_roots.append(Path("/tmp").resolve())
    except (OSError, RuntimeError):
        pass
    for tmp_root in tmp_roots:
        try:
            rel_parts = resolved.relative_to(tmp_root).parts
        except ValueError:
            continue
        if rel_parts and rel_parts[0].startswith("hermes-"):
            return True
    return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Audit log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _log(message: str) -> None:
    """Append one UTC-timestamped line to the audit log (best effort).

    The file is always written as UTF-8 with unencodable characters
    replaced, so a message containing an unusual path cannot raise an
    encoding error regardless of the platform's default encoding.

    Args:
        message: Text to record; a ``[YYYY-MM-DD HH:MM:SS]`` prefix and a
            trailing newline are added.
    """
    try:
        log_file = get_log_file()
        log_file.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        with open(log_file, "a", encoding="utf-8", errors="replace") as f:
            f.write(f"[{ts}] {message}\n")
    except (OSError, UnicodeError):
        # Never let the audit log break the agent loop.
        pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# tracked.json — atomic read/write, backup scoped to tracked.json only
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_tracked() -> List[Dict[str, Any]]:
    """Load tracked.json. Restores from ``.bak`` on corruption.

    A file counts as corrupted when it is not valid JSON, is not decodable
    text, or does not hold a JSON list. Entries that are not objects, or
    that lack any of the ``path`` / ``timestamp`` / ``category`` / ``size``
    keys the rest of this module indexes, are dropped.

    Creates the state directory as a side effect.

    Returns:
        The tracked entries; an empty list when nothing is tracked yet or
        when neither tracked.json nor its backup is usable.

    Raises:
        OSError: if the state directory cannot be created or tracked.json
            exists but cannot be read.
    """
    tf = get_tracked_file()
    tf.parent.mkdir(parents=True, exist_ok=True)

    if not tf.exists():
        return []

    try:
        return _read_tracked_entries(tf)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        pass

    bak = tf.with_suffix(".json.bak")
    if bak.exists():
        try:
            data = _read_tracked_entries(bak)
        except (OSError, ValueError):
            # Backup is unusable too — fall through to a fresh start.
            pass
        else:
            _log("WARN: tracked.json corrupted — restored from .bak")
            return data
    _log("WARN: tracked.json corrupted, no backup — starting fresh")
    return []


def _read_tracked_entries(source: Path) -> List[Dict[str, Any]]:
    """Parse *source* as a JSON list of well-formed entries, else raise ValueError."""
    required = ("path", "timestamp", "category", "size")
    data = json.loads(source.read_text(encoding="utf-8"))
    if not isinstance(data, list):
        raise ValueError("tracked.json must contain a JSON list")
    return [
        entry for entry in data
        if isinstance(entry, dict) and all(key in entry for key in required)
    ]
|
|
|
|
|
|
def save_tracked(tracked: List[Dict[str, Any]]) -> None:
    """Persist *tracked* to tracked.json: write ``.tmp``, back up old, rename.

    The new content is written to ``tracked.json.tmp`` first; an existing
    tracked.json is copied to ``tracked.json.bak`` before the temp file is
    moved over it.
    """
    target = get_tracked_file()
    target.parent.mkdir(parents=True, exist_ok=True)

    payload = json.dumps(tracked, indent=2)
    staging = target.with_suffix(".json.tmp")
    staging.write_text(payload)

    if target.exists():
        backup = target.with_suffix(".json.bak")
        shutil.copy2(target, backup)

    staging.replace(target)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Categories
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Category labels accepted by track(); any other label is logged and
# stored as "other".
ALLOWED_CATEGORIES = {
    "temp", "test", "research", "download",
    "chrome-profile", "cron-output", "other",
}
|
|
|
|
|
|
def fmt_size(n: float) -> str:
    """Format a byte count with one decimal place and a binary unit.

    The value is divided by 1024 until it drops below 1024 or the units
    B, KB, MB, GB, TB are exhausted; anything larger is reported in PB.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = n
    idx = 0
    while idx < len(units):
        if value < 1024:
            return f"{value:.1f} {units[idx]}"
        value /= 1024
        idx += 1
    return f"{value:.1f} PB"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Track / forget
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def track(path_str: str, category: str, silent: bool = False) -> bool:
    """Register a file for tracking. Returns True if newly tracked.

    Args:
        path_str: Path to register; it is resolved before any check.
        category: One of ``ALLOWED_CATEGORIES``; anything else is logged
            and stored as ``"other"``.
        silent: When False, a confirmation line is printed to stdout.

    Returns:
        False when the path does not exist, fails :func:`is_safe_path`, or
        is already tracked; True once a new entry has been saved.
    """
    if category not in ALLOWED_CATEGORIES:
        _log(f"WARN: unknown category '{category}', using 'other'")
        category = "other"

    target = Path(path_str).resolve()

    if not target.exists():
        _log(f"SKIP: {target} (does not exist)")
        return False
    if not is_safe_path(target):
        _log(f"REJECT: {target} (outside HERMES_HOME)")
        return False

    # Directories are recorded with size 0.
    if target.is_file():
        size = target.stat().st_size
    else:
        size = 0

    entries = load_tracked()
    key = str(target)

    # Deduplicate on the resolved path string.
    for entry in entries:
        if entry["path"] == key:
            return False

    record = {
        "path": key,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "category": category,
        "size": size,
    }
    entries.append(record)
    save_tracked(entries)

    summary = f"{target} ({category}, {fmt_size(size)})"
    _log(f"TRACKED: {summary}")
    if not silent:
        print(f"Tracked: {summary}")
    return True
|
|
|
|
|
|
def forget(path_str: str) -> int:
    """Remove a path from tracking without deleting the file.

    Entries are matched by comparing resolved paths.

    Returns:
        The number of tracking entries removed (0 when nothing matched).
    """
    target = Path(path_str).resolve()
    entries = load_tracked()

    kept = []
    for entry in entries:
        if Path(entry["path"]).resolve() != target:
            kept.append(entry)

    removed = len(entries) - len(kept)
    if removed:
        save_tracked(kept)
        _log(f"FORGOT: {target} ({removed} entries)")
    return removed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dry run
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def dry_run() -> Tuple[List[Dict], List[Dict]]:
    """Return (auto_delete_list, needs_prompt_list) without touching files.

    The preview applies the same rules as the real cleanup passes:

    - auto-delete: every ``test`` item, ``temp`` items whose age in whole
      days exceeds 7, ``cron-output`` items whose age exceeds 14;
    - prompt: ``research`` items older than 30 days except the 10 newest of
      them (which :func:`deep` keeps without asking), ``chrome-profile``
      items older than 14 days, and any other item larger than 500 MB.

    Tracked paths that no longer exist are skipped. Both lists keep the
    order of tracked.json.
    """
    now = datetime.now(timezone.utc)

    # (item, age-in-days) for every tracked path that still exists.
    live: List[Tuple[Dict, int]] = []
    for item in load_tracked():
        if not Path(item["path"]).exists():
            continue
        age = (now - datetime.fromisoformat(item["timestamp"])).days
        live.append((item, age))

    # deep() sorts stale research newest-first and never prompts for the
    # first 10; mirror that so the preview does not over-report.
    stale_research = [
        item for item, age in live
        if item["category"] == "research" and age > 30
    ]
    stale_research.sort(key=lambda x: x["timestamp"], reverse=True)
    spared = {id(item) for item in stale_research[:10]}

    auto: List[Dict] = []
    prompt: List[Dict] = []

    for item, age in live:
        cat = item["category"]

        if cat == "test":
            auto.append(item)
        elif cat == "temp" and age > 7:
            auto.append(item)
        elif cat == "cron-output" and age > 14:
            auto.append(item)
        elif cat == "research" and age > 30:
            if id(item) not in spared:
                prompt.append(item)
        elif cat == "chrome-profile" and age > 14:
            prompt.append(item)
        elif item["size"] > 500 * 1024 * 1024:
            prompt.append(item)

    return auto, prompt
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Quick cleanup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def quick() -> Dict[str, Any]:
    """Safe deterministic cleanup — no prompts.

    Deletes every tracked ``test`` item, ``temp`` items whose age in whole
    days exceeds 7 and ``cron-output`` items whose age exceeds 14. Entries
    whose path no longer exists are dropped from tracking. Afterwards empty
    directories under HERMES_HOME are pruned and tracked.json is rewritten.

    Each path is re-validated with :func:`is_safe_path` immediately before
    deletion: tracked.json is a plain file on disk, so an entry pointing
    outside the allowed scope is reported as an error and dropped from
    tracking instead of being deleted.

    Returns: ``{"deleted": N, "empty_dirs": N, "freed": bytes,
    "errors": [str, ...]}``.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    deleted = 0
    freed = 0
    new_tracked: List[Dict] = []
    errors: List[str] = []

    for item in tracked:
        p = Path(item["path"])
        cat = item["category"]

        if not p.exists():
            _log(f"STALE: {p} (removed from tracking)")
            continue

        age = (now - datetime.fromisoformat(item["timestamp"])).days

        should_delete = (
            cat == "test"
            or (cat == "temp" and age > 7)
            or (cat == "cron-output" and age > 14)
        )
        if not should_delete:
            new_tracked.append(item)
            continue

        if not is_safe_path(p):
            # Never delete outside HERMES_HOME / /tmp/hermes-*, whatever
            # tracked.json says; forget the entry so it is not retried.
            _log(f"REJECT: {p} (outside HERMES_HOME)")
            errors.append(f"{p}: outside allowed cleanup scope")
            continue

        try:
            if p.is_file():
                p.unlink()
            elif p.is_dir():
                shutil.rmtree(p)
            freed += item["size"]
            deleted += 1
            _log(f"DELETED: {p} ({cat}, {fmt_size(item['size'])})")
        except OSError as e:
            _log(f"ERROR deleting {p}: {e}")
            errors.append(f"{p}: {e}")
            # Keep the entry so a later run can try again.
            new_tracked.append(item)

    empty_removed = _prune_empty_dirs(get_hermes_home())

    save_tracked(new_tracked)
    _log(
        f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, "
        f"{fmt_size(freed)}"
    )
    return {
        "deleted": deleted,
        "empty_dirs": empty_removed,
        "freed": freed,
        "errors": errors,
    }


def _prune_empty_dirs(hermes_home: Path) -> int:
    """Remove empty directories below *hermes_home*; return how many were removed."""
    # Leave HERMES_HOME itself and a short list of well-known top-level
    # state dirs alone — a fresh install has these empty, and deleting them
    # would surprise the user.
    protected_top_level = {
        "logs", "memories", "sessions", "cron", "cronjobs",
        "cache", "skills", "plugins", "disk-cleanup", "optional-skills",
        "hermes-agent", "backups", "profiles", ".worktrees",
    }
    removed = 0
    try:
        # Reverse-sorted so children come before their parents: a directory
        # emptied by removing its children is pruned in the same pass.
        for dirpath in sorted(hermes_home.rglob("*"), reverse=True):
            if not dirpath.is_dir() or dirpath == hermes_home:
                continue
            try:
                rel_parts = dirpath.relative_to(hermes_home).parts
            except ValueError:
                continue
            # Skip the well-known top-level state dirs themselves.
            if len(rel_parts) == 1 and rel_parts[0] in protected_top_level:
                continue
            # The module contract is to never touch anything under logs/,
            # so its subdirectories are left alone even when empty.
            if rel_parts[0] == "logs":
                continue
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    removed += 1
                    _log(f"DELETED: {dirpath} (empty dir)")
            except OSError:
                # Best effort: unreadable or concurrently modified dirs are skipped.
                pass
    except OSError:
        # Walking the tree failed part-way; report what was removed so far.
        pass
    return removed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Deep cleanup (interactive — not called from plugin hooks)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def deep(
    confirm: Optional[Callable[[Dict[str, Any]], bool]] = None,
) -> Dict[str, Any]:
    """Deep cleanup.

    Runs :func:`quick` first, then asks the *confirm* callable for each
    risky item (research > 30d beyond 10 newest, chrome-profile > 14d,
    any file > 500 MB). *confirm(item)* must return a truthy value to
    delete.

    Args:
        confirm: Called with one tracked entry (the dict stored in
            tracked.json) per risky item. When None, nothing beyond the
            quick pass is done.

    Returns: ``{"quick": {...}, "deep_deleted": N, "deep_freed": bytes}``.
    """
    quick_result = quick()

    if confirm is None:
        # No interactive confirmer — deep stops after the quick pass.
        return {"quick": quick_result, "deep_deleted": 0, "deep_freed": 0}

    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    research: List[Dict] = []
    chrome: List[Dict] = []
    large: List[Dict] = []

    for item in tracked:
        if not Path(item["path"]).exists():
            continue
        age = (now - datetime.fromisoformat(item["timestamp"])).days
        cat = item["category"]

        if cat == "research" and age > 30:
            research.append(item)
        elif cat == "chrome-profile" and age > 14:
            chrome.append(item)
        elif item["size"] > 500 * 1024 * 1024:
            large.append(item)

    # Newest first; the 10 newest stale research items are kept unasked.
    research.sort(key=lambda x: x["timestamp"], reverse=True)
    old_research = research[10:]

    freed, count = 0, 0
    to_remove: List[Dict] = []

    for item in (*old_research, *chrome, *large):
        if not confirm(item):
            continue
        try:
            p = Path(item["path"])
            if p.is_file():
                p.unlink()
            elif p.is_dir():
                shutil.rmtree(p)
            to_remove.append(item)
            freed += item["size"]
            count += 1
            _log(
                f"DELETED: {p} ({item['category']}, "
                f"{fmt_size(item['size'])})"
            )
        except OSError as e:
            # Failed deletions stay tracked so they can be retried later.
            _log(f"ERROR deleting {item['path']}: {e}")

    if to_remove:
        remove_paths = {i["path"] for i in to_remove}
        save_tracked([i for i in tracked if i["path"] not in remove_paths])

    return {"quick": quick_result, "deep_deleted": count, "deep_freed": freed}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def status() -> Dict[str, Any]:
    """Return per-category breakdown and top 10 largest tracked files.

    The result has three keys: ``categories`` maps each category to its
    ``count`` and summed ``size`` over all tracked entries; ``top10`` holds
    up to ten ``(path, size, category)`` tuples for entries whose path
    still exists, largest first; ``total_tracked`` is the entry count.
    """
    tracked = load_tracked()

    cats: Dict[str, Dict] = {}
    for item in tracked:
        bucket = cats.setdefault(item["category"], {"count": 0, "size": 0})
        bucket["count"] += 1
        bucket["size"] += item["size"]

    existing = sorted(
        (
            (entry["path"], entry["size"], entry["category"])
            for entry in tracked
            if Path(entry["path"]).exists()
        ),
        key=lambda row: row[1],
        reverse=True,
    )

    return {
        "categories": cats,
        "top10": existing[:10],
        "total_tracked": len(tracked),
    }
|
|
|
|
|
|
def format_status(s: Dict[str, Any]) -> str:
    """Human-readable status string (for slash command output).

    Renders the ``categories`` table of *s* sorted by size, largest first,
    followed by the ``top10`` list; placeholders are shown when either is
    empty.
    """
    header = f"{'Category':<20} {'Files':>6} {'Size':>10}"
    lines = [header, "-" * 40]

    cats = s["categories"]
    by_size = sorted(cats.items(), key=lambda x: x[1]["size"], reverse=True)
    lines.extend(
        f"{cat:<20} {d['count']:>6} {fmt_size(d['size']):>10}"
        for cat, d in by_size
    )
    if not cats:
        lines.append("(nothing tracked yet)")

    lines.append("")
    lines.append("Top 10 largest tracked files:")

    top = s["top10"]
    if top:
        for rank, (path, size, cat) in enumerate(top, 1):
            lines.append(f" {rank:>2}. {fmt_size(size):>8} [{cat}] {path}")
    else:
        lines.append(" (none)")
    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auto-categorisation from tool-call inspection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Filename prefixes that guess_category() classifies as "test".
_TEST_PATTERNS = ("test_", "tmp_")
# Filename suffixes that guess_category() classifies as "test".
_TEST_SUFFIXES = (".test.py", ".test.js", ".test.ts", ".test.md")
|
|
|
|
|
|
def guess_category(path: Path) -> Optional[str]:
    """Return a category label for *path*, or None if we shouldn't track it.

    Used by the ``post_tool_call`` hook to auto-track ephemeral files.

    Paths failing :func:`is_safe_path` are never tracked. Under HERMES_HOME
    the first path component decides: state, config and identity entries
    are skipped, ``cron``/``cronjobs`` map to ``"cron-output"`` and
    ``cache`` maps to ``"temp"``. Anything else is ``"test"`` only when its
    filename matches a known test prefix or suffix.
    """
    if not is_safe_path(path):
        return None

    hermes_home = get_hermes_home()
    try:
        rel = path.resolve().relative_to(hermes_home)
    except ValueError:
        # Path isn't under HERMES_HOME (e.g. /tmp/hermes-*) — decide by name only.
        rel = None

    if rel is not None:
        top = rel.parts[0] if rel.parts else ""
        # Skip the state dir itself, logs, memory files, sessions, config.
        never_tracked = {
            "disk-cleanup", "logs", "memories", "sessions", "config.yaml",
            "skills", "plugins", ".env", "USER.md", "MEMORY.md", "SOUL.md",
            "auth.json", "hermes-agent",
        }
        if top in never_tracked:
            return None
        if top in ("cron", "cronjobs"):
            return "cron-output"
        if top == "cache":
            return "temp"

    name = path.name
    if name.startswith(_TEST_PATTERNS) or name.endswith(_TEST_SUFFIXES):
        return "test"
    return None
|