diff --git a/hermes_cli/plugins.py b/hermes_cli/plugins.py index 7796be4de..621dedde1 100644 --- a/hermes_cli/plugins.py +++ b/hermes_cli/plugins.py @@ -2,14 +2,20 @@ Hermes Plugin System ==================== -Discovers, loads, and manages plugins from three sources: +Discovers, loads, and manages plugins from four sources: -1. **User plugins** – ``~/.hermes/plugins/<name>/`` -2. **Project plugins** – ``./.hermes/plugins/<name>/`` (opt-in via +1. **Bundled plugins** – ``<repo>/plugins/<name>/`` (shipped with hermes-agent; + ``memory/`` and ``context_engine/`` subdirs are excluded — they have their + own discovery paths) +2. **User plugins** – ``~/.hermes/plugins/<name>/`` +3. **Project plugins** – ``./.hermes/plugins/<name>/`` (opt-in via ``HERMES_ENABLE_PROJECT_PLUGINS``) -3. **Pip plugins** – packages that expose the ``hermes_agent.plugins`` +4. **Pip plugins** – packages that expose the ``hermes_agent.plugins`` entry-point group. +Later sources override earlier ones on name collision, so a user or project +plugin with the same name as a bundled plugin replaces it. + Each directory plugin must contain a ``plugin.yaml`` manifest **and** an ``__init__.py`` with a ``register(ctx)`` function. @@ -422,21 +428,42 @@ class PluginManager: manifests: List[PluginManifest] = [] - # 1. User plugins (~/.hermes/plugins/) + # 1. Bundled plugins (<repo>/plugins/<name>/) + # Repo-shipped generic plugins live next to hermes_cli/. Memory and + # context_engine subdirs are handled by their own discovery paths, so + # skip those names here. + # Tests can set HERMES_DISABLE_BUNDLED_PLUGINS=1 to get a clean slate. + if not _env_enabled("HERMES_DISABLE_BUNDLED_PLUGINS"): + repo_plugins = Path(__file__).resolve().parent.parent / "plugins" + manifests.extend( + self._scan_directory( + repo_plugins, + source="bundled", + skip_names={"memory", "context_engine"}, + ) + ) + + # 2. User plugins (~/.hermes/plugins/) user_dir = get_hermes_home() / "plugins" manifests.extend(self._scan_directory(user_dir, source="user")) - # 2. 
Project plugins (./.hermes/plugins/) + # 3. Project plugins (./.hermes/plugins/) if _env_enabled("HERMES_ENABLE_PROJECT_PLUGINS"): project_dir = Path.cwd() / ".hermes" / "plugins" manifests.extend(self._scan_directory(project_dir, source="project")) - # 3. Pip / entry-point plugins + # 4. Pip / entry-point plugins manifests.extend(self._scan_entry_points()) - # Load each manifest (skip user-disabled plugins) + # Load each manifest (skip user-disabled plugins). + # Later sources override earlier ones on name collision — user plugins + # take precedence over bundled, project plugins take precedence over + # user. Dedup here so we only load the final winner. disabled = _get_disabled_plugins() + winners: Dict[str, PluginManifest] = {} for manifest in manifests: + winners[manifest.name] = manifest + for manifest in winners.values(): if manifest.name in disabled: loaded = LoadedPlugin(manifest=manifest, enabled=False) loaded.error = "disabled via config" @@ -456,8 +483,18 @@ class PluginManager: # Directory scanning # ----------------------------------------------------------------------- - def _scan_directory(self, path: Path, source: str) -> List[PluginManifest]: - """Read ``plugin.yaml`` manifests from subdirectories of *path*.""" + def _scan_directory( + self, + path: Path, + source: str, + skip_names: Optional[Set[str]] = None, + ) -> List[PluginManifest]: + """Read ``plugin.yaml`` manifests from subdirectories of *path*. + + *skip_names* is an optional deny-list of names to ignore (used + for the bundled scan to exclude ``memory`` / ``context_engine`` + subdirs that have their own discovery path). 
+ """ manifests: List[PluginManifest] = [] if not path.is_dir(): return manifests @@ -465,6 +502,8 @@ class PluginManager: for child in sorted(path.iterdir()): if not child.is_dir(): continue + if skip_names and child.name in skip_names: + continue manifest_file = child / "plugin.yaml" if not manifest_file.exists(): manifest_file = child / "plugin.yml" @@ -532,7 +571,7 @@ class PluginManager: loaded = LoadedPlugin(manifest=manifest) try: - if manifest.source in ("user", "project"): + if manifest.source in ("user", "project", "bundled"): module = self._load_directory_module(manifest) else: module = self._load_entrypoint_module(manifest) diff --git a/optional-skills/devops/disk-guardian/SKILL.md b/optional-skills/devops/disk-guardian/SKILL.md deleted file mode 100644 index c57c9957c..000000000 --- a/optional-skills/devops/disk-guardian/SKILL.md +++ /dev/null @@ -1,156 +0,0 @@ ---- -name: disk-guardian -description: > - Cleans up ephemeral files Hermes creates during sessions — temp outputs, - test scripts, test artifacts, cron logs, and stale chrome profiles. - Auto-deletes test files at the end of every task. Runs automatically when - disk usage is high or sessions are long. Scoped strictly to HERMES_HOME. - Triggers: "clean up", "free disk space", "disk is full", "remove test files", - "what temp files exist", "show disk usage", "run cleanup". -version: 1.2.0 -platforms: [linux] -requires_toolsets: [terminal] -metadata: - hermes: - tags: [devops, maintenance, cleanup, ephemeral, WSL2, auto] - category: devops ---- - -# Disk Guardian - -Tracks and removes **ephemeral files Hermes creates** during sessions: -temp outputs, test scripts/artifacts, cron logs, stale chrome profiles, -and large downloads. Scope is strictly `HERMES_HOME` and `/tmp/hermes-*`. -Never touches `~/.hermes/logs/` or any system directory. 
- -## When to Use - -**Auto-trigger (no user prompt needed):** -- End of every task that created test files → run `quick` silently -- Session context window exceeds 60% → run `quick` silently -- Any tool call creates a file matching `test_*`, `*.test.*`, `tmp_*` → track it immediately -- User says "done", "finished", "task complete" after a multi-step task - -**User-triggered:** -- User says "disk is full", "clean up", "free space", "remove temp files" -- User says "what's taking disk space", "show disk usage" -- User asks for dry-run preview before committing to deletion - -## Auto-Run Behavior - -This skill runs **silently in the background**. When auto-running: -- Never ask for confirmation for safe categories (temp, test, empty dirs) -- Print one line after: `[disk-guardian] Cleaned 12 files, freed 234 MB` -- Only surface to the user if something needs confirmation (research folders, large files >500 MB) - -## Memory — Install Once, Active Forever - -On first install, run this to write persistent memory rules so the behavior -carries across all future sessions automatically: - -```bash -python3 ~/.hermes/optional-skills/devops/disk-guardian/scripts/disk_guardian.py install-memory -``` - -This adds to Hermes memory: -- Auto-track every file created under HERMES_HOME during tasks -- Auto-run quick cleanup at end of every task that created test files -- Auto-run quick cleanup if HERMES_HOME disk usage exceeds 5 GB -- Never ask permission for temp/test cleanup — clean silently and report one line - -## Quick Reference - -```bash -SCRIPT=~/.hermes/optional-skills/devops/disk-guardian/scripts/disk_guardian.py - -python3 $SCRIPT install-memory # write persistent memory (run once after install) -python3 $SCRIPT status # show breakdown by category + top 10 largest -python3 $SCRIPT dry-run # preview deletions, touch nothing -python3 $SCRIPT quick # auto-delete safe files (no prompts) -python3 $SCRIPT deep # full cleanup, prompt for risky items -python3 $SCRIPT track # 
register a file -python3 $SCRIPT forget # stop tracking a path -``` - -## Procedure - -### Step 1 — Track files as Hermes creates them - -Whenever Hermes creates a file via `write_file` or `terminal`, immediately track it: - -```bash -# Test script created during a task -python3 $SCRIPT track "/home/user/.hermes/test_output.py" "test" - -# Temp output -python3 $SCRIPT track "/home/user/.hermes/cache/run_abc.json" "temp" - -# Cron output -python3 $SCRIPT track "/home/user/.hermes/cron/report_2026.md" "cron-output" - -# Chrome profile -python3 $SCRIPT track "/home/user/.local/share/notebooklm-mcp/chrome_profile_x" "chrome-profile" -``` - -Categories: `temp` | `test` | `research` | `download` | `chrome-profile` | `cron-output` | `other` - -Always wrap user-supplied paths with `shlex.quote()`. - -### Step 2 — Auto-delete test files at end of every task - -At the end of **every task** that involved creating or running test files, silently run: - -```bash -python3 $SCRIPT quick -``` - -Output: `[disk-guardian] Cleaned 3 test files, freed 45 KB` - -Do not ask the user — just clean and report one line. 
- -### Step 3 — Respond to user cleanup requests - -```bash -# Safe, no prompts -python3 $SCRIPT quick - -# Full cleanup with confirmation for research/large files -python3 $SCRIPT deep - -# Preview only -python3 $SCRIPT dry-run -``` - -## Cleanup Rules (Deterministic) - -| Category | Threshold | Confirmation | -|---|---|---| -| `test` | >0 days — delete at task end | Never | -| `temp` | >7 days since tracked | Never | -| empty dirs under HERMES_HOME | always | Never | -| `cron-output` | >14 days since tracked | Never | -| `research` | >30 days, beyond 10 newest | Always | -| `chrome-profile` | >14 days since tracked | Always | -| `download` / `other` | never auto | Always (deep only) | -| any file >500 MB | never auto | Always (deep only) | - -## Pitfalls - -- **Never hardcode `~/.hermes`** — always use `HERMES_HOME` env var or `get_hermes_home()` -- **Never touch `~/.hermes/logs/`** — agent debug logs are not ephemeral artifacts -- **Backup/restore scoped to `tracked.json` only** — never agent logs or other Hermes state -- **WSL2: reject Windows mounts** — `/mnt/c/` and all `/mnt/` paths rejected by `_is_safe_path()` -- **Test files are always ephemeral** — delete aggressively, never prompt -- **Silent by default** — only interrupt the user when confirmation is genuinely required - -## Verification - -```bash -# After quick cleanup: -tail -5 ~/.hermes/disk-guardian/cleanup.log -# Should show DELETED entries for test/temp files - -# After install-memory: -# Ask Hermes: "what do you remember about disk cleanup?" 
-# Should confirm auto-cleanup rules are in memory -``` diff --git a/optional-skills/devops/disk-guardian/disk_guardian.py b/optional-skills/devops/disk-guardian/disk_guardian.py deleted file mode 100755 index cd0dad547..000000000 --- a/optional-skills/devops/disk-guardian/disk_guardian.py +++ /dev/null @@ -1,508 +0,0 @@ -#!/usr/bin/env python3 -""" -disk_guardian.py v1.2.0 — ephemeral file cleanup for Hermes Agent - -Tracks and removes temp outputs, test artifacts, cron logs, and stale -chrome profiles created during Hermes sessions. - -Rules: - - test files → delete immediately at task end (age > 0) - - temp files → delete after 7 days - - cron-output → delete after 14 days - - empty dirs → always delete - - research → keep 10 newest, prompt for older (deep only) - - chrome-profile→ prompt after 14 days (deep only) - - >500 MB files → prompt always (deep only) - -Scope: strictly HERMES_HOME and /tmp/hermes-* -Never touches: ~/.hermes/logs/ or any system directory -""" - -import argparse -import json -import os -import shutil -import sys -from datetime import datetime, timezone -from pathlib import Path -from typing import Any, Dict, List, Optional - -# --------------------------------------------------------------------------- -# Paths -# --------------------------------------------------------------------------- - -def get_hermes_home() -> Path: - """Return HERMES_HOME, defaulting to ~/.hermes.""" - val = os.environ.get("HERMES_HOME", "").strip() - return Path(val).resolve() if val else (Path.home() / ".hermes").resolve() - - -def get_state_dir() -> Path: - """State dir — separate from ~/.hermes/logs/.""" - return get_hermes_home() / "disk-guardian" - - -def get_tracked_file() -> Path: - return get_state_dir() / "tracked.json" - - -def get_log_file() -> Path: - """Audit log — NOT ~/.hermes/logs/.""" - return get_state_dir() / "cleanup.log" - - -# --------------------------------------------------------------------------- -# WSL + path safety -# 
--------------------------------------------------------------------------- - -def is_wsl() -> bool: - try: - return "microsoft" in Path("/proc/version").read_text().lower() - except Exception: - return False - - -def _is_safe_path(path: Path) -> bool: - """ - Accept only paths under HERMES_HOME or /tmp/hermes-*. - Rejects Windows mounts (/mnt/c etc.) and system directories. - """ - hermes_home = get_hermes_home() - try: - path.relative_to(hermes_home) - return True - except ValueError: - pass - # Allow /tmp/hermes-* explicitly - parts = path.parts - if len(parts) >= 3 and parts[1] == "tmp" and parts[2].startswith("hermes-"): - return True - return False - - -# --------------------------------------------------------------------------- -# Audit log — writes only to disk-guardian/cleanup.log -# --------------------------------------------------------------------------- - -def _log(message: str) -> None: - log_file = get_log_file() - log_file.parent.mkdir(parents=True, exist_ok=True) - ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - with open(log_file, "a") as f: - f.write(f"[{ts}] {message}\n") - - -# --------------------------------------------------------------------------- -# tracked.json — atomic read/write, backup scoped to tracked.json only -# --------------------------------------------------------------------------- - -def load_tracked() -> List[Dict[str, Any]]: - """ - Load tracked.json. - Corruption recovery: restore from .bak — never touches ~/.hermes/logs/. 
- """ - tf = get_tracked_file() - tf.parent.mkdir(parents=True, exist_ok=True) - - if not tf.exists(): - return [] - - try: - return json.loads(tf.read_text()) - except (json.JSONDecodeError, ValueError): - bak = tf.with_suffix(".json.bak") - if bak.exists(): - try: - data = json.loads(bak.read_text()) - _log("WARN: tracked.json corrupted — restored from .bak") - print("Warning: tracking file corrupted, restored from backup.") - return data - except Exception: - pass - _log("WARN: tracked.json corrupted, no backup — starting fresh") - print("Warning: tracking file corrupted, starting fresh.") - return [] - - -def save_tracked(tracked: List[Dict[str, Any]]) -> None: - """Atomic write: .tmp → backup old → rename.""" - tf = get_tracked_file() - tf.parent.mkdir(parents=True, exist_ok=True) - tmp = tf.with_suffix(".json.tmp") - tmp.write_text(json.dumps(tracked, indent=2)) - if tf.exists(): - shutil.copy2(tf, tf.with_suffix(".json.bak")) - tmp.replace(tf) - - -# --------------------------------------------------------------------------- -# Allowed categories -# --------------------------------------------------------------------------- - -ALLOWED_CATEGORIES = { - "temp", "test", "research", "download", - "chrome-profile", "cron-output", "other", -} - -# --------------------------------------------------------------------------- -# Commands -# --------------------------------------------------------------------------- - -def cmd_track(path_str: str, category: str) -> None: - """Register a file for tracking.""" - if category not in ALLOWED_CATEGORIES: - print(f"Unknown category '{category}', using 'other'.") - _log(f"WARN: unknown category '{category}', using 'other'") - category = "other" - - path = Path(path_str).resolve() - - if not path.exists(): - print(f"Path does not exist, skipping: {path}") - _log(f"SKIP: {path} (does not exist)") - return - - if not _is_safe_path(path): - print(f"Rejected: path is outside HERMES_HOME — {path}") - _log(f"REJECT: {path} (outside 
HERMES_HOME)") - return - - size = path.stat().st_size if path.is_file() else 0 - tracked = load_tracked() - - # Deduplicate - if any(item["path"] == str(path) for item in tracked): - print(f"Already tracked: {path}") - return - - tracked.append({ - "path": str(path), - "timestamp": datetime.now(timezone.utc).isoformat(), - "category": category, - "size": size, - }) - save_tracked(tracked) - _log(f"TRACKED: {path} ({category}, {_fmt(size)})") - print(f"Tracked: {path} ({category}, {_fmt(size)})") - - -def cmd_dry_run() -> None: - """Show what would be deleted — no files touched.""" - tracked = load_tracked() - now = datetime.now(timezone.utc) - - auto: List[Dict] = [] - prompt: List[Dict] = [] - - for item in tracked: - p = Path(item["path"]) - if not p.exists(): - continue - age = (now - datetime.fromisoformat(item["timestamp"])).days - cat = item["category"] - size = item["size"] - - if cat == "test": - auto.append(item) - elif cat == "temp" and age > 7: - auto.append(item) - elif cat == "cron-output" and age > 14: - auto.append(item) - elif cat == "research" and age > 30: - prompt.append(item) - elif cat == "chrome-profile" and age > 14: - prompt.append(item) - elif size > 500 * 1024 * 1024: - prompt.append(item) - - auto_size = sum(i["size"] for i in auto) - prompt_size = sum(i["size"] for i in prompt) - - print("Dry-run preview (nothing deleted):") - print(f" Auto-delete : {len(auto)} files ({_fmt(auto_size)})") - for item in auto: - print(f" [{item['category']}] {item['path']}") - print(f" Needs prompt: {len(prompt)} files ({_fmt(prompt_size)})") - for item in prompt: - print(f" [{item['category']}] {item['path']}") - print(f"\n Total potential: {_fmt(auto_size + prompt_size)}") - print("Run 'quick' for auto-delete only, 'deep' for full cleanup.") - - -def cmd_quick(silent: bool = False) -> None: - """ - Safe deterministic cleanup — no prompts. - Deletes: test (age>0), temp (>7d), cron-output (>14d), empty dirs. 
- Pass silent=True to suppress output (for auto-runs). - """ - tracked = load_tracked() - now = datetime.now(timezone.utc) - deleted, freed = 0, 0 - new_tracked = [] - - for item in tracked: - p = Path(item["path"]) - cat = item["category"] - - if not p.exists(): - _log(f"STALE: {p} (removed from tracking)") - continue - - age = (now - datetime.fromisoformat(item["timestamp"])).days - - should_delete = ( - cat == "test" or # always delete test files - (cat == "temp" and age > 7) or - (cat == "cron-output" and age > 14) - ) - - if should_delete: - try: - if p.is_file(): - p.unlink() - elif p.is_dir(): - shutil.rmtree(p) - freed += item["size"] - deleted += 1 - _log(f"DELETED: {p} ({cat}, {_fmt(item['size'])})") - except OSError as e: - _log(f"ERROR deleting {p}: {e}") - if not silent: - print(f" Skipped (error): {p} — {e}") - new_tracked.append(item) - else: - new_tracked.append(item) - - # Remove empty dirs under HERMES_HOME - hermes_home = get_hermes_home() - empty_removed = 0 - for dirpath in sorted(hermes_home.rglob("*"), reverse=True): - if dirpath.is_dir() and dirpath != hermes_home: - try: - if not any(dirpath.iterdir()): - dirpath.rmdir() - empty_removed += 1 - _log(f"DELETED: {dirpath} (empty dir)") - except OSError: - pass - - save_tracked(new_tracked) - - summary = (f"[disk-guardian] Cleaned {deleted} files + {empty_removed} " - f"empty dirs, freed {_fmt(freed)}.") - _log(f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, {_fmt(freed)}") - print(summary) - - -def cmd_deep() -> None: - """Full cleanup — auto for safe files, interactive for risky.""" - print("Running quick cleanup first...") - cmd_quick() - - tracked = load_tracked() - now = datetime.now(timezone.utc) - research, chrome, large = [], [], [] - - for item in tracked: - p = Path(item["path"]) - if not p.exists(): - continue - age = (now - datetime.fromisoformat(item["timestamp"])).days - cat = item["category"] - - if cat == "research" and age > 30: - research.append(item) - elif cat == 
"chrome-profile" and age > 14: - chrome.append(item) - elif item["size"] > 500 * 1024 * 1024: - large.append(item) - - # Keep 10 newest research folders - research.sort(key=lambda x: x["timestamp"], reverse=True) - old_research = research[10:] - - freed, count = 0, 0 - to_remove = [] - - for item in old_research: - p = Path(item["path"]) - ans = input(f"\nDelete old research ({_fmt(item['size'])}): {p} [y/N] ") - if ans.lower() == "y": - _delete_item(p, item, to_remove) - freed += item["size"] - count += 1 - - for item in chrome: - p = Path(item["path"]) - ans = input(f"\nDelete chrome profile ({_fmt(item['size'])}): {p} [y/N] ") - if ans.lower() == "y": - _delete_item(p, item, to_remove) - freed += item["size"] - count += 1 - - for item in large: - p = Path(item["path"]) - ans = input(f"\nDelete large file ({_fmt(item['size'])}, " - f"{item['category']}): {p} [y/N] ") - if ans.lower() == "y": - _delete_item(p, item, to_remove) - freed += item["size"] - count += 1 - - if to_remove: - remove_paths = {i["path"] for i in to_remove} - save_tracked([i for i in tracked if i["path"] not in remove_paths]) - - print(f"\n[disk-guardian] Deep cleanup done: {count} items, freed {_fmt(freed)}.") - - -def _delete_item(p: Path, item: Dict, to_remove: list) -> None: - try: - if p.is_file(): - p.unlink() - elif p.is_dir(): - shutil.rmtree(p) - to_remove.append(item) - _log(f"DELETED: {p} ({item['category']}, {_fmt(item['size'])})") - print(f" Deleted: {p}") - except OSError as e: - _log(f"ERROR deleting {p}: {e}") - print(f" Error: {e}") - - -def cmd_status() -> None: - """Print per-category breakdown and top 10 largest tracked files.""" - tracked = load_tracked() - cats: Dict[str, Dict] = {} - for item in tracked: - c = item["category"] - cats.setdefault(c, {"count": 0, "size": 0}) - cats[c]["count"] += 1 - cats[c]["size"] += item["size"] - - print(f"{'Category':<20} {'Files':>6} {'Size':>10}") - print("-" * 40) - for cat, d in sorted(cats.items(), key=lambda x: x[1]["size"], 
reverse=True): - print(f"{cat:<20} {d['count']:>6} {_fmt(d['size']):>10}") - - existing = [ - (i["path"], i["size"], i["category"]) - for i in tracked if Path(i["path"]).exists() - ] - existing.sort(key=lambda x: x[1], reverse=True) - - print("\nTop 10 largest tracked files:") - for rank, (path, size, cat) in enumerate(existing[:10], 1): - print(f" {rank:>2}. {_fmt(size):>8} [{cat}] {path}") - - if not tracked: - print("No files tracked yet.") - - -def cmd_forget(path_str: str) -> None: - """Remove a path from tracking without deleting the file.""" - p = Path(path_str).resolve() - tracked = load_tracked() - before = len(tracked) - tracked = [i for i in tracked if Path(i["path"]).resolve() != p] - removed = before - len(tracked) - if removed: - save_tracked(tracked) - _log(f"FORGOT: {p} ({removed} entries)") - print(f"Removed {removed} tracking entries for {p}.") - else: - print(f"Not found in tracking: {p}") - - -def cmd_install_memory() -> None: - """ - Write persistent memory rules so disk-guardian auto-runs across sessions. - Writes to ~/.hermes/MEMORY.md (appends if exists). - """ - memory_file = get_hermes_home() / "MEMORY.md" - rules = """ -## disk-guardian auto-cleanup rules (added by disk-guardian v1.2.0) - -- After every task that creates test files (test_*.py, *.test.*, tmp_*), - automatically run: python3 ~/.hermes/optional-skills/devops/disk-guardian/scripts/disk_guardian.py quick - Do this silently at task end — no need to ask the user. -- Track every file created under HERMES_HOME during tasks using disk-guardian track. -- If HERMES_HOME disk usage exceeds 5 GB, silently run disk-guardian quick. -- Never ask permission for temp/test file deletion — clean silently, report one line. 
-- Print cleanup summary as: [disk-guardian] Cleaned N files, freed X MB -""" - memory_file.parent.mkdir(parents=True, exist_ok=True) - with open(memory_file, "a") as f: - f.write(rules) - _log("MEMORY: wrote auto-cleanup rules to MEMORY.md") - print(f"Memory rules written to {memory_file}") - print("Disk-guardian will now auto-run at the end of every task that creates test files.") - print("Restart Hermes for the rules to take effect.") - - -# --------------------------------------------------------------------------- -# Formatting -# --------------------------------------------------------------------------- - -def _fmt(n: int) -> str: - for unit in ("B", "KB", "MB", "GB", "TB"): - if n < 1024: - return f"{n:.1f} {unit}" - n /= 1024 - return f"{n:.1f} PB" - - -# --------------------------------------------------------------------------- -# CLI -# --------------------------------------------------------------------------- - -def main() -> None: - parser = argparse.ArgumentParser( - description="disk_guardian — ephemeral session file cleanup for Hermes Agent" - ) - sub = parser.add_subparsers(dest="cmd") - - p_track = sub.add_parser("track", help="Register a file for tracking") - p_track.add_argument("path") - p_track.add_argument("category", choices=sorted(ALLOWED_CATEGORIES)) - - sub.add_parser("dry-run", help="Preview deletions, touch nothing") - sub.add_parser("quick", help="Auto-delete safe files (no prompts)") - sub.add_parser("deep", help="Full cleanup with prompts for risky items") - sub.add_parser("status", help="Show disk usage by category") - sub.add_parser("install-memory", help="Write persistent auto-run memory rules") - - p_forget = sub.add_parser("forget", help="Stop tracking a path") - p_forget.add_argument("path") - - args = parser.parse_args() - if not args.cmd: - parser.print_help() - sys.exit(1) - - try: - if args.cmd == "track": - cmd_track(args.path, args.category) - elif args.cmd == "dry-run": - cmd_dry_run() - elif args.cmd == "quick": - 
cmd_quick() - elif args.cmd == "deep": - cmd_deep() - elif args.cmd == "status": - cmd_status() - elif args.cmd == "install-memory": - cmd_install_memory() - elif args.cmd == "forget": - cmd_forget(args.path) - except KeyboardInterrupt: - print("\nAborted.") - sys.exit(0) - except Exception as e: - _log(f"ERROR: {e}") - print(f"Error: {e}", file=sys.stderr) - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/plugins/disk-guardian/README.md b/plugins/disk-guardian/README.md new file mode 100644 index 000000000..512c1cb62 --- /dev/null +++ b/plugins/disk-guardian/README.md @@ -0,0 +1,51 @@ +# disk-guardian + +Auto-tracks and cleans up ephemeral files created during Hermes Agent +sessions — test scripts, temp outputs, cron logs, stale chrome profiles. +Scoped strictly to `$HERMES_HOME` and `/tmp/hermes-*`. + +Originally contributed by [@LVT382009](https://github.com/LVT382009) as a +skill in PR #12212. Ported to the plugin system so the behaviour runs +automatically via `post_tool_call` and `on_session_end` hooks — the agent +never needs to remember to call a tool. + +## How it works + +| Hook | Behaviour | +|---|---| +| `post_tool_call` | When `write_file` / `terminal` / `patch` creates a file matching `test_*`, `tmp_*`, or `*.test.*` inside `HERMES_HOME`, track it silently as `test` / `temp` / `cron-output`. | +| `on_session_end` | If any test files were auto-tracked during this turn, run `quick` cleanup (no prompts). 
| + +Deletion rules (same as the original PR): + +| Category | Threshold | Confirmation | +|---|---|---| +| `test` | every session end | Never | +| `temp` | >7 days since tracked | Never | +| `cron-output` | >14 days since tracked | Never | +| empty dirs under HERMES_HOME | always | Never | +| `research` | >30 days, beyond 10 newest | Always (deep only) | +| `chrome-profile` | >14 days since tracked | Always (deep only) | +| files >500 MB | never auto | Always (deep only) | + +## Slash command + +``` +/disk-guardian status # breakdown + top-10 largest +/disk-guardian dry-run # preview without deleting +/disk-guardian quick # run safe cleanup now +/disk-guardian deep # quick + list items needing prompt +/disk-guardian track <path> <category> # manual tracking +/disk-guardian forget <path> # stop tracking +``` + +## Safety + +- `is_safe_path()` rejects anything outside `HERMES_HOME` or `/tmp/hermes-*` +- Windows mounts (`/mnt/c` etc.) are rejected +- The state directory `$HERMES_HOME/disk-guardian/` is itself excluded +- `$HERMES_HOME/logs/`, `memories/`, `sessions/`, `skills/`, `plugins/`, + and config files are never tracked +- Backup/restore is scoped to `tracked.json` — the plugin never touches + agent logs +- Atomic writes: `.tmp` → backup → rename diff --git a/plugins/disk-guardian/__init__.py b/plugins/disk-guardian/__init__.py new file mode 100644 index 000000000..3b73df6de --- /dev/null +++ b/plugins/disk-guardian/__init__.py @@ -0,0 +1,316 @@ +"""disk-guardian plugin — auto-cleanup of ephemeral Hermes session files. + +Wires three behaviours: + +1. ``post_tool_call`` hook — inspects ``write_file``, ``patch`` and ``terminal`` + tool results for newly-created paths matching test/temp patterns + under ``HERMES_HOME`` and tracks them silently. Zero agent + compliance required. + +2. ``on_session_end`` hook — when any test files were auto-tracked + during the just-finished turn, runs :func:`disk_guardian.quick` and + logs a single line to ``$HERMES_HOME/disk-guardian/cleanup.log``. + +3. 
``/disk-guardian`` slash command — manual ``status``, ``dry-run``, + ``quick``, ``deep``, ``track``, ``forget``. + +Replaces PR #12212's skill-plus-script design: the agent no longer +needs to remember to run commands. +""" + +from __future__ import annotations + +import logging +import re +import shlex +import threading +from pathlib import Path +from typing import Any, Dict, Optional, Set + +from . import disk_guardian as dg + +logger = logging.getLogger(__name__) + + +# Per-task set of "test files newly tracked this turn". Keyed by task_id +# (or session_id as fallback) so on_session_end can decide whether to run +# cleanup. Guarded by a lock — post_tool_call can fire concurrently on +# parallel tool calls. +_recent_test_tracks: Dict[str, Set[str]] = {} +_lock = threading.Lock() + + +# Tool-call result shapes we can parse +_WRITE_FILE_PATH_KEY = "path" +_TERMINAL_PATH_REGEX = re.compile(r"(?:^|\s)(/[^\s'\"`]+|\~/[^\s'\"`]+)") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _tracker_key(task_id: str, session_id: str) -> str: + return task_id or session_id or "default" + + +def _record_track(task_id: str, session_id: str, path: Path, category: str) -> None: + """Record that we tracked *path* as *category* during this turn.""" + if category != "test": + return + key = _tracker_key(task_id, session_id) + with _lock: + _recent_test_tracks.setdefault(key, set()).add(str(path)) + + +def _drain(task_id: str, session_id: str) -> Set[str]: + """Pop the set of test paths tracked during this turn.""" + key = _tracker_key(task_id, session_id) + with _lock: + return _recent_test_tracks.pop(key, set()) + + +def _attempt_track(path_str: str, task_id: str, session_id: str) -> None: + """Best-effort auto-track. 
Never raises.""" + try: + p = Path(path_str).expanduser() + except Exception: + return + if not p.exists(): + return + category = dg.guess_category(p) + if category is None: + return + newly = dg.track(str(p), category, silent=True) + if newly: + _record_track(task_id, session_id, p, category) + + +def _extract_paths_from_write_file(args: Dict[str, Any]) -> Set[str]: + path = args.get(_WRITE_FILE_PATH_KEY) + return {path} if isinstance(path, str) and path else set() + + +def _extract_paths_from_patch(args: Dict[str, Any]) -> Set[str]: + # The patch tool creates new files via the `mode="patch"` path too, but + # most of its use is editing existing files — we only care about new + # ephemeral creations, so treat patch conservatively and only pick up + # the single-file `path` arg. Track-then-cleanup is idempotent, so + # re-tracking an already-tracked file is a no-op (dedup in track()). + path = args.get("path") + return {path} if isinstance(path, str) and path else set() + + +def _extract_paths_from_terminal(args: Dict[str, Any], result: str) -> Set[str]: + """Best-effort: pull candidate filesystem paths from a terminal command + and its output, then let ``guess_category`` / ``is_safe_path`` filter. + """ + paths: Set[str] = set() + cmd = args.get("command") or "" + if isinstance(cmd, str) and cmd: + # Tokenise the command — catches `touch /tmp/hermes-x/test_foo.py` + try: + for tok in shlex.split(cmd, posix=True): + if tok.startswith(("/", "~")): + paths.add(tok) + except ValueError: + pass + # Only scan the result text if it's a reasonable size (avoid 50KB dumps). 
def _on_session_end(
    session_id: str = "",
    completed: bool = True,
    interrupted: bool = False,
    **_: Any,
) -> None:
    """Session-end hook: run ``dg.quick()`` when test files were auto-tracked.

    First drains the bucket keyed by *session_id*, then sweeps every other
    bucket still present in ``_recent_test_tracks`` (subagents record under
    their own ``task_id``).  Note that the sweep is not limited to this
    session: every remaining non-empty key other than *session_id* is
    discarded.  ``completed`` and ``interrupted`` are accepted for hook
    compatibility and are not consulted.

    A failing cleanup is logged at debug level and otherwise ignored; a
    cleanup that removed something is recorded in the audit log.
    """
    own_paths = _drain("", session_id)

    # Snapshot the remaining keys and discard the task-scoped ones under
    # the lock, since post_tool_call may be mutating the dict concurrently.
    with _lock:
        leftover_keys = list(_recent_test_tracks)
        for bucket in leftover_keys:
            if not bucket or bucket == session_id:
                continue
            _recent_test_tracks.pop(bucket, None)

    # Nothing recorded anywhere this turn: skip the cleanup pass entirely.
    if not (own_paths or leftover_keys):
        return

    try:
        summary = dg.quick()
    except Exception as exc:
        logger.debug("disk-guardian quick cleanup failed: %s", exc)
        return

    if not (summary["deleted"] or summary["empty_dirs"]):
        return
    dg._log(
        f"AUTO_QUICK (session_end): deleted={summary['deleted']} "
        f"dirs={summary['empty_dirs']} freed={dg.fmt_size(summary['freed'])}"
    )
+ return base + + +def _handle_slash(raw_args: str) -> Optional[str]: + argv = raw_args.strip().split() + if not argv or argv[0] in ("help", "-h", "--help"): + return _HELP_TEXT + + sub = argv[0] + + if sub == "status": + return dg.format_status(dg.status()) + + if sub == "dry-run": + auto, prompt = dg.dry_run() + auto_size = sum(i["size"] for i in auto) + prompt_size = sum(i["size"] for i in prompt) + lines = [ + "Dry-run preview (nothing deleted):", + f" Auto-delete : {len(auto)} files ({dg.fmt_size(auto_size)})", + ] + for item in auto: + lines.append(f" [{item['category']}] {item['path']}") + lines.append( + f" Needs prompt: {len(prompt)} files ({dg.fmt_size(prompt_size)})" + ) + for item in prompt: + lines.append(f" [{item['category']}] {item['path']}") + lines.append( + f"\n Total potential: {dg.fmt_size(auto_size + prompt_size)}" + ) + return "\n".join(lines) + + if sub == "quick": + return _fmt_summary(dg.quick()) + + if sub == "deep": + # In-session deep can't prompt the user interactively — show what + # quick cleaned plus the items that WOULD need confirmation. + quick_summary = dg.quick() + _auto, prompt_items = dg.dry_run() + lines = [_fmt_summary(quick_summary)] + if prompt_items: + size = sum(i["size"] for i in prompt_items) + lines.append( + f"\n{len(prompt_items)} item(s) need confirmation " + f"({dg.fmt_size(size)}):" + ) + for item in prompt_items: + lines.append(f" [{item['category']}] {item['path']}") + lines.append( + "\nRun `/disk-guardian forget ` to skip, or delete " + "manually via terminal." + ) + return "\n".join(lines) + + if sub == "track": + if len(argv) < 3: + return "Usage: /disk-guardian track " + path_arg = argv[1] + category = argv[2] + if category not in dg.ALLOWED_CATEGORIES: + return ( + f"Unknown category '{category}'. " + f"Allowed: {sorted(dg.ALLOWED_CATEGORIES)}" + ) + if dg.track(path_arg, category, silent=True): + return f"Tracked {path_arg} as '{category}'." 
+ return ( + f"Not tracked (already present, missing, or outside HERMES_HOME): " + f"{path_arg}" + ) + + if sub == "forget": + if len(argv) < 2: + return "Usage: /disk-guardian forget " + n = dg.forget(argv[1]) + return ( + f"Removed {n} tracking entr{'y' if n == 1 else 'ies'} for {argv[1]}." + if n else f"Not found in tracking: {argv[1]}" + ) + + return f"Unknown subcommand: {sub}\n\n{_HELP_TEXT}" + + +# --------------------------------------------------------------------------- +# Plugin registration +# --------------------------------------------------------------------------- + +def register(ctx) -> None: + ctx.register_hook("post_tool_call", _on_post_tool_call) + ctx.register_hook("on_session_end", _on_session_end) + ctx.register_command( + "disk-guardian", + handler=_handle_slash, + description="Track and clean up ephemeral Hermes session files.", + ) diff --git a/plugins/disk-guardian/disk_guardian.py b/plugins/disk-guardian/disk_guardian.py new file mode 100755 index 000000000..b6f120c9d --- /dev/null +++ b/plugins/disk-guardian/disk_guardian.py @@ -0,0 +1,496 @@ +"""disk_guardian — ephemeral file cleanup for Hermes Agent. + +Library module wrapping the deterministic cleanup rules written by +@LVT382009 in PR #12212. The plugin ``__init__.py`` wires these +functions into ``post_tool_call`` and ``on_session_end`` hooks so +tracking and cleanup happen automatically — the agent never needs to +call a tool or remember a skill. + +Rules: + - test files → delete immediately at task end (age >= 0) + - temp files → delete after 7 days + - cron-output → delete after 14 days + - empty dirs → always delete (under HERMES_HOME) + - research → keep 10 newest, prompt for older (deep only) + - chrome-profile→ prompt after 14 days (deep only) + - >500 MB files → prompt always (deep only) + +Scope: strictly HERMES_HOME and /tmp/hermes-* +Never touches: ~/.hermes/logs/ or any system directory. 
def is_safe_path(path: Path) -> bool:
    """Accept only paths under HERMES_HOME or ``/tmp/hermes-*``.

    Both checks are made on the *resolved* path, so ``..`` components,
    relative paths and symlinks cannot be used to step outside the allowed
    roots.  The temp root is resolved as well, so a path that was already
    resolved by the caller still matches on systems where ``/tmp`` is
    itself a symlink.

    Rejects Windows mounts (``/mnt/c`` etc.) and any system directory.
    Returns False when the path cannot be resolved at all.
    """
    try:
        resolved = path.resolve()
        hermes_home = get_hermes_home().resolve()
        tmp_root = Path("/tmp").resolve()
    except (OSError, RuntimeError):
        # Unresolvable (e.g. symlink loop): treat as unsafe.
        return False

    if resolved == hermes_home or hermes_home in resolved.parents:
        return True

    # Allow /tmp/hermes-* explicitly: the first component below the temp
    # root must carry the ``hermes-`` prefix.
    try:
        rel = resolved.relative_to(tmp_root)
    except ValueError:
        return False
    return bool(rel.parts) and rel.parts[0].startswith("hermes-")
def fmt_size(n: float) -> str:
    """Format a byte count *n* with one decimal and a binary-scaled unit.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (``PB``) is reached, e.g. ``1536`` becomes ``"1.5 KB"``.
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    value = n
    idx = 0
    # ``not value < 1024`` (rather than ``>=``) keeps the comparison
    # semantics of the straightforward per-unit check for every input.
    while idx < len(units) - 1 and not value < 1024:
        value /= 1024
        idx += 1
    return f"{value:.1f} {units[idx]}"
def forget(path_str: str) -> int:
    """Stop tracking *path_str* without deleting anything on disk.

    Entries are matched by resolved path.  Returns the number of tracking
    entries removed; the tracking file is rewritten (and the removal is
    written to the audit log) only when at least one entry matched.
    """
    target = Path(path_str).resolve()
    entries = load_tracked()

    kept: List[Dict[str, Any]] = []
    for entry in entries:
        if Path(entry["path"]).resolve() != target:
            kept.append(entry)

    dropped = len(entries) - len(kept)
    if dropped == 0:
        return dropped

    save_tracked(kept)
    _log(f"FORGOT: {target} ({dropped} entries)")
    return dropped
def quick() -> Dict[str, Any]:
    """Safe deterministic cleanup — no prompts.

    Tracked entries are handled as follows:

    * path no longer exists → dropped from tracking;
    * ``test`` → deleted regardless of age;
    * ``temp`` older than 7 days, ``cron-output`` older than 14 days →
      deleted;
    * everything else → kept.

    An entry whose timestamp cannot be read, or whose path no longer
    passes :func:`is_safe_path`, is never deleted: it stays in tracking and
    is reported in ``errors``.  After the tracked entries, empty directories
    under HERMES_HOME are removed, except HERMES_HOME itself and the whole
    subtree of each well-known top-level state dir.

    Returns: ``{"deleted": N, "empty_dirs": N, "freed": bytes,
    "errors": [str, ...]}``.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    deleted = 0
    freed = 0
    new_tracked: List[Dict] = []
    errors: List[str] = []

    for item in tracked:
        p = Path(item["path"])
        cat = item["category"]

        if not p.exists():
            _log(f"STALE: {p} (removed from tracking)")
            continue

        if cat == "test":
            should_delete = True
        else:
            try:
                age = (now - datetime.fromisoformat(item["timestamp"])).days
            except (KeyError, TypeError, ValueError) as e:
                # One malformed entry must not abort this and every later
                # cleanup; keep it and surface the problem instead.
                _log(f"ERROR reading timestamp for {p}: {e}")
                errors.append(f"{p}: {e}")
                new_tracked.append(item)
                continue
            should_delete = (
                (cat == "temp" and age > 7)
                or (cat == "cron-output" and age > 14)
            )

        if not should_delete:
            new_tracked.append(item)
            continue

        # tracked.json is plain on-disk state: re-check the scope right
        # before the destructive call instead of trusting what was stored.
        if not is_safe_path(p):
            _log(f"REJECT: {p} (outside HERMES_HOME, not deleted)")
            errors.append(f"{p}: outside HERMES_HOME, not deleted")
            new_tracked.append(item)
            continue

        try:
            if p.is_file():
                p.unlink()
            elif p.is_dir():
                shutil.rmtree(p)
            freed += item["size"]
            deleted += 1
            _log(f"DELETED: {p} ({cat}, {fmt_size(item['size'])})")
        except OSError as e:
            _log(f"ERROR deleting {p}: {e}")
            errors.append(f"{p}: {e}")
            new_tracked.append(item)

    # Remove empty dirs under HERMES_HOME.  HERMES_HOME itself and the
    # subtrees of the well-known top-level state dirs are left alone — a
    # fresh install has these empty, other subsystems own their layout, and
    # deleting inside them (e.g. under logs/) would surprise the user.
    hermes_home = get_hermes_home()
    _PROTECTED_TOP_LEVEL = {
        "logs", "memories", "sessions", "cron", "cronjobs",
        "cache", "skills", "plugins", "disk-guardian", "optional-skills",
        "hermes-agent", "backups", "profiles", ".worktrees",
    }
    empty_removed = 0
    try:
        for top in hermes_home.iterdir():
            if top.name in _PROTECTED_TOP_LEVEL:
                continue
            # Symlinks are never removed (rmdir on one fails anyway) and
            # must not be used as a starting point for the walk.
            if top.is_symlink() or not top.is_dir():
                continue
            # Reverse-sorted so children are visited before their parents,
            # letting a chain of nested empty dirs collapse in one pass.
            for dirpath in sorted([top, *top.rglob("*")], reverse=True):
                if not dirpath.is_dir():
                    continue
                try:
                    if not any(dirpath.iterdir()):
                        dirpath.rmdir()
                        empty_removed += 1
                        _log(f"DELETED: {dirpath} (empty dir)")
                except OSError:
                    # Best-effort: unreadable or concurrently changed dir.
                    pass
    except OSError:
        pass

    save_tracked(new_tracked)
    _log(
        f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, "
        f"{fmt_size(freed)}"
    )
    return {
        "deleted": deleted,
        "empty_dirs": empty_removed,
        "freed": freed,
        "errors": errors,
    }
+ return {"quick": quick_result, "deep_deleted": 0, "deep_freed": 0} + + tracked = load_tracked() + now = datetime.now(timezone.utc) + research, chrome, large = [], [], [] + + for item in tracked: + p = Path(item["path"]) + if not p.exists(): + continue + age = (now - datetime.fromisoformat(item["timestamp"])).days + cat = item["category"] + + if cat == "research" and age > 30: + research.append(item) + elif cat == "chrome-profile" and age > 14: + chrome.append(item) + elif item["size"] > 500 * 1024 * 1024: + large.append(item) + + research.sort(key=lambda x: x["timestamp"], reverse=True) + old_research = research[10:] + + freed, count = 0, 0 + to_remove: List[Dict] = [] + + for group in (old_research, chrome, large): + for item in group: + if confirm(item): + try: + p = Path(item["path"]) + if p.is_file(): + p.unlink() + elif p.is_dir(): + shutil.rmtree(p) + to_remove.append(item) + freed += item["size"] + count += 1 + _log( + f"DELETED: {p} ({item['category']}, " + f"{fmt_size(item['size'])})" + ) + except OSError as e: + _log(f"ERROR deleting {item['path']}: {e}") + + if to_remove: + remove_paths = {i["path"] for i in to_remove} + save_tracked([i for i in tracked if i["path"] not in remove_paths]) + + return {"quick": quick_result, "deep_deleted": count, "deep_freed": freed} + + +# --------------------------------------------------------------------------- +# Status +# --------------------------------------------------------------------------- + +def status() -> Dict[str, Any]: + """Return per-category breakdown and top 10 largest tracked files.""" + tracked = load_tracked() + cats: Dict[str, Dict] = {} + for item in tracked: + c = item["category"] + cats.setdefault(c, {"count": 0, "size": 0}) + cats[c]["count"] += 1 + cats[c]["size"] += item["size"] + + existing = [ + (i["path"], i["size"], i["category"]) + for i in tracked if Path(i["path"]).exists() + ] + existing.sort(key=lambda x: x[1], reverse=True) + + return { + "categories": cats, + "top10": 
def guess_category(path: Path) -> Optional[str]:
    """Return a category label for *path*, or None if we shouldn't track it.

    Used by the ``post_tool_call`` hook to auto-track ephemeral files.

    Paths failing :func:`is_safe_path` are never tracked.  Under
    HERMES_HOME the first path component decides: state, config and
    checkout directories are skipped, ``cron`` / ``cronjobs`` map to
    ``"cron-output"`` and ``cache`` maps to ``"temp"``.  Anything else
    (including ``/tmp/hermes-*`` paths) is labelled ``"test"`` only when
    its file name starts with ``test_`` / ``tmp_`` or ends with one of the
    ``.test.*`` suffixes.
    """
    if not is_safe_path(path):
        return None

    hermes_home = get_hermes_home()
    try:
        rel = path.resolve().relative_to(hermes_home)
    except ValueError:
        # Path isn't under HERMES_HOME (e.g. /tmp/hermes-*) — fall through
        # to the file-name rules.
        rel = None
    except (OSError, RuntimeError):
        # Can't resolve the path, so we can't tell where it lives: skip.
        return None

    if rel is not None:
        top = rel.parts[0] if rel.parts else ""
        # Skip the state dir itself, logs, memory files, sessions, config.
        # The directory names mirror the protected top-level list used by
        # quick(): since "test" entries are deleted at session end, a
        # test_*.py inside a profile, backup, optional skill or worktree
        # must never be auto-tracked.
        if top in {
            "disk-guardian", "logs", "memories", "sessions", "config.yaml",
            "skills", "plugins", ".env", "USER.md", "MEMORY.md", "SOUL.md",
            "auth.json", "hermes-agent",
            "optional-skills", "backups", "profiles", ".worktrees",
        }:
            return None
        if top in ("cron", "cronjobs"):
            return "cron-output"
        if top == "cache":
            return "temp"

    name = path.name
    if name.startswith(_TEST_PATTERNS):
        return "test"
    if name.endswith(_TEST_SUFFIXES):
        return "test"
    return None
def _load_lib():
    """Import the plugin's library module directly from the repo path.

    The module is executed fresh on every call under the name
    ``disk_guardian_under_test`` and is not registered in ``sys.modules``.
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule is bound as an attribute; import it explicitly.
    import importlib.util

    repo_root = Path(__file__).resolve().parents[2]
    lib_path = repo_root / "plugins" / "disk-guardian" / "disk_guardian.py"
    spec = importlib.util.spec_from_file_location(
        "disk_guardian_under_test", lib_path
    )
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
import disk_guardian` + import types + if "hermes_plugins" not in sys.modules: + ns = types.ModuleType("hermes_plugins") + ns.__path__ = [] + sys.modules["hermes_plugins"] = ns + mod = importlib.util.module_from_spec(spec) + mod.__package__ = "hermes_plugins.disk_guardian" + mod.__path__ = [str(plugin_dir)] + sys.modules["hermes_plugins.disk_guardian"] = mod + spec.loader.exec_module(mod) + return mod + + +# --------------------------------------------------------------------------- +# Library tests +# --------------------------------------------------------------------------- + +class TestIsSafePath: + def test_accepts_path_under_hermes_home(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "subdir" / "file.txt" + p.parent.mkdir() + p.write_text("x") + assert dg.is_safe_path(p) is True + + def test_rejects_outside_hermes_home(self, _isolate_env): + dg = _load_lib() + assert dg.is_safe_path(Path("/etc/passwd")) is False + + def test_accepts_tmp_hermes_prefix(self, _isolate_env, tmp_path): + dg = _load_lib() + assert dg.is_safe_path(Path("/tmp/hermes-abc/x.log")) is True + + def test_rejects_plain_tmp(self, _isolate_env): + dg = _load_lib() + assert dg.is_safe_path(Path("/tmp/other.log")) is False + + def test_rejects_windows_mount(self, _isolate_env): + dg = _load_lib() + assert dg.is_safe_path(Path("/mnt/c/Users/x/test.txt")) is False + + +class TestGuessCategory: + def test_test_prefix(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "test_foo.py" + p.write_text("x") + assert dg.guess_category(p) == "test" + + def test_tmp_prefix(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "tmp_foo.log" + p.write_text("x") + assert dg.guess_category(p) == "test" + + def test_dot_test_suffix(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "mything.test.js" + p.write_text("x") + assert dg.guess_category(p) == "test" + + def test_skips_protected_top_level(self, _isolate_env): + dg = _load_lib() + logs_dir = _isolate_env / "logs" + 
logs_dir.mkdir() + p = logs_dir / "test_log.txt" + p.write_text("x") + # Even though it matches test_* pattern, logs/ is excluded. + assert dg.guess_category(p) is None + + def test_cron_subtree_categorised(self, _isolate_env): + dg = _load_lib() + cron_dir = _isolate_env / "cron" + cron_dir.mkdir() + p = cron_dir / "job_output.md" + p.write_text("x") + assert dg.guess_category(p) == "cron-output" + + def test_ordinary_file_returns_none(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "notes.md" + p.write_text("x") + assert dg.guess_category(p) is None + + +class TestTrackForgetQuick: + def test_track_then_quick_deletes_test(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "test_a.py" + p.write_text("x") + assert dg.track(str(p), "test", silent=True) is True + summary = dg.quick() + assert summary["deleted"] == 1 + assert not p.exists() + + def test_track_dedup(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "test_a.py" + p.write_text("x") + assert dg.track(str(p), "test", silent=True) is True + # Second call returns False (already tracked) + assert dg.track(str(p), "test", silent=True) is False + + def test_track_rejects_outside_home(self, _isolate_env): + dg = _load_lib() + # /etc/hostname exists on most Linux boxes; fall back if not. 
+ outside = "/etc/hostname" if Path("/etc/hostname").exists() else "/etc/passwd" + assert dg.track(outside, "test", silent=True) is False + + def test_track_skips_missing(self, _isolate_env): + dg = _load_lib() + assert dg.track(str(_isolate_env / "nope.txt"), "test", silent=True) is False + + def test_forget_removes_entry(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "keep.tmp" + p.write_text("x") + dg.track(str(p), "temp", silent=True) + assert dg.forget(str(p)) == 1 + assert p.exists() # forget does NOT delete the file + + def test_quick_preserves_unexpired_temp(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "fresh.tmp" + p.write_text("x") + dg.track(str(p), "temp", silent=True) + summary = dg.quick() + assert summary["deleted"] == 0 + assert p.exists() + + def test_quick_preserves_protected_top_level_dirs(self, _isolate_env): + dg = _load_lib() + for d in ("logs", "memories", "sessions", "cron", "cache"): + (_isolate_env / d).mkdir() + dg.quick() + for d in ("logs", "memories", "sessions", "cron", "cache"): + assert (_isolate_env / d).exists(), f"{d}/ should be preserved" + + +class TestStatus: + def test_empty_status(self, _isolate_env): + dg = _load_lib() + s = dg.status() + assert s["total_tracked"] == 0 + assert s["top10"] == [] + + def test_status_with_entries(self, _isolate_env): + dg = _load_lib() + p = _isolate_env / "big.tmp" + p.write_text("y" * 100) + dg.track(str(p), "temp", silent=True) + s = dg.status() + assert s["total_tracked"] == 1 + assert len(s["top10"]) == 1 + rendered = dg.format_status(s) + assert "temp" in rendered + assert "big.tmp" in rendered + + +class TestDryRun: + def test_classifies_by_category(self, _isolate_env): + dg = _load_lib() + test_f = _isolate_env / "test_x.py" + test_f.write_text("x") + big = _isolate_env / "big.bin" + big.write_bytes(b"z" * 10) + dg.track(str(test_f), "test", silent=True) + dg.track(str(big), "other", silent=True) + auto, prompt = dg.dry_run() + # test → auto, other → 
neither (doesn't hit any rule) + assert any(i["path"] == str(test_f) for i in auto) + + +# --------------------------------------------------------------------------- +# Plugin hooks tests +# --------------------------------------------------------------------------- + +class TestPostToolCallHook: + def test_write_file_test_pattern_tracked(self, _isolate_env): + pi = _load_plugin_init() + p = _isolate_env / "test_created.py" + p.write_text("x") + pi._on_post_tool_call( + tool_name="write_file", + args={"path": str(p), "content": "x"}, + result="OK", + task_id="t1", session_id="s1", + ) + tracked_file = _isolate_env / "disk-guardian" / "tracked.json" + data = json.loads(tracked_file.read_text()) + assert len(data) == 1 + assert data[0]["category"] == "test" + + def test_write_file_non_test_not_tracked(self, _isolate_env): + pi = _load_plugin_init() + p = _isolate_env / "notes.md" + p.write_text("x") + pi._on_post_tool_call( + tool_name="write_file", + args={"path": str(p), "content": "x"}, + result="OK", + task_id="t2", session_id="s2", + ) + tracked_file = _isolate_env / "disk-guardian" / "tracked.json" + assert not tracked_file.exists() or tracked_file.read_text().strip() == "[]" + + def test_terminal_command_picks_up_paths(self, _isolate_env): + pi = _load_plugin_init() + p = _isolate_env / "tmp_created.log" + p.write_text("x") + pi._on_post_tool_call( + tool_name="terminal", + args={"command": f"touch {p}"}, + result=f"created {p}\n", + task_id="t3", session_id="s3", + ) + tracked_file = _isolate_env / "disk-guardian" / "tracked.json" + data = json.loads(tracked_file.read_text()) + assert any(Path(i["path"]) == p.resolve() for i in data) + + def test_ignores_unrelated_tool(self, _isolate_env): + pi = _load_plugin_init() + pi._on_post_tool_call( + tool_name="read_file", + args={"path": str(_isolate_env / "test_x.py")}, + result="contents", + task_id="t4", session_id="s4", + ) + # read_file should never trigger tracking. 
+ tracked_file = _isolate_env / "disk-guardian" / "tracked.json" + assert not tracked_file.exists() or tracked_file.read_text().strip() == "[]" + + +class TestOnSessionEndHook: + def test_runs_quick_when_test_files_tracked(self, _isolate_env): + pi = _load_plugin_init() + p = _isolate_env / "test_cleanup.py" + p.write_text("x") + pi._on_post_tool_call( + tool_name="write_file", + args={"path": str(p), "content": "x"}, + result="OK", + task_id="", session_id="s1", + ) + assert p.exists() + pi._on_session_end(session_id="s1", completed=True, interrupted=False) + assert not p.exists(), "test file should be auto-deleted" + + def test_noop_when_no_test_tracked(self, _isolate_env): + pi = _load_plugin_init() + # Nothing tracked → on_session_end should not raise. + pi._on_session_end(session_id="empty", completed=True, interrupted=False) + + +# --------------------------------------------------------------------------- +# Slash command +# --------------------------------------------------------------------------- + +class TestSlashCommand: + def test_help(self, _isolate_env): + pi = _load_plugin_init() + out = pi._handle_slash("help") + assert "disk-guardian" in out + assert "status" in out + + def test_status_empty(self, _isolate_env): + pi = _load_plugin_init() + out = pi._handle_slash("status") + assert "nothing tracked" in out + + def test_track_rejects_missing(self, _isolate_env): + pi = _load_plugin_init() + out = pi._handle_slash( + f"track {_isolate_env / 'nope.txt'} temp" + ) + assert "Not tracked" in out + + def test_track_rejects_bad_category(self, _isolate_env): + pi = _load_plugin_init() + p = _isolate_env / "a.tmp" + p.write_text("x") + out = pi._handle_slash(f"track {p} banana") + assert "Unknown category" in out + + def test_track_and_forget(self, _isolate_env): + pi = _load_plugin_init() + p = _isolate_env / "a.tmp" + p.write_text("x") + out = pi._handle_slash(f"track {p} temp") + assert "Tracked" in out + out = pi._handle_slash(f"forget {p}") + assert 
"Removed 1" in out + + def test_unknown_subcommand(self, _isolate_env): + pi = _load_plugin_init() + out = pi._handle_slash("foobar") + assert "Unknown subcommand" in out + + def test_quick_on_empty(self, _isolate_env): + pi = _load_plugin_init() + out = pi._handle_slash("quick") + assert "Cleaned 0 files" in out + + +# --------------------------------------------------------------------------- +# Bundled-plugin discovery +# --------------------------------------------------------------------------- + +class TestBundledDiscovery: + def test_disk_guardian_is_discovered_as_bundled(self, _isolate_env, monkeypatch): + # The default hermetic conftest disables bundled plugin discovery. + # This test specifically exercises it, so clear the suppression. + monkeypatch.delenv("HERMES_DISABLE_BUNDLED_PLUGINS", raising=False) + # Reset plugin manager state so discovery runs fresh. + for mod in list(sys.modules.keys()): + if mod.startswith("hermes_cli.plugins") or mod == "plugins": + del sys.modules[mod] + + repo_root = Path(__file__).resolve().parents[2] + sys.path.insert(0, str(repo_root)) + try: + from hermes_cli import plugins as pmod + mgr = pmod.PluginManager() + mgr.discover_and_load() + assert "disk-guardian" in mgr._plugins + loaded = mgr._plugins["disk-guardian"] + assert loaded.manifest.source == "bundled" + assert loaded.enabled + assert "post_tool_call" in loaded.hooks_registered + assert "on_session_end" in loaded.hooks_registered + assert "disk-guardian" in loaded.commands_registered + finally: + sys.path.pop(0) + + def test_memory_and_context_engine_subdirs_skipped(self, _isolate_env, monkeypatch): + """Bundled scan must NOT pick up plugins/memory or plugins/context_engine + as top-level plugins — they have their own discovery paths.""" + monkeypatch.delenv("HERMES_DISABLE_BUNDLED_PLUGINS", raising=False) + for mod in list(sys.modules.keys()): + if mod.startswith("hermes_cli.plugins") or mod == "plugins": + del sys.modules[mod] + repo_root = 
Path(__file__).resolve().parents[2] + sys.path.insert(0, str(repo_root)) + try: + from hermes_cli import plugins as pmod + mgr = pmod.PluginManager() + mgr.discover_and_load() + assert "memory" not in mgr._plugins + assert "context_engine" not in mgr._plugins + finally: + sys.path.pop(0) + + def test_bundled_scan_suppressed_by_env_var(self, _isolate_env, monkeypatch): + """HERMES_DISABLE_BUNDLED_PLUGINS=1 suppresses bundled discovery.""" + monkeypatch.setenv("HERMES_DISABLE_BUNDLED_PLUGINS", "1") + for mod in list(sys.modules.keys()): + if mod.startswith("hermes_cli.plugins") or mod == "plugins": + del sys.modules[mod] + repo_root = Path(__file__).resolve().parents[2] + sys.path.insert(0, str(repo_root)) + try: + from hermes_cli import plugins as pmod + mgr = pmod.PluginManager() + mgr.discover_and_load() + assert "disk-guardian" not in mgr._plugins + finally: + sys.path.pop(0)