feat: add session artifact disk cleanup

Port from qwibitai/nanoclaw#1632: Auto-prune stale session artifacts.

hermes-agent accumulates disk artifacts that are never cleaned up:
- Session transcript JSON files (~2 GB on a typical install)
- API request debug dumps
- Filesystem checkpoint shadow repos (~12 GB)
- Gateway JSONL transcript files

The existing 'hermes sessions prune' only deletes DB rows, leaving all
disk files behind.

Changes:
- New tools/session_cleanup.py module with safe, active-session-aware
  disk artifact pruning (session files, request dumps, checkpoints)
- Enhanced 'hermes sessions prune' with --include-files, --files-only,
  and --dry-run flags for disk artifact cleanup
- Enhanced 'hermes sessions stats' to show disk artifact counts and sizes
- Automated daily cleanup in gateway's session expiry watcher
- 28 new tests covering all cleanup paths, safety guards, and edge cases

Safety:
- Never deletes files belonging to active (non-ended) sessions
- Never touches sessions.json state file
- Checkpoints use age-based deletion only (no session ID correlation)
- Dry-run mode available for preview before deletion
- All errors are caught and logged, never crash the gateway
This commit is contained in:
Teknium 2026-04-12 17:24:17 -07:00
parent 0d0d27d45e
commit 62919b1ef7
No known key found for this signature in database
5 changed files with 688 additions and 5 deletions

View file

@ -1756,6 +1756,9 @@ class GatewayRunner:
await asyncio.sleep(60) # initial delay — let the gateway fully start
_flush_failures: dict[str, int] = {} # session_id -> consecutive failure count
_MAX_FLUSH_RETRIES = 3
# Daily artifact cleanup: 288 cycles × 5-min interval ≈ 24h
_ARTIFACT_CLEANUP_EVERY = max(1, (24 * 3600) // interval)
_artifact_tick = 0
while self._running:
try:
self.session_store._ensure_loaded()
@ -1856,6 +1859,30 @@ class GatewayRunner:
)
except Exception as e:
logger.debug("Session expiry watcher error: %s", e)
# --- Daily disk artifact cleanup ---
# Run once per day (every 288 cycles at 5-min interval = 24h).
# Cleans up stale session transcript files and checkpoint directories.
_artifact_tick += 1
if _artifact_tick >= _ARTIFACT_CLEANUP_EVERY:
_artifact_tick = 0
try:
from tools.session_cleanup import prune_all_artifacts
_hermes_home = get_hermes_home()
_results = await asyncio.get_event_loop().run_in_executor(
None,
lambda: prune_all_artifacts(_hermes_home, self._session_db),
)
_total_freed = sum(v[1] for v in _results.values())
_total_count = sum(v[0] for v in _results.values())
if _total_count:
logger.info(
"Artifact cleanup: removed %d items, freed %d KB",
_total_count, _total_freed // 1024,
)
except Exception as e:
logger.debug("Artifact cleanup error: %s", e)
# Sleep in small increments so we can stop quickly
for _ in range(interval):
if not self._running:

View file

@ -5397,6 +5397,12 @@ For more help on a command:
sessions_prune.add_argument("--older-than", type=int, default=90, help="Delete sessions older than N days (default: 90)")
sessions_prune.add_argument("--source", help="Only prune sessions from this source")
sessions_prune.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_prune.add_argument("--include-files", action="store_true",
help="Also delete session transcript files and stale checkpoints from disk")
sessions_prune.add_argument("--files-only", action="store_true",
help="Only clean disk files (session transcripts, request dumps, checkpoints) — skip DB pruning")
sessions_prune.add_argument("--dry-run", action="store_true",
help="Show what would be deleted without actually deleting")
sessions_subparsers.add_parser("stats", help="Show session store statistics")
@ -5503,12 +5509,50 @@ For more help on a command:
elif action == "prune":
days = args.older_than
source_msg = f" from '{args.source}'" if args.source else ""
if not args.yes:
if not _confirm_prompt(f"Delete all ended sessions older than {days} days{source_msg}? [y/N] "):
files_only = getattr(args, "files_only", False)
include_files = getattr(args, "include_files", False) or files_only
dry_run = getattr(args, "dry_run", False)
if dry_run:
print("Dry run — showing what would be deleted:\n")
elif not args.yes:
if files_only:
prompt_msg = f"Clean up disk files (session transcripts, request dumps, checkpoints) older than {days} days? [y/N] "
elif include_files:
prompt_msg = f"Delete all ended sessions older than {days} days{source_msg} AND clean up disk files? [y/N] "
else:
prompt_msg = f"Delete all ended sessions older than {days} days{source_msg}? [y/N] "
if not _confirm_prompt(prompt_msg):
print("Cancelled.")
return
count = db.prune_sessions(older_than_days=days, source=args.source)
print(f"Pruned {count} session(s).")
# DB pruning (skip if --files-only)
if not files_only:
if dry_run:
print(f"DB: Would prune ended sessions older than {days} days{source_msg}.")
print(" (Use without --dry-run to see exact count)")
else:
count = db.prune_sessions(older_than_days=days, source=args.source)
print(f"Pruned {count} session(s) from database.")
# Disk file cleanup
if include_files:
try:
from tools.session_cleanup import prune_all_artifacts, format_prune_summary
hermes_home = get_hermes_home()
results = prune_all_artifacts(
hermes_home, db,
session_retention_days=days,
checkpoint_retention_days=min(days, 14),
dry_run=dry_run,
)
summary = format_prune_summary(results)
if dry_run:
print(f"\nDisk files that would be cleaned:\n{summary}")
else:
print(f"\nDisk cleanup:\n{summary}")
except Exception as e:
print(f"Disk cleanup failed: {e}")
elif action == "rename":
resolved_session_id = db.resolve_session_id(args.session_id)
@ -5567,6 +5611,30 @@ For more help on a command:
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"Database size: {size_mb:.1f} MB")
# Disk artifact stats
hermes_home = get_hermes_home()
sessions_dir = hermes_home / "sessions"
checkpoints_dir = hermes_home / "checkpoints"
if sessions_dir.exists():
session_files = list(sessions_dir.glob("session_*.json"))
request_dumps = list(sessions_dir.glob("request_dump_*.json"))
jsonl_files = list(sessions_dir.glob("*.jsonl"))
session_bytes = sum(f.stat().st_size for f in session_files if f.is_file())
dump_bytes = sum(f.stat().st_size for f in request_dumps if f.is_file())
jsonl_bytes = sum(f.stat().st_size for f in jsonl_files if f.is_file())
print(f"\nDisk artifacts:")
if session_files:
print(f" Session transcripts: {len(session_files)} files ({session_bytes / (1024*1024):.1f} MB)")
if request_dumps:
print(f" Request dumps: {len(request_dumps)} files ({dump_bytes / (1024*1024):.1f} MB)")
if jsonl_files:
print(f" Gateway transcripts: {len(jsonl_files)} files ({jsonl_bytes / (1024*1024):.1f} MB)")
if checkpoints_dir.exists():
cp_dirs = [d for d in checkpoints_dir.iterdir() if d.is_dir()]
if cp_dirs:
print(f" Checkpoints: {len(cp_dirs)} directories")
print(f"\n Tip: Use 'hermes sessions prune --include-files' to clean up old disk artifacts.")
else:
sessions_parser.print_help()

View file

@ -84,7 +84,8 @@ TIPS = [
"hermes config check scans for missing or stale configuration options.",
"hermes sessions browse opens an interactive session picker with search.",
"hermes sessions stats shows session counts by platform and database size.",
"hermes sessions prune --older-than 30 cleans up old sessions.",
"hermes sessions prune --older-than 30 cleans up old sessions from the database.",
"hermes sessions prune --include-files cleans up both DB records AND disk artifacts (transcripts, checkpoints).",
"hermes skills search react --source skills-sh searches the skills.sh public directory.",
"hermes skills check scans installed hub skills for upstream updates.",
"hermes skills tap add myorg/skills-repo adds a custom GitHub skill source.",

View file

@ -0,0 +1,304 @@
"""Tests for tools/session_cleanup.py — session artifact disk cleanup."""
import os
import json
import time
import shutil
import tempfile
from pathlib import Path
from unittest.mock import MagicMock
import pytest
# Import the module under test
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from tools.session_cleanup import (
_extract_session_id_from_filename,
_get_active_session_ids,
_human_size,
format_prune_summary,
prune_all_artifacts,
prune_checkpoints,
prune_session_files,
SESSION_FILE_RETENTION_DAYS,
CHECKPOINT_RETENTION_DAYS,
)
# ─── Filename extraction ────────────────────────────────────────────────────
class TestExtractSessionId:
    """Filename -> session-ID extraction for every recognized artifact pattern."""

    def test_session_json(self):
        name = "session_20260412_171123_466c.json"
        assert _extract_session_id_from_filename(name) == "20260412_171123_466c"

    def test_session_uuid(self):
        uuid = "13bac027-2f82-4267-a0c6-1c37a032945a"
        assert _extract_session_id_from_filename(f"session_{uuid}.json") == uuid

    def test_request_dump(self):
        name = "request_dump_20260412_171123_466c_20260412_171125_789abc.json"
        assert _extract_session_id_from_filename(name) == "20260412_171123_466c"

    def test_jsonl_transcript(self):
        name = "20260412_012806_2013cd04.jsonl"
        assert _extract_session_id_from_filename(name) == "20260412_012806_2013cd04"

    def test_unknown_format(self):
        assert _extract_session_id_from_filename("random_file.txt") is None

    def test_sessions_json_state_file(self):
        """The sessions.json state file must never be treated as an artifact."""
        assert _extract_session_id_from_filename("sessions.json") is None
# ─── Active session detection ────────────────────────────────────────────────
class TestGetActiveSessionIds:
    """Active-session lookup and its fail-safe error path."""

    def test_returns_active_ids(self):
        db = MagicMock()
        rows = [
            {"id": "active1", "ended_at": None},
            {"id": "ended1", "ended_at": 1234567890.0},
            {"id": "active2", "ended_at": None},
        ]
        db.list_sessions_rich.return_value = rows
        assert _get_active_session_ids(db) == {"active1", "active2"}

    def test_returns_empty_on_error(self):
        db = MagicMock()
        db.list_sessions_rich.side_effect = Exception("DB error")
        # On error: empty set (callers then fall back to age-only pruning).
        assert _get_active_session_ids(db) == set()
# ─── Session file pruning ────────────────────────────────────────────────────
class TestPruneSessionFiles:
    """prune_session_files: retention window, active-session and state-file guards."""

    @pytest.fixture
    def sessions_dir(self, tmp_path):
        path = tmp_path / "sessions"
        path.mkdir()
        return path

    @pytest.fixture
    def mock_db(self):
        db = MagicMock()
        db.list_sessions_rich.return_value = [
            {"id": "active_session_1", "ended_at": None},
        ]
        return db

    def _create_old_file(self, directory, filename, age_days=60, content="{}"):
        """Write *filename* under *directory* and backdate its mtime by age_days."""
        target = directory / filename
        target.write_text(content)
        stamp = time.time() - age_days * 86400
        os.utime(target, (stamp, stamp))
        return target

    def test_deletes_old_session_files(self, sessions_dir, mock_db):
        for name, age in (
            ("session_old_ended_1.json", 60),
            ("session_old_ended_2.json", 45),
        ):
            self._create_old_file(sessions_dir, name, age_days=age)
        deleted, freed = prune_session_files(sessions_dir, mock_db, retention_days=30)
        assert deleted == 2
        assert freed > 0
        assert not (sessions_dir / "session_old_ended_1.json").exists()
        assert not (sessions_dir / "session_old_ended_2.json").exists()

    def test_preserves_recent_files(self, sessions_dir, mock_db):
        # Five days old — well inside the 30-day retention window.
        recent = self._create_old_file(sessions_dir, "session_recent_1.json", age_days=5)
        deleted, _ = prune_session_files(sessions_dir, mock_db, retention_days=30)
        assert deleted == 0
        assert recent.exists()

    def test_preserves_active_session_files(self, sessions_dir, mock_db):
        """Old files are still kept when their session is active in the DB."""
        kept = self._create_old_file(
            sessions_dir, "session_active_session_1.json", age_days=60
        )
        deleted, _ = prune_session_files(sessions_dir, mock_db, retention_days=30)
        assert deleted == 0
        assert kept.exists()

    def test_preserves_sessions_json_state_file(self, sessions_dir, mock_db):
        """sessions.json survives even an extreme retention setting."""
        state = self._create_old_file(sessions_dir, "sessions.json", age_days=365)
        deleted, _ = prune_session_files(sessions_dir, mock_db, retention_days=1)
        assert deleted == 0
        assert state.exists()

    def test_deletes_old_request_dumps(self, sessions_dir, mock_db):
        name = "request_dump_old_ended_1_20260412_171125_789abc.json"
        self._create_old_file(sessions_dir, name, age_days=60)
        deleted, _ = prune_session_files(sessions_dir, mock_db, retention_days=30)
        assert deleted == 1

    def test_deletes_old_jsonl_transcripts(self, sessions_dir, mock_db):
        self._create_old_file(sessions_dir, "20260312_012806_2013cd04.jsonl", age_days=60)
        deleted, _ = prune_session_files(sessions_dir, mock_db, retention_days=30)
        assert deleted == 1

    def test_dry_run_does_not_delete(self, sessions_dir, mock_db):
        victim = self._create_old_file(sessions_dir, "session_dry_run_test.json", age_days=60)
        deleted, _ = prune_session_files(
            sessions_dir, mock_db, retention_days=30, dry_run=True
        )
        # Dry-run counters stay at zero because nothing is actually removed.
        assert deleted == 0
        assert victim.exists()

    def test_nonexistent_directory(self, tmp_path, mock_db):
        missing = tmp_path / "nonexistent"
        assert prune_session_files(missing, mock_db) == (0, 0)
# ─── Checkpoint pruning ──────────────────────────────────────────────────────
class TestPruneCheckpoints:
    """prune_checkpoints: age-based deletion of checkpoint shadow repos."""

    @pytest.fixture
    def checkpoints_dir(self, tmp_path):
        path = tmp_path / "checkpoints"
        path.mkdir()
        return path

    def _create_old_checkpoint(self, directory, name, age_days=30, size_kb=10):
        """Build a minimal git-shaped checkpoint directory and backdate its mtime."""
        repo = directory / name
        repo.mkdir()
        (repo / "HEAD").write_text("ref: refs/heads/master\n")
        (repo / "config").write_text("[core]\n\trepositoryformatversion = 0\n")
        (repo / "HERMES_WORKDIR").write_text("/tmp/test\n")
        # Filler payload so bytes_freed is non-trivial.
        objects = repo / "objects"
        objects.mkdir()
        (objects / "data").write_bytes(b"x" * (size_kb * 1024))
        stamp = time.time() - age_days * 86400
        os.utime(repo, (stamp, stamp))
        return repo

    def test_deletes_old_checkpoints(self, checkpoints_dir):
        for name, age in (("abc123", 30), ("def456", 20)):
            self._create_old_checkpoint(checkpoints_dir, name, age_days=age)
        deleted, freed = prune_checkpoints(checkpoints_dir, retention_days=14)
        assert deleted == 2
        assert freed > 0
        assert not (checkpoints_dir / "abc123").exists()
        assert not (checkpoints_dir / "def456").exists()

    def test_preserves_recent_checkpoints(self, checkpoints_dir):
        self._create_old_checkpoint(checkpoints_dir, "recent1", age_days=5)
        deleted, _ = prune_checkpoints(checkpoints_dir, retention_days=14)
        assert deleted == 0
        assert (checkpoints_dir / "recent1").exists()

    def test_dry_run_does_not_delete(self, checkpoints_dir):
        repo = self._create_old_checkpoint(checkpoints_dir, "dryrun1", age_days=30)
        deleted, _ = prune_checkpoints(checkpoints_dir, retention_days=14, dry_run=True)
        assert deleted == 0
        assert repo.exists()

    def test_nonexistent_directory(self, tmp_path):
        assert prune_checkpoints(tmp_path / "nonexistent") == (0, 0)
# ─── prune_all_artifacts integration ─────────────────────────────────────────
class TestPruneAllArtifacts:
    """prune_all_artifacts: combined session-file + checkpoint cleanup."""

    @pytest.fixture
    def hermes_home(self, tmp_path):
        home = tmp_path / "hermes"
        for sub in ("sessions", "checkpoints"):
            (home / sub).mkdir(parents=True)
        return home

    @pytest.fixture
    def mock_db(self):
        db = MagicMock()
        db.list_sessions_rich.return_value = []
        return db

    def test_returns_results_for_all_types(self, hermes_home, mock_db):
        results = prune_all_artifacts(hermes_home, mock_db)
        assert "session_files" in results
        assert "checkpoints" in results

    def test_combined_cleanup(self, hermes_home, mock_db):
        stamp = time.time() - 60 * 86400
        # One stale transcript ...
        transcript = hermes_home / "sessions" / "session_old_test.json"
        transcript.write_text("{}")
        os.utime(transcript, (stamp, stamp))
        # ... and one stale checkpoint directory.
        checkpoint = hermes_home / "checkpoints" / "oldcp"
        checkpoint.mkdir()
        (checkpoint / "HEAD").write_text("ref: refs/heads/master\n")
        os.utime(checkpoint, (stamp, stamp))
        results = prune_all_artifacts(
            hermes_home, mock_db,
            session_retention_days=30,
            checkpoint_retention_days=14,
        )
        assert results["session_files"][0] == 1
        assert results["checkpoints"][0] == 1
# ─── Formatting ──────────────────────────────────────────────────────────────
class TestFormatPruneSummary:
    """format_prune_summary: empty and populated result rendering."""

    def test_no_artifacts(self):
        empty = {"session_files": (0, 0), "checkpoints": (0, 0)}
        assert format_prune_summary(empty) == "No stale artifacts found."

    def test_with_files(self):
        results = {
            "session_files": (5, 100 * 1024 * 1024),
            "checkpoints": (3, 500 * 1024 * 1024),
        }
        text = format_prune_summary(results)
        for needle in ("5 removed", "3 removed", "Total freed"):
            assert needle in text
class TestHumanSize:
    """_human_size: unit selection at each magnitude."""

    def test_bytes(self):
        assert _human_size(500) == "500 B"

    def test_kb(self):
        assert _human_size(2 * 1024) == "2.0 KB"

    def test_mb(self):
        assert _human_size(5 * 1024 ** 2) == "5.0 MB"

    def test_gb(self):
        assert _human_size(2 * 1024 ** 3) == "2.0 GB"

283
tools/session_cleanup.py Normal file
View file

@ -0,0 +1,283 @@
"""
Session Artifact Cleanup — prune stale disk files that accumulate over time.
Hermes creates several on-disk artifacts per session that are never automatically
cleaned up:
~/.hermes/sessions/session_<id>.json CLI session transcript logs
~/.hermes/sessions/request_dump_<id>.json API debug request dumps
~/.hermes/sessions/<id>.jsonl Gateway legacy transcript files
~/.hermes/checkpoints/<hash>/ Filesystem checkpoint shadow repos
The SessionDB.prune_sessions() method only deletes DB rows. This module handles
the disk side: identifying stale files, protecting active sessions, and reclaiming
disk space.
Inspired by qwibitai/nanoclaw#1632 (auto-prune stale session artifacts).
"""
import logging
import os
import shutil
import time
from pathlib import Path
from typing import Dict, Optional, Set, Tuple
logger = logging.getLogger(__name__)
# Default retention periods (days)
# Used as the default `retention_days` for prune_session_files / prune_checkpoints.
SESSION_FILE_RETENTION_DAYS = 30
# NOTE(review): REQUEST_DUMP_RETENTION_DAYS and JSONL_TRANSCRIPT_RETENTION_DAYS
# are not referenced anywhere in this module — prune_session_files applies a
# single retention period to all artifact kinds. Presumably reserved for future
# per-type retention; confirm before relying on them.
REQUEST_DUMP_RETENTION_DAYS = 7
CHECKPOINT_RETENTION_DAYS = 14
JSONL_TRANSCRIPT_RETENTION_DAYS = 30
def _get_active_session_ids(db) -> Set[str]:
"""Get session IDs that are still active (not ended) from the DB.
Returns a set of session ID strings. On error, returns an empty set
(fail-safe: if we can't determine active sessions, don't delete anything).
"""
try:
rows = db.list_sessions_rich(limit=100000, include_children=True)
return {r["id"] for r in rows if r.get("ended_at") is None}
except Exception as e:
logger.warning("Could not fetch active sessions: %s", e)
return set()
def _extract_session_id_from_filename(filename: str) -> Optional[str]:
"""Extract the session ID from a session artifact filename.
Examples:
session_20260412_171123_466c.json 20260412_171123_466c
request_dump_20260412_171123_466c_20260412_171125_789abc.json 20260412_171123_466c
20260412_012806_2013cd04.jsonl 20260412_012806_2013cd04
"""
name = filename
if name.startswith("session_") and name.endswith(".json"):
return name[len("session_"):-len(".json")]
elif name.startswith("request_dump_") and name.endswith(".json"):
# request_dump_{session_id}_{timestamp}.json
# Session ID is the part between "request_dump_" and the last timestamp
stem = name[len("request_dump_"):-len(".json")]
# Session IDs look like: 20260412_171123_466c or UUID format
# The dump adds another _YYYYMMDD_HHMMSS_ffffff suffix
# Try splitting off the last 3 underscore-separated components (date_time_microseconds)
parts = stem.rsplit("_", 3)
if len(parts) >= 4:
return "_".join(parts[:-3])
return stem
elif name.endswith(".jsonl"):
return name[:-len(".jsonl")]
return None
def prune_session_files(
    sessions_dir: Path,
    db,
    retention_days: int = SESSION_FILE_RETENTION_DAYS,
    dry_run: bool = False,
) -> Tuple[int, int]:
    """Remove stale session artifacts (transcripts, request dumps, .jsonl logs).

    A file is removed only when all of the following hold:
      * it matches a known artifact pattern (session_*, request_dump_*, *.jsonl),
      * its mtime is older than ``retention_days``,
      * its session is not reported as active by the DB.

    The ``sessions.json`` state file is always left untouched.

    NOTE(review): if the DB lookup fails, _get_active_session_ids returns an
    empty set, so the active-session guard degrades to the age cutoff alone.

    Args:
        sessions_dir: Path to ~/.hermes/sessions/.
        db: SessionDB instance used to identify active sessions.
        retention_days: Minimum age in days before a file is eligible.
        dry_run: When True, log candidates and delete nothing; the returned
            counters stay at zero.

    Returns:
        (files_deleted, bytes_freed)
    """
    if not sessions_dir.exists():
        return 0, 0

    protected = _get_active_session_ids(db)
    cutoff_ts = time.time() - retention_days * 86400
    removed = 0
    freed = 0

    for path in sessions_dir.iterdir():
        name = path.name
        # Only regular files; the sessions.json state file is sacrosanct.
        if not path.is_file() or name == "sessions.json":
            continue
        # Restrict to the known artifact patterns.
        is_artifact = (
            name.startswith(("session_", "request_dump_"))
            or name.endswith(".jsonl")
        )
        if not is_artifact:
            continue
        try:
            info = path.stat()
        except OSError:
            continue
        if info.st_mtime > cutoff_ts:
            continue  # still inside the retention window
        sid = _extract_session_id_from_filename(name)
        if sid is not None and sid in protected:
            continue  # never touch an active session's files
        if dry_run:
            logger.info("Would remove: %s (%d KB)", name, info.st_size // 1024)
            continue
        try:
            path.unlink()
        except OSError as e:
            logger.debug("Failed to remove %s: %s", name, e)
        else:
            removed += 1
            freed += info.st_size
    return removed, freed
def prune_checkpoints(
    checkpoints_dir: Path,
    retention_days: int = CHECKPOINT_RETENTION_DAYS,
    dry_run: bool = False,
) -> Tuple[int, int]:
    """Remove checkpoint shadow repos whose mtime exceeds the retention window.

    Checkpoint directories are keyed by sha256(working_dir)[:16], not by
    session ID, so age is the only staleness signal used here.

    Args:
        checkpoints_dir: Path to ~/.hermes/checkpoints/.
        retention_days: Minimum age in days before a directory is eligible.
        dry_run: When True, log candidates and delete nothing; the returned
            counters stay at zero.

    Returns:
        (dirs_deleted, bytes_freed)
    """
    if not checkpoints_dir.exists():
        return 0, 0

    cutoff_ts = time.time() - retention_days * 86400
    removed = 0
    freed = 0

    for repo in checkpoints_dir.iterdir():
        if not repo.is_dir():
            continue
        try:
            if repo.stat().st_mtime > cutoff_ts:
                continue  # recently touched — keep
        except OSError:
            continue
        # Measure the tree before deleting so we can report bytes reclaimed.
        try:
            tree_size = sum(
                p.stat().st_size for p in repo.rglob("*") if p.is_file()
            )
        except OSError:
            tree_size = 0
        if dry_run:
            logger.info(
                "Would remove checkpoint: %s (%d KB)", repo.name, tree_size // 1024
            )
            continue
        try:
            shutil.rmtree(repo)
        except OSError as e:
            logger.debug("Failed to remove checkpoint %s: %s", repo.name, e)
        else:
            removed += 1
            freed += tree_size
    return removed, freed
def prune_all_artifacts(
    hermes_home: Path,
    db,
    session_retention_days: int = SESSION_FILE_RETENTION_DAYS,
    checkpoint_retention_days: int = CHECKPOINT_RETENTION_DAYS,
    dry_run: bool = False,
) -> Dict[str, Tuple[int, int]]:
    """Run every artifact pruner and collect the results.

    This is the single entry point shared by the CLI command and the
    gateway's automated daily cleanup.

    Args:
        hermes_home: Path to ~/.hermes/.
        db: SessionDB instance.
        session_retention_days: Retention for session files and request dumps.
        checkpoint_retention_days: Retention for checkpoint directories.
        dry_run: If True, report what would be deleted without deleting.

    Returns:
        Dict mapping artifact type ("session_files", "checkpoints") to a
        (count_deleted, bytes_freed) tuple.
    """
    return {
        "session_files": prune_session_files(
            hermes_home / "sessions",
            db,
            retention_days=session_retention_days,
            dry_run=dry_run,
        ),
        "checkpoints": prune_checkpoints(
            hermes_home / "checkpoints",
            retention_days=checkpoint_retention_days,
            dry_run=dry_run,
        ),
    }
def format_prune_summary(results: Dict[str, Tuple[int, int]]) -> str:
    """Render prune results as a human-readable, indented summary string."""
    labels = (
        ("session_files", "Session files"),
        ("checkpoints", "Checkpoints"),
    )
    lines = []
    grand_total = 0
    for key, label in labels:
        count, nbytes = results.get(key, (0, 0))
        if count:
            lines.append(f" {label}: {count} removed ({_human_size(nbytes)})")
            grand_total += nbytes
    if not lines:
        return "No stale artifacts found."
    lines.append(f" Total freed: {_human_size(grand_total)}")
    return "\n".join(lines)
def _human_size(size_bytes: int) -> str:
"""Convert bytes to human-readable size string."""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"