mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-30 01:41:43 +00:00
Adds the Curator — an auxiliary-model background task that periodically
reviews AGENT-CREATED skills and keeps the collection tidy: tracks usage,
transitions unused skills through active → stale → archived, and spawns
a forked AIAgent to consolidate overlaps and patch drift.
Default: enabled, inactivity-triggered (no cron daemon). Runs on CLI
startup and gateway boot when the last run is older than interval_hours
(default 24) AND the agent has been idle for min_idle_hours (default 2).
Invariants (all load-bearing):
- Never touches bundled or hub-installed skills (.bundled_manifest +
.hub/lock.json double-filter)
- Never auto-deletes — archive only. Archives are recoverable
via `hermes curator restore <skill>`
- Pinned skills bypass all auto-transitions
- Uses the aux client; never touches the main session's prompt cache
New files:
- tools/skill_usage.py — sidecar .usage.json telemetry, atomic writes,
provenance filter
- agent/curator.py — orchestrator: config, idle gating, state-machine
transitions (pure, no LLM), forked-agent review prompt
- hermes_cli/curator.py — `hermes curator {status,run,pause,resume,
pin,unpin,restore}` subcommand
- tests/tools/test_skill_usage.py — 29 tests
- tests/agent/test_curator.py — 25 tests
Modified files (surgical patches):
- tools/skills_tool.py — bump view_count on successful skill_view
- tools/skill_manager_tool.py — bump patch_count on skill_manage
patch/edit/write_file/remove_file; forget record on delete
- hermes_cli/config.py — add curator: section to DEFAULT_CONFIG
- hermes_cli/commands.py — add /curator CommandDef with subcommands
- hermes_cli/main.py — register `hermes curator` subparser via
register_cli() from hermes_cli.curator
- cli.py — /curator slash-command dispatch + startup hook
- gateway/run.py — gateway-boot hook (mirrors CLI)
Validation:
- 54 new tests across skill_usage + curator, all passing in 3s
- 346 tests across all touched files' neighbors green
- 2783 tests across hermes_cli/ + gateway/test_run_progress_topics.py green
- CLI smoke: `hermes curator status/pause/resume` work end-to-end
Companion to PR #16026 (class-first skill review prompt) — together
they form a loop: the review prompt stops near-duplicate skill creation
at the source, and the curator prunes/consolidates what still accumulates.
Refs #7816.
437 lines
15 KiB
Python
437 lines
15 KiB
Python
"""Skill usage telemetry + provenance tracking for the Curator feature.
|
|
|
|
Tracks per-skill usage metadata in a sidecar JSON file (~/.hermes/skills/.usage.json)
|
|
keyed by skill name. Counters are bumped by the existing skill tools (skill_view,
|
|
skill_manage); the curator orchestrator reads them to decide lifecycle transitions.
|
|
|
|
Design notes:
|
|
- Sidecar, not frontmatter. Keeps operational telemetry out of user-authored
|
|
SKILL.md content and avoids conflict pressure for bundled/hub skills.
|
|
- Atomic writes via tempfile + os.replace (same pattern as .bundled_manifest).
|
|
- All counter bumps are best-effort: failures log at DEBUG and return silently.
|
|
A broken sidecar never breaks the underlying tool call.
|
|
- Provenance filter: "agent-created" == not in .bundled_manifest AND not in
|
|
.hub/lock.json. The curator only ever mutates agent-created skills.
|
|
|
|
Lifecycle states:
|
|
active -> default
|
|
stale -> unused > stale_after_days (config)
|
|
archived -> unused > archive_after_days (config); moved to .archive/
|
|
pinned -> opt-out from auto transitions (boolean flag, orthogonal to state)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
|
|
|
|
from hermes_constants import get_hermes_home
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
STATE_ACTIVE = "active"
|
|
STATE_STALE = "stale"
|
|
STATE_ARCHIVED = "archived"
|
|
_VALID_STATES = {STATE_ACTIVE, STATE_STALE, STATE_ARCHIVED}
|
|
|
|
|
|
def _skills_dir() -> Path:
|
|
return get_hermes_home() / "skills"
|
|
|
|
|
|
def _usage_file() -> Path:
|
|
return _skills_dir() / ".usage.json"
|
|
|
|
|
|
def _archive_dir() -> Path:
|
|
return _skills_dir() / ".archive"
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Provenance — which skills are agent-created (and thus eligible for curation)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_bundled_manifest_names() -> Set[str]:
|
|
"""Return the set of skill names that were seeded from the bundled repo.
|
|
|
|
Reads ~/.hermes/skills/.bundled_manifest (format: "name:hash" per line).
|
|
Returns empty set if the file is missing or unreadable.
|
|
"""
|
|
manifest = _skills_dir() / ".bundled_manifest"
|
|
if not manifest.exists():
|
|
return set()
|
|
names: Set[str] = set()
|
|
try:
|
|
for line in manifest.read_text(encoding="utf-8").splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
name = line.split(":", 1)[0].strip()
|
|
if name:
|
|
names.add(name)
|
|
except OSError as e:
|
|
logger.debug("Failed to read bundled manifest: %s", e)
|
|
return names
|
|
|
|
|
|
def _read_hub_installed_names() -> Set[str]:
|
|
"""Return the set of skill names installed via the Skills Hub.
|
|
|
|
Reads ~/.hermes/skills/.hub/lock.json (see tools/skills_hub.py :: HubLockFile).
|
|
"""
|
|
lock_path = _skills_dir() / ".hub" / "lock.json"
|
|
if not lock_path.exists():
|
|
return set()
|
|
try:
|
|
data = json.loads(lock_path.read_text(encoding="utf-8"))
|
|
if isinstance(data, dict):
|
|
installed = data.get("installed") or {}
|
|
if isinstance(installed, dict):
|
|
return {str(k) for k in installed.keys()}
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
logger.debug("Failed to read hub lock file: %s", e)
|
|
return set()
|
|
|
|
|
|
def list_agent_created_skill_names() -> List[str]:
|
|
"""Enumerate skills that were authored by the agent (or user), NOT by a
|
|
bundled or hub-installed source.
|
|
|
|
The curator operates exclusively on this set. Bundled / hub skills are
|
|
maintained by their upstream sources and must never be pruned here.
|
|
"""
|
|
base = _skills_dir()
|
|
if not base.exists():
|
|
return []
|
|
bundled = _read_bundled_manifest_names()
|
|
hub = _read_hub_installed_names()
|
|
off_limits = bundled | hub
|
|
|
|
names: List[str] = []
|
|
# Top-level SKILL.md files (flat layout) AND nested category/skill/SKILL.md
|
|
for skill_md in base.rglob("SKILL.md"):
|
|
# Skip anything under .archive or .hub
|
|
try:
|
|
rel = skill_md.relative_to(base)
|
|
except ValueError:
|
|
continue
|
|
parts = rel.parts
|
|
if parts and (parts[0].startswith(".") or parts[0] == "node_modules"):
|
|
continue
|
|
name = _read_skill_name(skill_md, fallback=skill_md.parent.name)
|
|
if name in off_limits:
|
|
continue
|
|
names.append(name)
|
|
return sorted(set(names))
|
|
|
|
|
|
def _read_skill_name(skill_md: Path, fallback: str) -> str:
|
|
"""Parse the `name:` field from a SKILL.md YAML frontmatter."""
|
|
try:
|
|
text = skill_md.read_text(encoding="utf-8", errors="replace")[:4000]
|
|
except OSError:
|
|
return fallback
|
|
in_frontmatter = False
|
|
for line in text.split("\n"):
|
|
stripped = line.strip()
|
|
if stripped == "---":
|
|
if in_frontmatter:
|
|
break
|
|
in_frontmatter = True
|
|
continue
|
|
if in_frontmatter and stripped.startswith("name:"):
|
|
value = stripped.split(":", 1)[1].strip().strip("\"'")
|
|
if value:
|
|
return value
|
|
return fallback
|
|
|
|
|
|
def is_agent_created(skill_name: str) -> bool:
|
|
"""Whether *skill_name* is neither bundled nor hub-installed."""
|
|
off_limits = _read_bundled_manifest_names() | _read_hub_installed_names()
|
|
return skill_name not in off_limits
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sidecar I/O
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _empty_record() -> Dict[str, Any]:
|
|
return {
|
|
"use_count": 0,
|
|
"view_count": 0,
|
|
"last_used_at": None,
|
|
"last_viewed_at": None,
|
|
"patch_count": 0,
|
|
"last_patched_at": None,
|
|
"created_at": _now_iso(),
|
|
"state": STATE_ACTIVE,
|
|
"pinned": False,
|
|
"archived_at": None,
|
|
}
|
|
|
|
|
|
def load_usage() -> Dict[str, Dict[str, Any]]:
|
|
"""Read the entire .usage.json map. Returns empty dict on missing/corrupt."""
|
|
path = _usage_file()
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
logger.debug("Failed to read %s: %s", path, e)
|
|
return {}
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
# Defensive: coerce any non-dict values to a fresh empty record
|
|
clean: Dict[str, Dict[str, Any]] = {}
|
|
for k, v in data.items():
|
|
if isinstance(v, dict):
|
|
clean[str(k)] = v
|
|
return clean
|
|
|
|
|
|
def save_usage(data: Dict[str, Dict[str, Any]]) -> None:
|
|
"""Write the usage map atomically. Best-effort — errors are logged, not raised."""
|
|
path = _usage_file()
|
|
try:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
dir=str(path.parent), prefix=".usage_", suffix=".tmp"
|
|
)
|
|
try:
|
|
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2, sort_keys=True, ensure_ascii=False)
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
os.replace(tmp_path, path)
|
|
except BaseException:
|
|
try:
|
|
os.unlink(tmp_path)
|
|
except OSError:
|
|
pass
|
|
raise
|
|
except Exception as e:
|
|
logger.debug("Failed to write %s: %s", path, e, exc_info=True)
|
|
|
|
|
|
def get_record(skill_name: str) -> Dict[str, Any]:
|
|
"""Return the record for *skill_name*, creating a fresh one if missing."""
|
|
data = load_usage()
|
|
rec = data.get(skill_name)
|
|
if not isinstance(rec, dict):
|
|
return _empty_record()
|
|
# Backfill any missing keys so callers don't need to handle old files
|
|
base = _empty_record()
|
|
for k, v in base.items():
|
|
rec.setdefault(k, v)
|
|
return rec
|
|
|
|
|
|
def _mutate(skill_name: str, mutator) -> None:
|
|
"""Load, apply *mutator(record)* in place, save. Best-effort."""
|
|
if not skill_name:
|
|
return
|
|
try:
|
|
data = load_usage()
|
|
rec = data.get(skill_name)
|
|
if not isinstance(rec, dict):
|
|
rec = _empty_record()
|
|
mutator(rec)
|
|
data[skill_name] = rec
|
|
save_usage(data)
|
|
except Exception as e:
|
|
logger.debug("skill_usage._mutate(%s) failed: %s", skill_name, e, exc_info=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public counter-bump helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def bump_view(skill_name: str) -> None:
|
|
"""Bump view_count and last_viewed_at. Called from skill_view()."""
|
|
def _apply(rec: Dict[str, Any]) -> None:
|
|
rec["view_count"] = int(rec.get("view_count") or 0) + 1
|
|
rec["last_viewed_at"] = _now_iso()
|
|
_mutate(skill_name, _apply)
|
|
|
|
|
|
def bump_use(skill_name: str) -> None:
|
|
"""Bump use_count and last_used_at. Called when a skill is actively used
|
|
(e.g. loaded into the prompt path or referenced from an assistant turn)."""
|
|
def _apply(rec: Dict[str, Any]) -> None:
|
|
rec["use_count"] = int(rec.get("use_count") or 0) + 1
|
|
rec["last_used_at"] = _now_iso()
|
|
_mutate(skill_name, _apply)
|
|
|
|
|
|
def bump_patch(skill_name: str) -> None:
|
|
"""Bump patch_count and last_patched_at. Called from skill_manage (patch/edit)."""
|
|
def _apply(rec: Dict[str, Any]) -> None:
|
|
rec["patch_count"] = int(rec.get("patch_count") or 0) + 1
|
|
rec["last_patched_at"] = _now_iso()
|
|
_mutate(skill_name, _apply)
|
|
|
|
|
|
def set_state(skill_name: str, state: str) -> None:
|
|
"""Set lifecycle state. No-op if *state* is invalid."""
|
|
if state not in _VALID_STATES:
|
|
logger.debug("set_state: invalid state %r for %s", state, skill_name)
|
|
return
|
|
def _apply(rec: Dict[str, Any]) -> None:
|
|
rec["state"] = state
|
|
if state == STATE_ARCHIVED:
|
|
rec["archived_at"] = _now_iso()
|
|
elif state == STATE_ACTIVE:
|
|
rec["archived_at"] = None
|
|
_mutate(skill_name, _apply)
|
|
|
|
|
|
def set_pinned(skill_name: str, pinned: bool) -> None:
|
|
def _apply(rec: Dict[str, Any]) -> None:
|
|
rec["pinned"] = bool(pinned)
|
|
_mutate(skill_name, _apply)
|
|
|
|
|
|
def forget(skill_name: str) -> None:
|
|
"""Drop a skill's usage entry entirely. Called when the skill is deleted."""
|
|
if not skill_name:
|
|
return
|
|
try:
|
|
data = load_usage()
|
|
if skill_name in data:
|
|
del data[skill_name]
|
|
save_usage(data)
|
|
except Exception as e:
|
|
logger.debug("skill_usage.forget(%s) failed: %s", skill_name, e, exc_info=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Archive / restore
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def archive_skill(skill_name: str) -> Tuple[bool, str]:
|
|
"""Move an agent-created skill directory to ~/.hermes/skills/.archive/.
|
|
|
|
Returns (ok, message). Never archives bundled or hub skills — callers are
|
|
responsible for checking provenance, but we double-check here as a safety net.
|
|
"""
|
|
if not is_agent_created(skill_name):
|
|
return False, f"skill '{skill_name}' is bundled or hub-installed; never archive"
|
|
|
|
skill_dir = _find_skill_dir(skill_name)
|
|
if skill_dir is None:
|
|
return False, f"skill '{skill_name}' not found"
|
|
|
|
archive_root = _archive_dir()
|
|
try:
|
|
archive_root.mkdir(parents=True, exist_ok=True)
|
|
except OSError as e:
|
|
return False, f"failed to create archive dir: {e}"
|
|
|
|
# Flatten any category nesting into a single ".archive/<skill>/" so restores
|
|
# are simple. If a collision exists, append a timestamp.
|
|
dest = archive_root / skill_dir.name
|
|
if dest.exists():
|
|
dest = archive_root / f"{skill_dir.name}-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}"
|
|
|
|
try:
|
|
skill_dir.rename(dest)
|
|
except OSError as e:
|
|
# Cross-device — fall back to shutil.move
|
|
import shutil
|
|
try:
|
|
shutil.move(str(skill_dir), str(dest))
|
|
except Exception as e2:
|
|
return False, f"failed to archive: {e2}"
|
|
|
|
set_state(skill_name, STATE_ARCHIVED)
|
|
return True, f"archived to {dest}"
|
|
|
|
|
|
def restore_skill(skill_name: str) -> Tuple[bool, str]:
|
|
"""Move an archived skill back to ~/.hermes/skills/. Restores to the flat
|
|
top-level layout; original category nesting is NOT reconstructed."""
|
|
archive_root = _archive_dir()
|
|
if not archive_root.exists():
|
|
return False, "no archive directory"
|
|
|
|
# Try exact name match first, then any prefix match (for timestamped dupes)
|
|
candidates = [p for p in archive_root.iterdir() if p.is_dir() and p.name == skill_name]
|
|
if not candidates:
|
|
candidates = sorted(
|
|
[p for p in archive_root.iterdir()
|
|
if p.is_dir() and p.name.startswith(f"{skill_name}-")],
|
|
reverse=True,
|
|
)
|
|
if not candidates:
|
|
return False, f"skill '{skill_name}' not found in archive"
|
|
|
|
src = candidates[0]
|
|
dest = _skills_dir() / skill_name
|
|
if dest.exists():
|
|
return False, f"destination already exists: {dest}"
|
|
|
|
try:
|
|
src.rename(dest)
|
|
except OSError:
|
|
import shutil
|
|
try:
|
|
shutil.move(str(src), str(dest))
|
|
except Exception as e:
|
|
return False, f"failed to restore: {e}"
|
|
|
|
set_state(skill_name, STATE_ACTIVE)
|
|
return True, f"restored to {dest}"
|
|
|
|
|
|
def _find_skill_dir(skill_name: str) -> Optional[Path]:
|
|
"""Locate the directory for a skill by its frontmatter `name:` field.
|
|
|
|
Handles both flat (~/.hermes/skills/<skill>/SKILL.md) and category-nested
|
|
(~/.hermes/skills/<category>/<skill>/SKILL.md) layouts.
|
|
"""
|
|
base = _skills_dir()
|
|
if not base.exists():
|
|
return None
|
|
for skill_md in base.rglob("SKILL.md"):
|
|
try:
|
|
rel = skill_md.relative_to(base)
|
|
except ValueError:
|
|
continue
|
|
if rel.parts and rel.parts[0].startswith("."):
|
|
continue
|
|
if _read_skill_name(skill_md, fallback=skill_md.parent.name) == skill_name:
|
|
return skill_md.parent
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reporting — for the curator CLI / slash command
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def agent_created_report() -> List[Dict[str, Any]]:
|
|
"""Return a list of {name, state, pinned, last_used_at, use_count, ...}
|
|
records for every agent-created skill. Missing usage records are backfilled
|
|
with defaults so callers can always index fields."""
|
|
data = load_usage()
|
|
rows: List[Dict[str, Any]] = []
|
|
for name in list_agent_created_skill_names():
|
|
rec = data.get(name)
|
|
if not isinstance(rec, dict):
|
|
rec = _empty_record()
|
|
base = _empty_record()
|
|
for k, v in base.items():
|
|
rec.setdefault(k, v)
|
|
rows.append({"name": name, **rec})
|
|
return rows
|