From 32e6baea31f96b064ce0d847534c507d006858b6 Mon Sep 17 00:00:00 2001 From: Nox Date: Sun, 19 Apr 2026 05:20:59 +0700 Subject: [PATCH] Update disk_guardian.py --- .../devops/disk-guardian/disk_guardian.py | 805 +++++++++--------- 1 file changed, 408 insertions(+), 397 deletions(-) diff --git a/optional-skills/devops/disk-guardian/disk_guardian.py b/optional-skills/devops/disk-guardian/disk_guardian.py index 8c1473779..cd0dad547 100755 --- a/optional-skills/devops/disk-guardian/disk_guardian.py +++ b/optional-skills/devops/disk-guardian/disk_guardian.py @@ -1,494 +1,505 @@ #!/usr/bin/env python3 """ -Disk Guardian - Autonomous disk cleanup for Hermes Agent +disk_guardian.py v1.2.0 — ephemeral file cleanup for Hermes Agent -Tracks files created by Hermes and safely removes stale ones. +Tracks and removes temp outputs, test artifacts, cron logs, and stale +chrome profiles created during Hermes sessions. + +Rules: + - test files → delete immediately at task end (age > 0) + - temp files → delete after 7 days + - cron-output → delete after 14 days + - empty dirs → always delete + - research → keep 10 newest, prompt for older (deep only) + - chrome-profile→ prompt after 14 days (deep only) + - >500 MB files → prompt always (deep only) + +Scope: strictly HERMES_HOME and /tmp/hermes-* +Never touches: ~/.hermes/logs/ or any system directory """ import argparse import json import os -import sys -import subprocess -import shlex -from pathlib import Path -from datetime import datetime, timezone -from typing import Dict, List, Optional, Any -import fcntl import shutil +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional +# --------------------------------------------------------------------------- +# Paths +# --------------------------------------------------------------------------- def get_hermes_home() -> Path: - """Return the Hermes home directory (default: ~/.hermes).""" + """Return HERMES_HOME, defaulting to ~/.hermes.""" val = os.environ.get("HERMES_HOME", "").strip() - return Path(val) if val else Path.home() / ".hermes" + return Path(val).resolve() if val else (Path.home() / ".hermes").resolve() -def get_disk_guardian_dir() -> Path: - """Return the disk-guardian directory.""" +def get_state_dir() -> Path: + """State dir — separate from ~/.hermes/logs/.""" return get_hermes_home() / "disk-guardian" def get_tracked_file() -> Path: - """Return the tracked.json file path.""" - return get_disk_guardian_dir() / "tracked.json" + return get_state_dir() / "tracked.json" def get_log_file() -> Path: - """Return the cleanup.log file path.""" - return get_disk_guardian_dir() / "cleanup.log" + """Audit log — NOT ~/.hermes/logs/.""" + return get_state_dir() / "cleanup.log" +# --------------------------------------------------------------------------- +# WSL + path safety +# --------------------------------------------------------------------------- + def is_wsl() -> bool: - """Check if running in WSL.""" try: - with open("/proc/version", "r") as f: - return "microsoft" in f.read().lower() + return "microsoft" in Path("/proc/version").read_text().lower() except Exception: return False -def log_message(message: str) -> None: - """Write a message to the cleanup log.""" +def _is_safe_path(path: Path) -> bool: + """ + Accept only paths under HERMES_HOME or /tmp/hermes-*. + Rejects Windows mounts (/mnt/c etc.) and system directories. + """ + hermes_home = get_hermes_home() + try: + path.relative_to(hermes_home) + return True + except ValueError: + pass + # Allow /tmp/hermes-* explicitly + parts = path.parts + if len(parts) >= 3 and parts[1] == "tmp" and parts[2].startswith("hermes-"): + return True + return False + + +# --------------------------------------------------------------------------- +# Audit log — writes only to disk-guardian/cleanup.log +# --------------------------------------------------------------------------- + +def _log(message: str) -> None: log_file = get_log_file() log_file.parent.mkdir(parents=True, exist_ok=True) - timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") with open(log_file, "a") as f: - f.write(f"[{timestamp}] {message}\n") + f.write(f"[{ts}] {message}\n") +# --------------------------------------------------------------------------- +# tracked.json — atomic read/write, backup scoped to tracked.json only +# --------------------------------------------------------------------------- + def load_tracked() -> List[Dict[str, Any]]: - """Load tracked.json with error handling.""" - tracked_file = get_tracked_file() - tracked_file.parent.mkdir(parents=True, exist_ok=True) + """ + Load tracked.json. + Corruption recovery: restore from .bak — never touches ~/.hermes/logs/. + """ + tf = get_tracked_file() + tf.parent.mkdir(parents=True, exist_ok=True) - if not tracked_file.exists(): + if not tf.exists(): return [] try: - with open(tracked_file, "r") as f: - return json.load(f) - except json.JSONDecodeError: - # Try to restore from backup - backup_file = tracked_file.with_suffix(".json.bak") - if backup_file.exists(): - log_message("Tracking file corrupted, restoring from backup") - with open(backup_file, "r") as f: - return json.load(f) - log_message("Tracking file corrupted, starting fresh") + return json.loads(tf.read_text()) + except (json.JSONDecodeError, ValueError): + bak = tf.with_suffix(".json.bak") + if bak.exists(): + try: + data = json.loads(bak.read_text()) + _log("WARN: tracked.json corrupted — restored from .bak") + print("Warning: tracking file corrupted, restored from backup.") + return data + except Exception: + pass + _log("WARN: tracked.json corrupted, no backup — starting fresh") + print("Warning: tracking file corrupted, starting fresh.") return [] def save_tracked(tracked: List[Dict[str, Any]]) -> None: - """Save tracked.json with atomic write and backup.""" - tracked_file = get_tracked_file() - tracked_file.parent.mkdir(parents=True, exist_ok=True) - - # Create backup - if tracked_file.exists(): - backup_file = tracked_file.with_suffix(".json.bak") - shutil.copy2(tracked_file, backup_file) - - # Atomic write - temp_file = tracked_file.with_suffix(".json.tmp") - with open(temp_file, "w") as f: - json.dump(tracked, f, indent=2) - temp_file.replace(tracked_file) + """Atomic write: .tmp → backup old → rename.""" + tf = get_tracked_file() + tf.parent.mkdir(parents=True, exist_ok=True) + tmp = tf.with_suffix(".json.tmp") + tmp.write_text(json.dumps(tracked, indent=2)) + if tf.exists(): + shutil.copy2(tf, tf.with_suffix(".json.bak")) + tmp.replace(tf) -def track_path(path: str, category: str) -> None: - """Add a path to tracking.""" - allowed_categories = ["temp", "test", "research", "download", "chrome-profile", "cron-output", "other"] - if category not in allowed_categories: - log_message(f"Unknown category '{category}', using 'other'") +# --------------------------------------------------------------------------- +# Allowed categories +# --------------------------------------------------------------------------- + +ALLOWED_CATEGORIES = { + "temp", "test", "research", "download", + "chrome-profile", "cron-output", "other", +} + +# --------------------------------------------------------------------------- +# Commands +# --------------------------------------------------------------------------- + +def cmd_track(path_str: str, category: str) -> None: + """Register a file for tracking.""" + if category not in ALLOWED_CATEGORIES: + print(f"Unknown category '{category}', using 'other'.") + _log(f"WARN: unknown category '{category}', using 'other'") category = "other" - path_obj = Path(path).resolve() - if not path_obj.exists(): - log_message(f"Path {path} does not exist, skipping") + path = Path(path_str).resolve() + + if not path.exists(): + print(f"Path does not exist, skipping: {path}") + _log(f"SKIP: {path} (does not exist)") return - # Check if path is under Hermes home - hermes_home = get_hermes_home().resolve() - try: - path_obj.relative_to(hermes_home) - except ValueError: - log_message(f"Path {path} is outside Hermes home, skipping") + if not _is_safe_path(path): + print(f"Rejected: path is outside HERMES_HOME — {path}") + _log(f"REJECT: {path} (outside HERMES_HOME)") return - # Get file size - size = path_obj.stat().st_size if path_obj.is_file() else 0 - + size = path.stat().st_size if path.is_file() else 0 tracked = load_tracked() + + # Deduplicate + if any(item["path"] == str(path) for item in tracked): + print(f"Already tracked: {path}") + return + tracked.append({ - "path": str(path_obj), + "path": str(path), "timestamp": datetime.now(timezone.utc).isoformat(), "category": category, - "size": size + "size": size, }) save_tracked(tracked) - log_message(f"TRACKED: {path} ({category}, {size} bytes)") - print(f"Tracked: {path} ({category}, {size} bytes)") + _log(f"TRACKED: {path} ({category}, {_fmt(size)})") + print(f"Tracked: {path} ({category}, {_fmt(size)})") -def scan_files() -> None: - """Discover temp/test files by pattern and add to tracking.""" - hermes_home = get_hermes_home() +def cmd_dry_run() -> None: + """Show what would be deleted — no files touched.""" tracked = load_tracked() - tracked_paths = {item["path"] for item in tracked} - - # Scan for temp files - temp_patterns = [ - hermes_home / "cache" / "hermes" / "*", - Path("/tmp") / "hermes-*" - ] - - for pattern in temp_patterns: - for path in pattern.parent.glob(pattern.name): - if str(path.resolve()) not in tracked_paths and path.exists(): - track_path(str(path), "temp") - - # Scan for test files - test_patterns = [ - hermes_home / "test_*.py", - hermes_home / "*.test.log", - hermes_home / "tmp_*.json" - ] - - for pattern in test_patterns: - for path in hermes_home.glob(pattern.name): - if str(path.resolve()) not in tracked_paths and path.exists(): - track_path(str(path), "test") - - print(f"Scan complete. Total tracked files: {len(load_tracked())}") - - -def dry_run() -> None: - """Preview what would be deleted without touching anything.""" - hermes_home = get_hermes_home() - tracked = load_tracked() - - # Categorize files by age now = datetime.now(timezone.utc) - temp_files = [] - test_files = [] - research_folders = [] - large_files = [] - chrome_profiles = [] + + auto: List[Dict] = [] + prompt: List[Dict] = [] for item in tracked: - path = Path(item["path"]) - if not path.exists(): + p = Path(item["path"]) + if not p.exists(): continue - - timestamp = datetime.fromisoformat(item["timestamp"]) - age_days = (now - timestamp).days - - if item["category"] == "temp" and age_days > 7: - temp_files.append(item) - elif item["category"] == "test" and age_days > 3: - test_files.append(item) - elif item["category"] == "research" and age_days > 30: - research_folders.append(item) - elif item["size"] > 500 * 1024 * 1024: # > 500 MB - large_files.append(item) - elif item["category"] == "chrome-profile" and age_days > 14: - chrome_profiles.append(item) - - # Calculate sizes - temp_size = sum(item["size"] for item in temp_files) - test_size = sum(item["size"] for item in test_files) - research_size = sum(item["size"] for item in research_folders) - large_size = sum(item["size"] for item in large_files) - chrome_size = sum(item["size"] for item in chrome_profiles) - - print("Dry-run results:") - print(f"Would delete {len(temp_files)} temp files ({format_size(temp_size)})") - print(f"Would delete {len(test_files)} test files ({format_size(test_size)})") - print(f"Would prompt for {len(research_folders)} research folders ({format_size(research_size)})") - print(f"Would prompt for {len(large_files)} large files ({format_size(large_size)})") - print(f"Would prompt for {len(chrome_profiles)} chrome profiles ({format_size(chrome_size)})") - - total_size = temp_size + test_size + research_size + large_size + chrome_size - print(f"\nTotal potential cleanup: {format_size(total_size)}") - print("Run 'quick' for safe auto-cleanup") - print("Run 'deep' for full cleanup with confirmation") - - -def quick_cleanup() -> None: - """Safe fast clean, no confirmation needed.""" - hermes_home = get_hermes_home() - tracked = load_tracked() - now = datetime.now(timezone.utc) - - deleted_files = [] - total_freed = 0 - - # Delete temp files > 7 days - for item in tracked[:]: - if item["category"] == "temp": - path = Path(item["path"]) - if not path.exists(): - tracked.remove(item) - continue - - timestamp = datetime.fromisoformat(item["timestamp"]) - age_days = (now - timestamp).days - - if age_days > 7: - try: - if path.is_file(): - path.unlink() - elif path.is_dir(): - shutil.rmtree(path) - deleted_files.append(item) - total_freed += item["size"] - tracked.remove(item) - log_message(f"DELETED: {item['path']} (temp, {item['size']} bytes)") - except Exception as e: - log_message(f"ERROR deleting {item['path']}: {e}") - - # Delete test files > 3 days - for item in tracked[:]: - if item["category"] == "test": - path = Path(item["path"]) - if not path.exists(): - tracked.remove(item) - continue - - timestamp = datetime.fromisoformat(item["timestamp"]) - age_days = (now - timestamp).days - - if age_days > 3: - try: - if path.is_file(): - path.unlink() - elif path.is_dir(): - shutil.rmtree(path) - deleted_files.append(item) - total_freed += item["size"] - tracked.remove(item) - log_message(f"DELETED: {item['path']} (test, {item['size']} bytes)") - except Exception as e: - log_message(f"ERROR deleting {item['path']}: {e}") - - # Delete empty directories - for root, dirs, files in os.walk(hermes_home, topdown=False): - for dir_name in dirs: - dir_path = Path(root) / dir_name - try: - if dir_path.is_dir() and not any(dir_path.iterdir()): - dir_path.rmdir() - log_message(f"DELETED: {dir_path} (empty directory)") - except Exception as e: - pass # Directory not empty or permission denied - - save_tracked(tracked) - print(f"Deleted {len(deleted_files)} files, freed {format_size(total_freed)}") - - -def deep_cleanup() -> None: - """Full scan with confirmation for risky items.""" - hermes_home = get_hermes_home() - tracked = load_tracked() - now = datetime.now(timezone.utc) - - # First, do quick cleanup - print("Running quick cleanup first...") - quick_cleanup() - tracked = load_tracked() - - # Now handle risky items - research_folders = [] - large_files = [] - chrome_profiles = [] - - for item in tracked: - path = Path(item["path"]) - if not path.exists(): - continue - - timestamp = datetime.fromisoformat(item["timestamp"]) - age_days = (now - timestamp).days - - if item["category"] == "research" and age_days > 30: - research_folders.append(item) - elif item["size"] > 500 * 1024 * 1024: # > 500 MB - large_files.append(item) - elif item["category"] == "chrome-profile" and age_days > 14: - chrome_profiles.append(item) - - # Keep last 10 research folders - research_folders.sort(key=lambda x: x["timestamp"], reverse=True) - old_research = research_folders[10:] - - total_freed = 0 - deleted_count = 0 - - # Prompt for old research folders - for item in old_research: - path = Path(item["path"]) - response = input(f"Delete old research folder: {path}? [y/N] ") - if response.lower() == "y": - try: - if path.exists(): - shutil.rmtree(path) - total_freed += item["size"] - deleted_count += 1 - tracked.remove(item) - log_message(f"DELETED: {item['path']} (research, {item['size']} bytes)") - print(f"Deleted: {path} ({format_size(item['size'])})") - except Exception as e: - log_message(f"ERROR deleting {item['path']}: {e}") - print(f"Error deleting {path}: {e}") - - # Prompt for large files - for item in large_files: - path = Path(item["path"]) - print(f"\nLarge file: {path} ({format_size(item['size'])})") - print("Category:", item["category"]) - response = input("Delete this file? [y/N] ") - if response.lower() == "y": - try: - if path.exists(): - path.unlink() - total_freed += item["size"] - deleted_count += 1 - tracked.remove(item) - log_message(f"DELETED: {item['path']} (large file, {item['size']} bytes)") - print(f"Deleted: {path}") - except Exception as e: - log_message(f"ERROR deleting {item['path']}: {e}") - print(f"Error deleting {path}: {e}") - - # Prompt for chrome profiles - for item in chrome_profiles: - path = Path(item["path"]) - print(f"\nChrome profile: {path} ({format_size(item['size'])})") - response = input("Delete this chrome profile? [y/N] ") - if response.lower() == "y": - try: - if path.exists(): - shutil.rmtree(path) - total_freed += item["size"] - deleted_count += 1 - tracked.remove(item) - log_message(f"DELETED: {item['path']} (chrome-profile, {item['size']} bytes)") - print(f"Deleted: {path}") - except Exception as e: - log_message(f"ERROR deleting {item['path']}: {e}") - print(f"Error deleting {path}: {e}") - - save_tracked(tracked) - print(f"\nSummary: Deleted {deleted_count} items, freed {format_size(total_freed)}") - - -def show_status() -> None: - """Show disk usage breakdown by category + top 10 largest files.""" - tracked = load_tracked() - - # Calculate usage by category - categories = {} - for item in tracked: + age = (now - datetime.fromisoformat(item["timestamp"])).days cat = item["category"] - if cat not in categories: - categories[cat] = {"count": 0, "size": 0} - categories[cat]["count"] += 1 - categories[cat]["size"] += item["size"] + size = item["size"] - print("Disk usage by category:") - print(f"{'Category':<20} {'Files':<10} {'Size':<15}") - print("-" * 45) - for cat, data in sorted(categories.items(), key=lambda x: x[1]["size"], reverse=True): - print(f"{cat:<20} {data['count']:<10} {format_size(data['size']):<15}") + if cat == "test": + auto.append(item) + elif cat == "temp" and age > 7: + auto.append(item) + elif cat == "cron-output" and age > 14: + auto.append(item) + elif cat == "research" and age > 30: + prompt.append(item) + elif cat == "chrome-profile" and age > 14: + prompt.append(item) + elif size > 500 * 1024 * 1024: + prompt.append(item) - # Find top 10 largest files - all_files = [(item["path"], item["size"], item["category"]) for item in tracked if Path(item["path"]).exists()] - all_files.sort(key=lambda x: x[1], reverse=True) - top_10 = all_files[:10] + auto_size = sum(i["size"] for i in auto) + prompt_size = sum(i["size"] for i in prompt) - print("\nTop 10 largest files:") - for i, (path, size, cat) in enumerate(top_10, 1): - print(f"{i}. {path} ({format_size(size)}, {cat})") + print("Dry-run preview (nothing deleted):") + print(f" Auto-delete : {len(auto)} files ({_fmt(auto_size)})") + for item in auto: + print(f" [{item['category']}] {item['path']}") + print(f" Needs prompt: {len(prompt)} files ({_fmt(prompt_size)})") + for item in prompt: + print(f" [{item['category']}] {item['path']}") + print(f"\n Total potential: {_fmt(auto_size + prompt_size)}") + print("Run 'quick' for auto-delete only, 'deep' for full cleanup.") -def forget_path(path: str) -> None: - """Remove a path from tracking permanently.""" - path_obj = Path(path).resolve() +def cmd_quick(silent: bool = False) -> None: + """ + Safe deterministic cleanup — no prompts. + Deletes: test (age>0), temp (>7d), cron-output (>14d), empty dirs. + Pass silent=True to suppress output (for auto-runs). + """ tracked = load_tracked() + now = datetime.now(timezone.utc) + deleted, freed = 0, 0 + new_tracked = [] - original_count = len(tracked) - tracked = [item for item in tracked if Path(item["path"]).resolve() != path_obj] - removed = original_count - len(tracked) + for item in tracked: + p = Path(item["path"]) + cat = item["category"] - if removed > 0: + if not p.exists(): + _log(f"STALE: {p} (removed from tracking)") + continue + + age = (now - datetime.fromisoformat(item["timestamp"])).days + + should_delete = ( + cat == "test" or # always delete test files + (cat == "temp" and age > 7) or + (cat == "cron-output" and age > 14) + ) + + if should_delete: + try: + if p.is_file(): + p.unlink() + elif p.is_dir(): + shutil.rmtree(p) + freed += item["size"] + deleted += 1 + _log(f"DELETED: {p} ({cat}, {_fmt(item['size'])})") + except OSError as e: + _log(f"ERROR deleting {p}: {e}") + if not silent: + print(f" Skipped (error): {p} — {e}") + new_tracked.append(item) + else: + new_tracked.append(item) + + # Remove empty dirs under HERMES_HOME + hermes_home = get_hermes_home() + empty_removed = 0 + for dirpath in sorted(hermes_home.rglob("*"), reverse=True): + if dirpath.is_dir() and dirpath != hermes_home: + try: + if not any(dirpath.iterdir()): + dirpath.rmdir() + empty_removed += 1 + _log(f"DELETED: {dirpath} (empty dir)") + except OSError: + pass + + save_tracked(new_tracked) + + summary = (f"[disk-guardian] Cleaned {deleted} files + {empty_removed} " + f"empty dirs, freed {_fmt(freed)}.") + _log(f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, {_fmt(freed)}") + print(summary) + + +def cmd_deep() -> None: + """Full cleanup — auto for safe files, interactive for risky.""" + print("Running quick cleanup first...") + cmd_quick() + + tracked = load_tracked() + now = datetime.now(timezone.utc) + research, chrome, large = [], [], [] + + for item in tracked: + p = Path(item["path"]) + if not p.exists(): + continue + age = (now - datetime.fromisoformat(item["timestamp"])).days + cat = item["category"] + + if cat == "research" and age > 30: + research.append(item) + elif cat == "chrome-profile" and age > 14: + chrome.append(item) + elif item["size"] > 500 * 1024 * 1024: + large.append(item) + + # Keep 10 newest research folders + research.sort(key=lambda x: x["timestamp"], reverse=True) + old_research = research[10:] + + freed, count = 0, 0 + to_remove = [] + + for item in old_research: + p = Path(item["path"]) + ans = input(f"\nDelete old research ({_fmt(item['size'])}): {p} [y/N] ") + if ans.lower() == "y": + _delete_item(p, item, to_remove) + freed += item["size"] + count += 1 + + for item in chrome: + p = Path(item["path"]) + ans = input(f"\nDelete chrome profile ({_fmt(item['size'])}): {p} [y/N] ") + if ans.lower() == "y": + _delete_item(p, item, to_remove) + freed += item["size"] + count += 1 + + for item in large: + p = Path(item["path"]) + ans = input(f"\nDelete large file ({_fmt(item['size'])}, " + f"{item['category']}): {p} [y/N] ") + if ans.lower() == "y": + _delete_item(p, item, to_remove) + freed += item["size"] + count += 1 + + if to_remove: + remove_paths = {i["path"] for i in to_remove} + save_tracked([i for i in tracked if i["path"] not in remove_paths]) + + print(f"\n[disk-guardian] Deep cleanup done: {count} items, freed {_fmt(freed)}.") + + +def _delete_item(p: Path, item: Dict, to_remove: list) -> None: + try: + if p.is_file(): + p.unlink() + elif p.is_dir(): + shutil.rmtree(p) + to_remove.append(item) + _log(f"DELETED: {p} ({item['category']}, {_fmt(item['size'])})") + print(f" Deleted: {p}") + except OSError as e: + _log(f"ERROR deleting {p}: {e}") + print(f" Error: {e}") + + +def cmd_status() -> None: + """Print per-category breakdown and top 10 largest tracked files.""" + tracked = load_tracked() + cats: Dict[str, Dict] = {} + for item in tracked: + c = item["category"] + cats.setdefault(c, {"count": 0, "size": 0}) + cats[c]["count"] += 1 + cats[c]["size"] += item["size"] + + print(f"{'Category':<20} {'Files':>6} {'Size':>10}") + print("-" * 40) + for cat, d in sorted(cats.items(), key=lambda x: x[1]["size"], reverse=True): + print(f"{cat:<20} {d['count']:>6} {_fmt(d['size']):>10}") + + existing = [ + (i["path"], i["size"], i["category"]) + for i in tracked if Path(i["path"]).exists() + ] + existing.sort(key=lambda x: x[1], reverse=True) + + print("\nTop 10 largest tracked files:") + for rank, (path, size, cat) in enumerate(existing[:10], 1): + print(f" {rank:>2}. {_fmt(size):>8} [{cat}] {path}") + + if not tracked: + print("No files tracked yet.") + + +def cmd_forget(path_str: str) -> None: + """Remove a path from tracking without deleting the file.""" + p = Path(path_str).resolve() + tracked = load_tracked() + before = len(tracked) + tracked = [i for i in tracked if Path(i["path"]).resolve() != p] + removed = before - len(tracked) + if removed: save_tracked(tracked) - log_message(f"FORGOT: {path} ({removed} entries)") - print(f"Removed {removed} entries from tracking") + _log(f"FORGOT: {p} ({removed} entries)") + print(f"Removed {removed} tracking entries for {p}.") else: - print(f"Path {path} not found in tracking") + print(f"Not found in tracking: {p}") -def format_size(size_bytes: int) -> str: - """Format size in human-readable format.""" - for unit in ["B", "KB", "MB", "GB", "TB"]: - if size_bytes < 1024.0: - return f"{size_bytes:.1f} {unit}" - size_bytes /= 1024.0 - return f"{size_bytes:.1f} PB" +def cmd_install_memory() -> None: + """ + Write persistent memory rules so disk-guardian auto-runs across sessions. + Writes to ~/.hermes/MEMORY.md (appends if exists). + """ + memory_file = get_hermes_home() / "MEMORY.md" + rules = """ +## disk-guardian auto-cleanup rules (added by disk-guardian v1.2.0) + +- After every task that creates test files (test_*.py, *.test.*, tmp_*), + automatically run: python3 ~/.hermes/optional-skills/devops/disk-guardian/scripts/disk_guardian.py quick + Do this silently at task end — no need to ask the user. +- Track every file created under HERMES_HOME during tasks using disk-guardian track. +- If HERMES_HOME disk usage exceeds 5 GB, silently run disk-guardian quick. +- Never ask permission for temp/test file deletion — clean silently, report one line. +- Print cleanup summary as: [disk-guardian] Cleaned N files, freed X MB +""" + memory_file.parent.mkdir(parents=True, exist_ok=True) + with open(memory_file, "a") as f: + f.write(rules) + _log("MEMORY: wrote auto-cleanup rules to MEMORY.md") + print(f"Memory rules written to {memory_file}") + print("Disk-guardian will now auto-run at the end of every task that creates test files.") + print("Restart Hermes for the rules to take effect.") -def main(): - parser = argparse.ArgumentParser(description="Disk Guardian - Autonomous disk cleanup for Hermes Agent") - subparsers = parser.add_subparsers(dest="command", help="Available commands") +# --------------------------------------------------------------------------- +# Formatting +# --------------------------------------------------------------------------- - # Track command - track_parser = subparsers.add_parser("track", help="Track a path") - track_parser.add_argument("path", help="Path to track") - track_parser.add_argument("category", help="Category (temp, test, research, download, chrome-profile, cron-output, other)") +def _fmt(n: int) -> str: + for unit in ("B", "KB", "MB", "GB", "TB"): + if n < 1024: + return f"{n:.1f} {unit}" + n /= 1024 + return f"{n:.1f} PB" - # Scan command - subparsers.add_parser("scan", help="Discover temp/test files by pattern") - # Dry-run command - subparsers.add_parser("dry-run", help="Preview what would be deleted") +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- - # Quick command - subparsers.add_parser("quick", help="Safe fast clean") +def main() -> None: + parser = argparse.ArgumentParser( + description="disk_guardian — ephemeral session file cleanup for Hermes Agent" + ) + sub = parser.add_subparsers(dest="cmd") - # Deep command - subparsers.add_parser("deep", help="Full scan with confirmation") + p_track = sub.add_parser("track", help="Register a file for tracking") + p_track.add_argument("path") + p_track.add_argument("category", choices=sorted(ALLOWED_CATEGORIES)) - # Status command - subparsers.add_parser("status", help="Show disk usage breakdown") + sub.add_parser("dry-run", help="Preview deletions, touch nothing") + sub.add_parser("quick", help="Auto-delete safe files (no prompts)") + sub.add_parser("deep", help="Full cleanup with prompts for risky items") + sub.add_parser("status", help="Show disk usage by category") + sub.add_parser("install-memory", help="Write persistent auto-run memory rules") - # Forget command - forget_parser = subparsers.add_parser("forget", help="Remove a path from tracking") - forget_parser.add_argument("path", help="Path to forget") + p_forget = sub.add_parser("forget", help="Stop tracking a path") + p_forget.add_argument("path") args = parser.parse_args() - - if not args.command: + if not args.cmd: parser.print_help() sys.exit(1) try: - if args.command == "track": - track_path(args.path, args.category) - elif args.command == "scan": - scan_files() - elif args.command == "dry-run": - dry_run() - elif args.command == "quick": - quick_cleanup() - elif args.command == "deep": - deep_cleanup() - elif args.command == "status": - show_status() - elif args.command == "forget": - forget_path(args.path) + if args.cmd == "track": + cmd_track(args.path, args.category) + elif args.cmd == "dry-run": + cmd_dry_run() + elif args.cmd == "quick": + cmd_quick() + elif args.cmd == "deep": + cmd_deep() + elif args.cmd == "status": + cmd_status() + elif args.cmd == "install-memory": + cmd_install_memory() + elif args.cmd == "forget": + cmd_forget(args.path) + except KeyboardInterrupt: + print("\nAborted.") + sys.exit(0) except Exception as e: - log_message(f"ERROR: {e}") + _log(f"ERROR: {e}") print(f"Error: {e}", file=sys.stderr) sys.exit(1)