hermes-agent/plugins/disk-cleanup/disk_cleanup.py

521 lines
17 KiB
Python
Executable file

"""disk_cleanup — ephemeral file cleanup for Hermes Agent.
Library module wrapping the deterministic cleanup rules written by
@LVT382009 in PR #12212. The plugin ``__init__.py`` wires these
functions into ``post_tool_call`` and ``on_session_end`` hooks so
tracking and cleanup happen automatically — the agent never needs to
call a tool or remember a skill.
Rules:
- test files → delete immediately at task end (age >= 0)
- temp files → delete after 7 days
- cron-output → delete after 14 days
- empty dirs → delete when safe under HERMES_HOME (excluding checkpoint shadow-repo dirs and git internals)
- research → prompt for items older than 30 days, sparing the 10 newest of those (deep only)
- chrome-profile→ prompt after 14 days (deep only)
- >500 MB files → prompt always (deep only)
Scope: strictly HERMES_HOME and /tmp/hermes-*
Never touches: ~/.hermes/logs/ or any system directory.
"""
from __future__ import annotations
import json
import logging
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
try:
    from hermes_constants import get_hermes_home
except Exception:  # pragma: no cover — plugin may load before constants resolves
    import os

    def get_hermes_home() -> Path:  # type: ignore[no-redef]
        """Fallback resolver used when ``hermes_constants`` cannot be imported.

        Reads ``HERMES_HOME`` from the environment (a blank or
        whitespace-only value counts as unset) and otherwise falls back to
        ``~/.hermes``. The returned path is always resolved.
        """
        override = (os.environ.get("HERMES_HOME") or "").strip()
        if override:
            return Path(override).resolve()
        return (Path.home() / ".hermes").resolve()

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
def get_state_dir() -> Path:
    """Return the plugin's state directory, ``$HERMES_HOME/disk-cleanup``.

    Deliberately kept separate from ``$HERMES_HOME/logs/``.
    """
    home = get_hermes_home()
    return home / "disk-cleanup"
def get_tracked_file() -> Path:
    """Return the path of ``tracked.json`` inside the state directory."""
    state_dir = get_state_dir()
    return state_dir / "tracked.json"
def get_log_file() -> Path:
    """Return the audit-log path, ``cleanup.log`` in the state directory.

    The log intentionally does NOT live under ``$HERMES_HOME/logs/``.
    """
    state_dir = get_state_dir()
    return state_dir / "cleanup.log"
# ---------------------------------------------------------------------------
# Path safety
# ---------------------------------------------------------------------------
def is_safe_path(path: Path) -> bool:
    """Accept only paths under HERMES_HOME or ``/tmp/hermes-*``.

    Rejects Windows mounts (``/mnt/c`` etc.) and any system directory.

    Both checks run on the fully resolved path, so ``..`` segments and
    symlinks cannot be used to get an outside location past the
    ``/tmp/hermes-*`` allowance (for example ``/tmp/hermes-x/../../etc``).

    Args:
        path: Candidate path; it does not need to exist.

    Returns:
        True when the resolved path lies inside HERMES_HOME, or inside a
        ``hermes-*`` entry directly under ``/tmp``. False otherwise,
        including when the path cannot be resolved at all.
    """
    try:
        resolved = path.resolve()
    except (OSError, RuntimeError):
        # Unresolvable (e.g. a symlink loop) — refuse rather than guess.
        return False

    try:
        resolved.relative_to(get_hermes_home())
        return True
    except ValueError:
        pass

    # Allow /tmp/hermes-* explicitly. The resolved form of /tmp is accepted
    # as well, because on some platforms /tmp is itself a symlink and a
    # resolved candidate path would no longer start with the literal "/tmp".
    tmp_roots = [Path("/tmp")]
    try:
        real_tmp = Path("/tmp").resolve()
    except (OSError, RuntimeError):
        real_tmp = None
    if real_tmp is not None and real_tmp != tmp_roots[0]:
        tmp_roots.append(real_tmp)

    for root in tmp_roots:
        try:
            rel_parts = resolved.relative_to(root).parts
        except ValueError:
            continue
        # The first component below /tmp must be a hermes-* entry.
        if rel_parts and rel_parts[0].startswith("hermes-"):
            return True
    return False
# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------
def _log(message: str) -> None:
    """Append one timestamped line to the audit log; never raises on I/O.

    The line has the form ``[YYYY-MM-DD HH:MM:SS] message`` with the
    timestamp in UTC. The log's parent directory is created on demand.

    Args:
        message: Text to record. It may embed arbitrary file names.
    """
    try:
        log_file = get_log_file()
        log_file.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        # Explicit UTF-8 with backslashreplace: messages carry file names
        # and non-ASCII punctuation that a locale-default codec may be
        # unable to encode. UnicodeEncodeError is a ValueError, so the
        # OSError guard below would not have caught it.
        with open(
            log_file, "a", encoding="utf-8", errors="backslashreplace"
        ) as f:
            f.write(f"[{ts}] {message}\n")
    except OSError:
        # Never let the audit log break the agent loop.
        pass
# ---------------------------------------------------------------------------
# tracked.json — atomic read/write, backup scoped to tracked.json only
# ---------------------------------------------------------------------------
def load_tracked() -> List[Dict[str, Any]]:
    """Load tracked.json. Restores from ``.bak`` on corruption.

    "Corruption" covers undecodable bytes, invalid JSON, and valid JSON
    whose top level is not a list. Individual entries that are not dicts,
    or that lack any of the ``path`` / ``timestamp`` / ``category`` /
    ``size`` keys, are dropped with a warning so callers can index every
    returned entry safely.

    Returns:
        The list of well-formed tracking entries. Empty when the file does
        not exist, or when neither it nor its backup can be parsed.

    Raises:
        OSError: If the state directory cannot be created or the primary
            file cannot be read (same as before; only parse problems
            trigger the backup path).
    """
    tf = get_tracked_file()
    tf.parent.mkdir(parents=True, exist_ok=True)
    if not tf.exists():
        return []

    required = ("path", "timestamp", "category", "size")

    def _parse(source: Path) -> List[Dict[str, Any]]:
        # json.dumps escapes non-ASCII by default, so files produced by
        # save_tracked are plain ASCII and decode cleanly as UTF-8.
        data = json.loads(source.read_text(encoding="utf-8"))
        if not isinstance(data, list):
            raise ValueError(
                f"expected a JSON list, got {type(data).__name__}"
            )
        entries = [
            entry for entry in data
            if isinstance(entry, dict)
            and all(key in entry for key in required)
        ]
        dropped = len(data) - len(entries)
        if dropped:
            _log(f"WARN: ignored {dropped} malformed entries in {source.name}")
        return entries

    try:
        return _parse(tf)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        pass

    bak = tf.with_suffix(".json.bak")
    if bak.exists():
        try:
            data = _parse(bak)
        except (OSError, ValueError):
            pass
        else:
            _log("WARN: tracked.json corrupted — restored from .bak")
            return data
    _log("WARN: tracked.json corrupted, no backup — starting fresh")
    return []
def save_tracked(tracked: List[Dict[str, Any]]) -> None:
    """Persist *tracked* to tracked.json through a temp file and a rename.

    Sequence: serialise to ``tracked.json.tmp``, copy any existing
    tracked.json to ``tracked.json.bak``, then move the temp file over
    tracked.json.
    """
    target = get_tracked_file()
    target.parent.mkdir(parents=True, exist_ok=True)

    staging = target.with_suffix(".json.tmp")
    payload = json.dumps(tracked, indent=2)
    staging.write_text(payload)

    if target.exists():
        # Keep the previous version around as the recovery copy.
        backup = target.with_suffix(".json.bak")
        shutil.copy2(target, backup)

    staging.replace(target)
# ---------------------------------------------------------------------------
# Categories
# ---------------------------------------------------------------------------
# Category labels accepted by track(); any other label is replaced
# with "other" there.
ALLOWED_CATEGORIES = {
    "chrome-profile",
    "cron-output",
    "download",
    "other",
    "research",
    "temp",
    "test",
}
def _is_protected_empty_dir(dirpath: Path, hermes_home: Path) -> bool:
    """Tell whether an empty directory must survive the empty-dir sweep.

    Two kinds of directory are preserved:

    - anything located inside a ``.git`` directory (a path whose final
      component is ``.git`` itself does not match this rule);
    - anything at or below ``checkpoints/<id>/`` when that ``<id>``
      directory contains a ``HEAD``, ``HERMES_WORKDIR`` or ``config``
      entry, i.e. looks like a checkpoint shadow repo.

    Paths that are not under *hermes_home* are never protected.
    """
    try:
        parts = dirpath.relative_to(hermes_home).parts
    except ValueError:
        return False

    inside_git = ".git" in parts and parts[-1] != ".git"
    if inside_git:
        return True

    if len(parts) < 2 or parts[0] != "checkpoints":
        return False

    shadow_repo = hermes_home / parts[0] / parts[1]
    for marker in ("HEAD", "HERMES_WORKDIR", "config"):
        if (shadow_repo / marker).exists():
            return True
    return False
def fmt_size(n: float) -> str:
    """Render a byte count with 1024-based units and one decimal place.

    Walks B → KB → MB → GB → TB, dividing by 1024 at each step, and
    returns at the first unit where the value is below 1024. Anything
    still at or above 1024 TB is reported in PB.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = n
    index = 0
    while index < len(units):
        if value < 1024:
            return f"{value:.1f} {units[index]}"
        value = value / 1024
        index += 1
    return f"{value:.1f} PB"
# ---------------------------------------------------------------------------
# Track / forget
# ---------------------------------------------------------------------------
def track(path_str: str, category: str, silent: bool = False) -> bool:
    """Register a path for tracking; return True only if newly tracked.

    An unknown *category* is replaced with ``"other"``. The path is
    resolved first. It is skipped (returning False) when it does not exist,
    when :func:`is_safe_path` rejects it, or when it is already tracked.
    The recorded size is the file size for regular files and 0 otherwise.
    Unless *silent* is true, a confirmation line is printed.
    """
    if category not in ALLOWED_CATEGORIES:
        _log(f"WARN: unknown category '{category}', using 'other'")
        category = "other"

    target = Path(path_str).resolve()
    if not target.exists():
        _log(f"SKIP: {target} (does not exist)")
        return False
    if not is_safe_path(target):
        _log(f"REJECT: {target} (outside HERMES_HOME)")
        return False

    size = 0
    if target.is_file():
        size = target.stat().st_size

    entries = load_tracked()
    key = str(target)
    # Deduplicate on the resolved path string.
    for entry in entries:
        if entry["path"] == key:
            return False

    record = {
        "path": key,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "category": category,
        "size": size,
    }
    entries.append(record)
    save_tracked(entries)

    summary = f"{target} ({category}, {fmt_size(size)})"
    _log(f"TRACKED: {summary}")
    if not silent:
        print(f"Tracked: {summary}")
    return True
def forget(path_str: str) -> int:
    """Stop tracking a path without deleting it; return entries removed.

    Entries are matched by comparing resolved paths. tracked.json is only
    rewritten (and the action logged) when something was actually removed.
    """
    target = Path(path_str).resolve()
    entries = load_tracked()

    kept = []
    for entry in entries:
        if Path(entry["path"]).resolve() != target:
            kept.append(entry)

    removed = len(entries) - len(kept)
    if removed:
        save_tracked(kept)
        _log(f"FORGOT: {target} ({removed} entries)")
    return removed
# ---------------------------------------------------------------------------
# Dry run
# ---------------------------------------------------------------------------
def dry_run() -> Tuple[List[Dict], List[Dict]]:
    """Return (auto_delete_list, needs_prompt_list) without touching files.

    Classification of each tracked entry whose path still exists:

    - auto: ``test`` (always), ``temp`` older than 7 days,
      ``cron-output`` older than 14 days;
    - prompt: ``research`` older than 30 days, ``chrome-profile`` older
      than 14 days, otherwise anything larger than 500 MB.

    Ages are whole days since the entry's timestamp. An entry whose
    timestamp is missing, unparsable or timezone-naive is treated as age 0
    instead of aborting the preview, so age-based rules never match it.

    Note: :func:`deep` additionally spares the 10 newest of the research
    candidates, so the prompt list here can contain research items that
    ``deep`` would not ask about.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    auto: List[Dict] = []
    prompt: List[Dict] = []
    for item in tracked:
        p = Path(item["path"])
        if not p.exists():
            continue
        try:
            age = (now - datetime.fromisoformat(item["timestamp"])).days
        except (KeyError, TypeError, ValueError):
            # TypeError covers non-string values and naive timestamps
            # (aware - naive subtraction); be conservative and call it new.
            age = 0
        cat = item["category"]
        size = item["size"]
        if cat == "test":
            auto.append(item)
        elif cat == "temp" and age > 7:
            auto.append(item)
        elif cat == "cron-output" and age > 14:
            auto.append(item)
        elif cat == "research" and age > 30:
            prompt.append(item)
        elif cat == "chrome-profile" and age > 14:
            prompt.append(item)
        elif size > 500 * 1024 * 1024:
            prompt.append(item)
    return auto, prompt
# ---------------------------------------------------------------------------
# Quick cleanup
# ---------------------------------------------------------------------------
def quick() -> Dict[str, Any]:
    """Safe deterministic cleanup — no prompts.

    Pass 1 walks the tracked entries: entries whose path is gone are
    dropped from tracking; ``test`` entries are deleted unconditionally,
    ``temp`` entries when older than 7 days and ``cron-output`` entries
    when older than 14 days. Entries that fail to delete stay tracked and
    are reported in ``errors``.

    Pass 2 removes empty directories under HERMES_HOME, except HERMES_HOME
    itself, a fixed set of top-level state directories, git internals and
    checkpoint shadow repos (see :func:`_is_protected_empty_dir`), and
    everything below ``logs/``.

    Safeguards:

    - A path is re-validated with :func:`is_safe_path` immediately before
      deletion, because tracked.json is plain data on disk. An entry that
      fails the check is not deleted and is removed from tracking.
    - A missing, unparsable or timezone-naive timestamp is treated as
      age 0 rather than aborting the pass halfway through (which would
      leave files deleted but tracked.json unsaved).

    Returns: ``{"deleted": N, "empty_dirs": N, "freed": bytes,
    "errors": [str, ...]}``.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    deleted = 0
    freed = 0
    new_tracked: List[Dict] = []
    errors: List[str] = []
    for item in tracked:
        p = Path(item["path"])
        cat = item["category"]
        if not p.exists():
            _log(f"STALE: {p} (removed from tracking)")
            continue
        try:
            age = (now - datetime.fromisoformat(item["timestamp"])).days
        except (KeyError, TypeError, ValueError):
            # TypeError covers non-string values and naive timestamps.
            _log(f"WARN: {p} has an unreadable timestamp — treating as new")
            age = 0
        should_delete = (
            cat == "test"
            or (cat == "temp" and age > 7)
            or (cat == "cron-output" and age > 14)
        )
        if not should_delete:
            new_tracked.append(item)
            continue
        if not is_safe_path(p):
            # Defence in depth: only track() validated this path so far.
            _log(
                f"REJECT: {p} (unsafe path — not deleted, "
                f"removed from tracking)"
            )
            continue
        try:
            if p.is_file():
                p.unlink()
            elif p.is_dir():
                shutil.rmtree(p)
        except OSError as e:
            _log(f"ERROR deleting {p}: {e}")
            errors.append(f"{p}: {e}")
            new_tracked.append(item)
            continue
        freed += item["size"]
        deleted += 1
        _log(f"DELETED: {p} ({cat}, {fmt_size(item['size'])})")
    # Remove empty dirs under HERMES_HOME (but leave HERMES_HOME itself and
    # a short list of well-known top-level state dirs alone — a fresh install
    # has these empty, and deleting them would surprise the user).
    hermes_home = get_hermes_home()
    _PROTECTED_TOP_LEVEL = {
        "logs", "memories", "sessions", "cron", "cronjobs",
        "cache", "skills", "plugins", "disk-cleanup", "optional-skills",
        "hermes-agent", "backups", "profiles", ".worktrees",
    }
    empty_removed = 0
    try:
        # Reverse-sorted so children are visited before their parents and
        # a directory emptied by this sweep can be removed in the same run.
        for dirpath in sorted(hermes_home.rglob("*"), reverse=True):
            if not dirpath.is_dir() or dirpath == hermes_home:
                continue
            try:
                rel_parts = dirpath.relative_to(hermes_home).parts
            except ValueError:
                continue
            # The module contract is to never touch logs/, so skip its
            # whole subtree, not just the top-level directory.
            if rel_parts and rel_parts[0] == "logs":
                continue
            if _is_protected_empty_dir(dirpath, hermes_home):
                continue
            # Skip the well-known top-level state dirs themselves.
            if len(rel_parts) == 1 and rel_parts[0] in _PROTECTED_TOP_LEVEL:
                continue
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    empty_removed += 1
                    _log(f"DELETED: {dirpath} (empty dir)")
            except OSError:
                # Best effort: an unreadable or busy directory is left alone.
                pass
    except OSError:
        # Walking the tree failed part-way; keep whatever was cleaned so far.
        pass
    save_tracked(new_tracked)
    _log(
        f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, "
        f"{fmt_size(freed)}"
    )
    return {
        "deleted": deleted,
        "empty_dirs": empty_removed,
        "freed": freed,
        "errors": errors,
    }
# ---------------------------------------------------------------------------
# Deep cleanup (interactive — not called from plugin hooks)
# ---------------------------------------------------------------------------
def deep(
    confirm: Optional[callable] = None,
) -> Dict[str, Any]:
    """Deep cleanup.

    Runs :func:`quick` first, then asks the *confirm* callable for each
    risky item (research > 30d beyond 10 newest, chrome-profile > 14d,
    any file > 500 MB). *confirm(item)* must return True to delete.

    Args:
        confirm: Called with one tracked entry (a dict) per risky item.
            When None, only the quick pass runs.

    Safeguards:

    - A path is re-validated with :func:`is_safe_path` before the user is
      even asked. An entry that fails is not deleted and is removed from
      tracking.
    - A missing, unparsable or timezone-naive timestamp is treated as
      age 0 instead of aborting, so only the size rule can match it.

    Returns: ``{"quick": {...}, "deep_deleted": N, "deep_freed": bytes}``.
    """
    quick_result = quick()
    if confirm is None:
        # No interactive confirmer — deep stops after the quick pass.
        return {"quick": quick_result, "deep_deleted": 0, "deep_freed": 0}
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    research, chrome, large = [], [], []
    for item in tracked:
        p = Path(item["path"])
        if not p.exists():
            continue
        try:
            age = (now - datetime.fromisoformat(item["timestamp"])).days
        except (KeyError, TypeError, ValueError):
            # TypeError covers non-string values and naive timestamps.
            age = 0
        cat = item["category"]
        if cat == "research" and age > 30:
            research.append(item)
        elif cat == "chrome-profile" and age > 14:
            chrome.append(item)
        elif item["size"] > 500 * 1024 * 1024:
            large.append(item)
    # Newest first, so the slice below spares the 10 most recent candidates.
    research.sort(key=lambda x: x["timestamp"], reverse=True)
    old_research = research[10:]
    freed, count = 0, 0
    to_remove: List[Dict] = []
    for group in (old_research, chrome, large):
        for item in group:
            p = Path(item["path"])
            if not is_safe_path(p):
                # Defence in depth: only track() validated this path so far.
                _log(
                    f"REJECT: {p} (unsafe path — not deleted, "
                    f"removed from tracking)"
                )
                to_remove.append(item)
                continue
            if not confirm(item):
                continue
            try:
                if p.is_file():
                    p.unlink()
                elif p.is_dir():
                    shutil.rmtree(p)
            except OSError as e:
                _log(f"ERROR deleting {item['path']}: {e}")
                continue
            to_remove.append(item)
            freed += item["size"]
            count += 1
            _log(
                f"DELETED: {p} ({item['category']}, "
                f"{fmt_size(item['size'])})"
            )
    if to_remove:
        remove_paths = {i["path"] for i in to_remove}
        save_tracked([i for i in tracked if i["path"] not in remove_paths])
    return {"quick": quick_result, "deep_deleted": count, "deep_freed": freed}
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
def status() -> Dict[str, Any]:
    """Summarise what is currently tracked.

    Returns a dict with:

    - ``categories``: per-category ``{"count": N, "size": bytes}`` over
      all tracked entries;
    - ``top10``: up to ten ``(path, size, category)`` tuples for entries
      whose path still exists, largest recorded size first;
    - ``total_tracked``: number of tracked entries.
    """
    tracked = load_tracked()

    by_category: Dict[str, Dict] = {}
    for entry in tracked:
        bucket = by_category.setdefault(
            entry["category"], {"count": 0, "size": 0}
        )
        bucket["count"] += 1
        bucket["size"] += entry["size"]

    present = []
    for entry in tracked:
        if Path(entry["path"]).exists():
            present.append((entry["path"], entry["size"], entry["category"]))
    largest_first = sorted(present, key=lambda row: row[1], reverse=True)

    return {
        "categories": by_category,
        "top10": largest_first[:10],
        "total_tracked": len(tracked),
    }
def format_status(s: Dict[str, Any]) -> str:
    """Render a :func:`status` result as text (for slash command output).

    Produces a per-category table ordered by total size, largest first,
    followed by the ranked "top 10" list. Placeholder lines are emitted
    when either section is empty.
    """
    header = f"{'Category':<20} {'Files':>6} {'Size':>10}"
    out = [header, "-" * 40]

    cats = s["categories"]
    ranked = sorted(cats.items(), key=lambda kv: kv[1]["size"], reverse=True)
    for name, info in ranked:
        out.append(f"{name:<20} {info['count']:>6} {fmt_size(info['size']):>10}")
    if not cats:
        out.append("(nothing tracked yet)")

    out.append("")
    out.append("Top 10 largest tracked files:")
    top = s["top10"]
    if top:
        for rank, (path, size, cat) in enumerate(top, 1):
            out.append(f" {rank:>2}. {fmt_size(size):>8} [{cat}] {path}")
    else:
        out.append(" (none)")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# Auto-categorisation from tool-call inspection
# ---------------------------------------------------------------------------
# File-name prefixes that mark a file as a throwaway test artefact.
_TEST_PATTERNS = (
    "test_",
    "tmp_",
)
# File-name suffixes that mark a file as a throwaway test artefact.
_TEST_SUFFIXES = (
    ".test.py",
    ".test.js",
    ".test.ts",
    ".test.md",
)
def guess_category(path: Path) -> Optional[str]:
    """Pick a tracking category for *path*, or None when it must not be tracked.

    Used by the ``post_tool_call`` hook to auto-track ephemeral files.

    Decision order:

    1. Paths rejected by :func:`is_safe_path` are never tracked.
    2. For paths under HERMES_HOME, the first path component decides:
       state, config and identity entries are never tracked, ``cron`` /
       ``cronjobs`` map to ``"cron-output"`` and ``cache`` to ``"temp"``.
    3. Otherwise the file name decides: a test-style prefix or suffix
       yields ``"test"``; anything else is left untracked.
    """
    if not is_safe_path(path):
        return None

    hermes_home = get_hermes_home()
    try:
        rel = path.resolve().relative_to(hermes_home)
    except ValueError:
        # Path isn't under HERMES_HOME (e.g. /tmp/hermes-*) — fall through.
        pass
    else:
        top = rel.parts[0] if rel.parts else ""
        # Skip the state dir itself, logs, memory files, sessions, config.
        never_track = {
            "disk-cleanup", "logs", "memories", "sessions", "config.yaml",
            "skills", "plugins", ".env", "USER.md", "MEMORY.md", "SOUL.md",
            "auth.json", "hermes-agent",
        }
        if top in never_track:
            return None
        if top in ("cron", "cronjobs"):
            return "cron-output"
        if top == "cache":
            return "temp"

    name = path.name
    if name.startswith(_TEST_PATTERNS) or name.endswith(_TEST_SUFFIXES):
        return "test"
    return None