hermes-agent/tools/skills_sync.py
kshitijk4poor 0537e2600d fix(skills): atomic lock write + drop dead _validate_category_name
Self-review follow-ups on the salvage of #33177 + #33188 + #33209:

W3 (real) — lock_path.write_text was non-atomic, AND the read path silently
resets data to an empty installed dict on JSONDecodeError, so a crash mid-
write could nuke ALL hub provenance, not just official-optional. Switch
to the same mkstemp + fsync + atomic_replace pattern that _write_manifest
already uses in this module.

W5 (dead code) — _validate_category_name had one caller on origin/main
(install_from_quarantine), which #33177 swapped to
_validate_install_parent_path. Remove the now-unused definition so it does
not become an attractive nuisance that leads contributors to pick the wrong
validator.

Behavior preserved on the happy path; verified all 200 skills/hub tests
plus the three E2E scenarios (destructive restore, backfill idempotency,
adversarial nonexistent skill) still pass after both fixes.
2026-05-27 13:39:58 -07:00

683 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Skills Sync -- Manifest-based seeding and updating of bundled skills.
Copies bundled skills from the repo's skills/ directory into ~/.hermes/skills/
and uses a manifest to track which skills have been synced and their origin hash.
Manifest format (v2): each line is "skill_name:origin_hash" where origin_hash
is the MD5 of the bundled skill at the time it was last synced to the user dir.
Old v1 manifests (plain names without hashes) are auto-migrated.
Update logic:
- NEW skills (not in manifest): copied to user dir, origin hash recorded.
- EXISTING skills (in manifest, present in user dir):
* If user copy matches origin hash: user hasn't modified it → safe to
update from bundled if bundled changed. New origin hash recorded.
* If user copy differs from origin hash: user customized it → SKIP.
- DELETED by user (in manifest, absent from user dir): respected, not re-added.
- REMOVED from bundled (in manifest, gone from repo): cleaned from manifest.
The manifest lives at ~/.hermes/skills/.bundled_manifest.
"""
import hashlib
import json
import logging
import os
import shutil
from datetime import datetime, timezone
from pathlib import Path, PurePosixPath
from hermes_constants import get_bundled_skills_dir, get_hermes_home, get_optional_skills_dir
from agent.skill_utils import is_excluded_skill_path
from typing import Dict, List, Tuple
from utils import atomic_replace
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Hermes home directory as reported by hermes_constants.get_hermes_home().
# Resolved once at import time, so the two paths derived from it below stay
# fixed for the life of the process.
HERMES_HOME = get_hermes_home()
# Root of the user's active skills tree; bundled skills are copied under here.
SKILLS_DIR = HERMES_HOME / "skills"
# Manifest of synced bundled skills, one "skill_name:origin_hash" entry per line.
MANIFEST_FILE = SKILLS_DIR / ".bundled_manifest"
def _get_bundled_dir() -> Path:
    """Return the directory that holds the bundled skills.

    Resolution is delegated to ``get_bundled_skills_dir``, which is handed
    the ``skills/`` folder two levels above this source file as its fallback.
    That resolver is expected to honour the HERMES_BUNDLED_SKILLS env var
    (set by the Nix wrapper) and a wheel-installed data dir before using the
    fallback.
    """
    package_root = Path(__file__).parent.parent
    fallback = package_root / "skills"
    return get_bundled_skills_dir(fallback)
def _get_optional_dir() -> Path:
    """Return the directory that holds the official optional skills.

    Resolution is delegated to ``get_optional_skills_dir``, with the
    ``optional-skills/`` folder two levels above this source file as fallback.
    """
    package_root = Path(__file__).parent.parent
    fallback = package_root / "optional-skills"
    return get_optional_skills_dir(fallback)
def _read_manifest() -> Dict[str, str]:
    """
    Read the manifest as a dict of {skill_name: origin_hash}.

    Handles both v1 (plain names) and v2 (name:hash) formats.
    v1 entries get an empty hash string which triggers migration on next sync.

    Returns:
        Mapping of skill name to recorded origin hash. Empty when the
        manifest file is missing, cannot be read, or is not valid UTF-8.
    """
    try:
        text = MANIFEST_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        # No manifest yet: nothing has been synced.
        return {}
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # named explicitly; a corrupt manifest is treated like an unreadable
        # one instead of aborting the whole sync.
        logger.debug("Could not read skills manifest %s: %s", MANIFEST_FILE, e)
        return {}

    result: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        if ":" in line:
            # v2 format: name:hash. Split on the LAST colon so a skill name
            # that itself contains ":" round-trips with _write_manifest,
            # which emits f"{name}:{hash}".
            name, _, hash_val = line.rpartition(":")
            result[name.strip()] = hash_val.strip()
        else:
            # v1 format: plain name — empty hash triggers migration
            result[line] = ""
    return result
def _write_manifest(entries: Dict[str, str]):
    """Persist *entries* to the manifest file in v2 format (name:hash).

    The content goes to a temp file in the manifest's directory first and is
    then swapped into place with ``atomic_replace``, so an interrupted write
    cannot leave a truncated manifest behind. Write failures are logged at
    debug level and otherwise ignored.
    """
    import tempfile

    manifest_dir = MANIFEST_FILE.parent
    manifest_dir.mkdir(parents=True, exist_ok=True)

    rows = [f"{name}:{hash_val}" for name, hash_val in sorted(entries.items())]
    payload = "\n".join(rows) + "\n"

    try:
        handle, scratch_path = tempfile.mkstemp(
            suffix=".tmp",
            prefix=".bundled_manifest_",
            dir=str(manifest_dir),
        )
        try:
            with os.fdopen(handle, "w", encoding="utf-8") as out:
                out.write(payload)
                out.flush()
                # Force the bytes to disk before the rename makes them visible.
                os.fsync(out.fileno())
            atomic_replace(scratch_path, MANIFEST_FILE)
        except BaseException:
            # Never leave the temp file behind, whatever interrupted us.
            try:
                os.unlink(scratch_path)
            except OSError:
                pass
            raise
    except Exception as e:
        logger.debug("Failed to write skills manifest %s: %s", MANIFEST_FILE, e, exc_info=True)
def _read_skill_name(skill_md: Path, fallback: str) -> str:
"""Read the name field from SKILL.md YAML frontmatter, falling back to *fallback*."""
try:
content = skill_md.read_text(encoding="utf-8", errors="replace")[:4000]
except OSError:
return fallback
in_frontmatter = False
for line in content.split("\n"):
stripped = line.strip()
if stripped == "---":
if in_frontmatter:
break
in_frontmatter = True
continue
if in_frontmatter and stripped.startswith("name:"):
value = stripped.split(":", 1)[1].strip().strip("\"'")
if value:
return value
return fallback
def _discover_bundled_skills(bundled_dir: Path) -> List[Tuple[str, Path]]:
    """
    Find all SKILL.md files in the bundled directory.

    Paths rejected by ``is_excluded_skill_path`` are ignored. Results are
    produced in sorted path order so the sync output, and the winner when two
    skills declare the same name, do not depend on filesystem enumeration
    order.

    Returns list of (skill_name, skill_directory_path) tuples; empty when
    *bundled_dir* does not exist.
    """
    if not bundled_dir.exists():
        return []

    skills: List[Tuple[str, Path]] = []
    # rglob() yields in arbitrary order; sort like the other scanners in
    # this module do.
    for skill_md in sorted(bundled_dir.rglob("SKILL.md")):
        if is_excluded_skill_path(skill_md):
            continue
        skill_dir = skill_md.parent
        # The frontmatter name wins; the folder name is only the fallback.
        skill_name = _read_skill_name(skill_md, skill_dir.name)
        skills.append((skill_name, skill_dir))
    return skills
def _compute_relative_dest(skill_dir: Path, bundled_dir: Path) -> Path:
    """
    Map a bundled skill directory to its location under SKILLS_DIR.

    The path of *skill_dir* relative to *bundled_dir* is kept as-is, so the
    category structure survives,
    e.g., bundled/skills/mlops/axolotl -> ~/.hermes/skills/mlops/axolotl
    """
    return SKILLS_DIR.joinpath(skill_dir.relative_to(bundled_dir))
def _dir_hash(directory: Path) -> str:
"""Compute a hash of all file contents in a directory for change detection."""
hasher = hashlib.md5()
try:
for fpath in sorted(directory.rglob("*")):
if fpath.is_file():
rel = fpath.relative_to(directory)
hasher.update(str(rel).encode("utf-8"))
hasher.update(fpath.read_bytes())
except (OSError, IOError):
pass
return hasher.hexdigest()
def _safe_rel_install_path(path: Path, base: Path) -> str:
"""Return a normalized relative POSIX path, rejecting traversal/absolute paths."""
rel = path.relative_to(base)
posix = rel.as_posix()
pure = PurePosixPath(posix)
parts = [part for part in pure.parts if part not in {"", "."}]
if pure.is_absolute() or not parts or any(part == ".." for part in parts):
raise ValueError(f"Unsafe optional skill path: {posix}")
return "/".join(parts)
def _skill_file_list(skill_dir: Path) -> List[str]:
"""List files inside a skill directory in lock-file format."""
files: List[str] = []
for fpath in sorted(skill_dir.rglob("*")):
if fpath.is_file():
files.append(fpath.relative_to(skill_dir).as_posix())
return files
def _content_hash(directory: Path) -> str:
    """Hash *directory* the way the skills hub lock does, with a local fallback.

    ``tools.skills_guard.content_hash`` is preferred; if importing or calling
    it fails for any reason, the module's own ``_dir_hash`` is used instead.
    """
    try:
        from tools.skills_guard import content_hash as hub_content_hash
        digest = hub_content_hash(directory)
    except Exception:
        # The hash is provenance metadata only, so sync must keep working
        # when the guard's dependencies are missing (packaged/update context).
        digest = _dir_hash(directory)
    return digest
def _optional_skill_index() -> Dict[str, Tuple[str, str, Path]]:
    """Build a lookup of official optional skills.

    Every skill is registered under two keys, its folder name and its
    frontmatter name, both mapping to the same
    ``(folder_name, install_path, source_dir)`` tuple. Callers can therefore
    pass either the folder slug used by the hub lock or the user-facing name.
    Skills whose relative path is rejected as unsafe are left out.
    """
    root = _get_optional_dir()
    catalog: Dict[str, Tuple[str, str, Path]] = {}
    if not root.exists():
        return catalog

    for manifest_md in sorted(root.rglob("SKILL.md")):
        if is_excluded_skill_path(manifest_md):
            continue
        source_dir = manifest_md.parent
        try:
            rel_install = _safe_rel_install_path(source_dir, root)
        except ValueError:
            continue
        slug = source_dir.name
        record = (slug, rel_install, source_dir)
        # Folder slug first, then the frontmatter name (which may be equal).
        for key in (slug, _read_skill_name(manifest_md, slug)):
            catalog[key] = record
    return catalog
def _move_to_restore_backup(path: Path, backup_root: Path) -> str:
    """Move a skill directory under *backup_root*, mirroring its SKILLS_DIR-relative path.

    If the mirrored location is already taken, ``-1``, ``-2``, ... is appended
    to the final component until a free name is found.

    Returns:
        The original path relative to SKILLS_DIR, in POSIX form.
    """
    relative = path.relative_to(SKILLS_DIR)
    destination = backup_root / relative
    destination.parent.mkdir(parents=True, exist_ok=True)

    if destination.exists():
        counter = 1
        alternative = destination.with_name(f"{destination.name}-{counter}")
        while alternative.exists():
            counter += 1
            alternative = destination.with_name(f"{destination.name}-{counter}")
        destination = alternative

    shutil.move(str(path), str(destination))
    return relative.as_posix()
def _find_active_copies(folder_name: str, frontmatter_name: str, dest: Path) -> List[Path]:
    """List skill dirs under SKILLS_DIR, other than *dest*, that look like this skill.

    A directory matches when its folder name equals *folder_name* or its
    SKILL.md name equals *folder_name* or *frontmatter_name*.
    """
    matches: List[Path] = []
    if not SKILLS_DIR.exists():
        return matches
    for skill_md in sorted(SKILLS_DIR.rglob("SKILL.md")):
        if is_excluded_skill_path(skill_md):
            continue
        candidate = skill_md.parent
        try:
            rel = candidate.relative_to(SKILLS_DIR)
        except ValueError:
            continue
        # Never treat the skills root itself, or anything already parked under
        # .restore-backups by an earlier run, as an "active" copy. Otherwise a
        # second restore would move old backups into the new backup tree.
        # NOTE(review): is_excluded_skill_path may already filter the backup
        # tree; this guard is redundant but harmless in that case.
        if not rel.parts or rel.parts[0] == ".restore-backups":
            continue
        if candidate == dest:
            continue
        candidate_name = _read_skill_name(skill_md, candidate.name)
        if candidate.name == folder_name or candidate_name in {folder_name, frontmatter_name}:
            matches.append(candidate)
    return matches


def restore_official_optional_skill(name: str, *, restore: bool = False) -> dict:
    """Restore one or all official optional skills from repo source.

    ``restore=False`` only performs exact-match provenance backfill. ``restore=True``
    repairs already-mutated/reorganized skills by backing up matching active
    copies and copying the official optional source into its canonical path.

    Args:
        name: Folder slug or frontmatter name of an official optional skill,
            or ``"all"`` / ``"*"`` to process every one of them.
        restore: When True, move conflicting active copies into a timestamped
            directory under ``SKILLS_DIR/.restore-backups`` and re-copy the
            official source wherever the canonical copy is missing or differs.

    Returns:
        dict with keys ``ok``, ``message``, ``restored``, ``backfilled`` and
        ``backed_up``; successful results also carry ``backup_dir`` (empty
        string when nothing was backed up).
    """
    index = _optional_skill_index()
    if not index:
        return {"ok": False, "message": "No official optional skills directory found.", "restored": [], "backfilled": [], "backed_up": []}

    if name in {"all", "*"}:
        # Each skill is indexed under up to two keys; de-duplicate the values.
        targets = sorted(set(index.values()), key=lambda item: item[1])
    else:
        target = index.get(name)
        if target is None:
            return {"ok": False, "message": f"Official optional skill not found: {name}", "restored": [], "backfilled": [], "backed_up": []}
        targets = [target]

    restored: List[str] = []
    backed_up: List[str] = []
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    backup_root = SKILLS_DIR / ".restore-backups" / f"official-optional-{timestamp}"

    # Without restore=True nothing on disk is touched, so the per-target scan
    # (which hashes and reads the whole skills tree) is skipped entirely.
    if restore:
        for folder_name, install_path, src in targets:
            dest = SKILLS_DIR / Path(*install_path.split("/"))
            canonical_ok = dest.exists() and _dir_hash(dest) == _dir_hash(src)
            # Find already-active copies of this official skill by frontmatter
            # name or folder slug, even if curator moved it into another category.
            src_frontmatter = _read_skill_name(src / "SKILL.md", folder_name)
            for match in _find_active_copies(folder_name, src_frontmatter, dest):
                # A match may have vanished because a parent was moved already.
                if match.exists():
                    backed_up.append(_move_to_restore_backup(match, backup_root))
            if dest.exists() and not canonical_ok:
                backed_up.append(_move_to_restore_backup(dest, backup_root))
            if not dest.exists():
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copytree(src, dest)
                restored.append(folder_name)

    backfilled = _backfill_optional_provenance(quiet=True)
    return {
        "ok": True,
        "message": "Official optional skill repair complete.",
        "restored": restored,
        "backfilled": backfilled,
        "backed_up": backed_up,
        "backup_dir": str(backup_root) if backed_up else "",
    }
def _backfill_optional_provenance(quiet: bool = False) -> List[str]:
    """Mark already-present official optional skills as hub-installed.

    This covers the migration case where a skill used to be bundled (or was
    manually copied into the active skills tree) and later lives under
    optional-skills/. If the active copy is byte-identical to the official
    optional source, record official hub provenance without copying or
    reinstalling anything. Modified/local skills are left alone.

    Args:
        quiet: When False, print one line per backfilled skill.

    Returns:
        Folder names of the skills that received a new lock entry.

    Raises:
        OSError: if the updated lock file cannot be written.
    """
    optional_dir = _get_optional_dir()
    if not optional_dir.exists():
        return []

    lock_path = SKILLS_DIR / ".hub" / "lock.json"
    data = None
    try:
        if lock_path.exists():
            # Read as UTF-8 explicitly: the writer below emits UTF-8 with
            # ensure_ascii=False, so the locale default could mis-decode it.
            data = json.loads(lock_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.debug("Could not read hub lock %s, starting from an empty one: %s", lock_path, e)
    if not isinstance(data, dict):
        # Missing, unreadable, or not a JSON object.
        data = {"version": 1, "installed": {}}
    installed = data.setdefault("installed", {})
    if not isinstance(installed, dict):
        # A malformed "installed" value cannot hold entries; replace it.
        installed = {}
        data["installed"] = installed

    existing_paths = {
        entry.get("install_path")
        for entry in installed.values()
        if isinstance(entry, dict)
    }
    backfilled: List[str] = []
    changed = False
    for skill_md in sorted(optional_dir.rglob("SKILL.md")):
        if is_excluded_skill_path(skill_md):
            continue
        src = skill_md.parent
        try:
            install_path = _safe_rel_install_path(src, optional_dir)
        except ValueError as e:
            logger.debug("Skipping optional skill with unsafe path %s: %s", src, e)
            continue
        dest = SKILLS_DIR / Path(*install_path.split("/"))
        if not dest.exists() or not dest.is_dir():
            continue
        # Only an exact copy of the official source earns official provenance.
        if _dir_hash(dest) != _dir_hash(src):
            continue
        lock_name = src.name
        # Never overwrite an existing lock entry or claim an already-owned path.
        if lock_name in installed or install_path in existing_paths:
            continue
        timestamp = datetime.now(timezone.utc).isoformat()
        installed[lock_name] = {
            "source": "official",
            "identifier": f"official/{install_path}",
            "trust_level": "builtin",
            "scan_verdict": "backfilled",
            "content_hash": _content_hash(dest),
            "install_path": install_path,
            "files": _skill_file_list(dest),
            "metadata": {"backfilled_from": "optional-skills"},
            "installed_at": timestamp,
            "updated_at": timestamp,
        }
        existing_paths.add(install_path)
        backfilled.append(lock_name)
        changed = True
        if not quiet:
            print(f" = {lock_name} (official optional provenance backfilled)")

    if changed:
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        # Atomic write so a crash mid-write can't leave a truncated lock that
        # the read path above would replace with an empty `installed` dict.
        import tempfile
        payload = json.dumps(data, indent=2, ensure_ascii=False) + "\n"
        fd, tmp_path = tempfile.mkstemp(
            dir=str(lock_path.parent),
            prefix=".lock_",
            suffix=".tmp",
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                f.write(payload)
                f.flush()
                os.fsync(f.fileno())
            atomic_replace(tmp_path, lock_path)
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
    return backfilled
def _copy_new_skill(skill_src: Path, dest: Path) -> None:
    """Copy a bundled skill to *dest*, removing a half-written copy on failure."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    try:
        shutil.copytree(skill_src, dest)
    except FileExistsError:
        # dest appeared after the caller's existence check; it is not ours
        # to delete.
        raise
    except OSError:
        # A partial tree would look like a pre-existing local skill on the
        # next sync and block the retry, so take it away again.
        shutil.rmtree(dest, ignore_errors=True)
        raise


def _replace_skill_copy(skill_src: Path, dest: Path) -> None:
    """Swap *dest* for a fresh copy of *skill_src*, rolling back on failure."""
    backup = dest.with_suffix(".bak")
    if backup.exists():
        # Leftover from an interrupted update. shutil.move() would otherwise
        # nest dest inside it and a rollback would restore the wrong tree.
        shutil.rmtree(backup)
    shutil.move(str(dest), str(backup))
    try:
        shutil.copytree(skill_src, dest)
    except OSError:
        # Drop whatever was partially copied, then put the old copy back so
        # its hash still matches the manifest's origin hash.
        shutil.rmtree(dest, ignore_errors=True)
        if backup.exists() and not dest.exists():
            shutil.move(str(backup), str(dest))
        raise
    shutil.rmtree(backup, ignore_errors=True)


def sync_skills(quiet: bool = False) -> dict:
    """
    Sync bundled skills into ~/.hermes/skills/ using the manifest.

    Args:
        quiet: When True, suppress the per-skill progress lines.

    Returns:
        dict with keys: copied (list), updated (list), skipped (int),
        user_modified (list), cleaned (list), total_bundled (int),
        optional_provenance_backfilled (list)
    """
    bundled_dir = _get_bundled_dir()
    if not bundled_dir.exists():
        return {
            "copied": [], "updated": [], "skipped": 0,
            "user_modified": [], "cleaned": [], "total_bundled": 0,
            "optional_provenance_backfilled": [],
        }
    SKILLS_DIR.mkdir(parents=True, exist_ok=True)
    manifest = _read_manifest()
    bundled_skills = _discover_bundled_skills(bundled_dir)
    bundled_names = {name for name, _ in bundled_skills}
    copied = []
    updated = []
    user_modified = []
    skipped = 0

    for skill_name, skill_src in bundled_skills:
        dest = _compute_relative_dest(skill_src, bundled_dir)
        bundled_hash = _dir_hash(skill_src)

        if skill_name not in manifest:
            # ── New skill — never offered before ──
            try:
                if dest.exists():
                    # User already has a skill with the same name — don't overwrite.
                    # Only baseline in the manifest when the on-disk copy is
                    # byte-identical to bundled (e.g. a reset that re-syncs, or
                    # a coincidentally identical install); that case is harmless
                    # to track. If the copy differs (custom skill, hub-installed,
                    # or user-edited) skip the manifest write: recording
                    # bundled_hash there would poison update detection by making
                    # user_hash != origin_hash read as "user-modified" on every
                    # subsequent sync, permanently blocking bundled updates.
                    skipped += 1
                    if _dir_hash(dest) == bundled_hash:
                        manifest[skill_name] = bundled_hash
                    elif not quiet:
                        print(
                            f"{skill_name}: bundled version shipped but you "
                            f"already have a local skill by this name — yours "
                            f"was kept. Run `hermes skills reset {skill_name}` "
                            f"to replace it with the bundled version."
                        )
                else:
                    _copy_new_skill(skill_src, dest)
                    copied.append(skill_name)
                    manifest[skill_name] = bundled_hash
                    if not quiet:
                        print(f" + {skill_name}")
            except (OSError, IOError) as e:
                if not quiet:
                    print(f" ! Failed to copy {skill_name}: {e}")
                # Do NOT add to manifest — next sync should retry
        elif dest.exists():
            # ── Existing skill — in manifest AND on disk ──
            origin_hash = manifest.get(skill_name, "")
            user_hash = _dir_hash(dest)
            if not origin_hash:
                # v1 migration: no origin hash recorded. Set baseline from
                # user's current copy so future syncs can detect modifications.
                # Whether or not it already equals the bundled version, nothing
                # is copied this round: we can't tell if the user modified it
                # or bundled changed, so be safe.
                manifest[skill_name] = user_hash
                skipped += 1
                continue
            if user_hash != origin_hash:
                # User modified this skill — don't overwrite their changes
                user_modified.append(skill_name)
                if not quiet:
                    print(f" ~ {skill_name} (user-modified, skipping)")
                continue
            # User copy matches origin — check if bundled has a newer version
            if bundled_hash != origin_hash:
                try:
                    _replace_skill_copy(skill_src, dest)
                    manifest[skill_name] = bundled_hash
                    updated.append(skill_name)
                    if not quiet:
                        print(f"{skill_name} (updated)")
                except (OSError, IOError) as e:
                    if not quiet:
                        print(f" ! Failed to update {skill_name}: {e}")
            else:
                skipped += 1  # bundled unchanged, user unchanged
        else:
            # ── In manifest but not on disk — user deleted it ──
            skipped += 1

    # Clean stale manifest entries (skills removed from bundled dir)
    cleaned = sorted(set(manifest.keys()) - bundled_names)
    for name in cleaned:
        del manifest[name]

    # Also copy DESCRIPTION.md files for categories (if not already present)
    for desc_md in bundled_dir.rglob("DESCRIPTION.md"):
        rel = desc_md.relative_to(bundled_dir)
        dest_desc = SKILLS_DIR / rel
        if not dest_desc.exists():
            try:
                dest_desc.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(desc_md, dest_desc)
            except (OSError, IOError) as e:
                logger.debug("Could not copy %s: %s", desc_md, e)

    _write_manifest(manifest)
    optional_provenance_backfilled = _backfill_optional_provenance(quiet=quiet)
    return {
        "copied": copied,
        "updated": updated,
        "skipped": skipped,
        "user_modified": user_modified,
        "cleaned": cleaned,
        "total_bundled": len(bundled_skills),
        "optional_provenance_backfilled": optional_provenance_backfilled,
    }
def reset_bundled_skill(name: str, restore: bool = False) -> dict:
    """
    Forget the manifest baseline of a bundled skill so syncing works again.

    Once a user edits a bundled skill, every later sync reports it as
    ``user_modified`` and skips it, even after the bundled version is copied
    back, because the manifest still stores the *old* origin hash. Dropping
    the manifest entry breaks that loop.

    Args:
        name: The skill name (matches the manifest key / skill frontmatter name).
        restore: If True, additionally delete the user's copy in SKILLS_DIR so
            the follow-up sync re-copies the current bundled version. If
            False (default), only the manifest entry is cleared and the
            user's current copy stays in place.

    Returns:
        dict with keys:
        - ok: bool, whether the reset succeeded
        - action: one of "manifest_cleared", "restored", "not_in_manifest",
          "bundled_missing"
        - message: human-readable description
        - synced: dict from sync_skills() if a sync was triggered, else None
    """
    tracked = _read_manifest()
    source_root = _get_bundled_dir()
    sources = dict(_discover_bundled_skills(source_root))
    was_tracked = name in tracked
    has_source = name in sources

    if not (was_tracked or has_source):
        return {
            "ok": False,
            "action": "not_in_manifest",
            "message": (
                f"'{name}' is not a tracked bundled skill. Nothing to reset. "
                f"(Hub-installed skills use `hermes skills uninstall`.)"
            ),
            "synced": None,
        }

    # Step 1: forget the entry so the next sync sees the skill as new.
    if was_tracked:
        tracked.pop(name)
        _write_manifest(tracked)

    # Step 2 (restore only): remove the user's copy so sync re-copies bundled.
    removed_copy = False
    if restore and not has_source:
        return {
            "ok": False,
            "action": "bundled_missing",
            "message": (
                f"'{name}' has no bundled source — manifest entry cleared "
                f"but cannot restore from bundled (skill was removed upstream)."
            ),
            "synced": None,
        }
    if restore:
        # The user's copy sits at the bundled path mirrored under SKILLS_DIR.
        user_copy = _compute_relative_dest(sources[name], source_root)
        if user_copy.exists():
            try:
                shutil.rmtree(user_copy)
            except (OSError, IOError) as e:
                return {
                    "ok": False,
                    "action": "manifest_cleared",
                    "message": (
                        f"Cleared manifest entry for '{name}' but could not "
                        f"delete user copy at {user_copy}: {e}"
                    ),
                    "synced": None,
                }
            removed_copy = True

    # Step 3: sync to re-baseline (or re-copy when the copy was deleted).
    outcome = sync_skills(quiet=True)

    if not restore:
        action = "manifest_cleared"
        message = (
            f"Cleared manifest entry for '{name}'. Future `hermes update` runs "
            f"will re-baseline against your current copy and accept upstream changes."
        )
    elif removed_copy:
        action = "restored"
        message = f"Restored '{name}' from bundled source."
    else:
        # Nothing was on disk to delete; the sync behaved like a fresh install.
        action = "restored"
        message = f"Restored '{name}' (no prior user copy, re-copied from bundled)."
    return {"ok": True, "action": action, "message": message, "synced": outcome}
if __name__ == "__main__":
    # Command-line entry point: run one sync and print a one-line summary.
    print("Syncing bundled skills into ~/.hermes/skills/ ...")
    result = sync_skills(quiet=False)

    modified = result["user_modified"]
    cleaned = result["cleaned"]
    backfilled = result.get("optional_provenance_backfilled")

    summary = [
        f"{len(result['copied'])} new",
        f"{len(result['updated'])} updated",
        f"{result['skipped']} unchanged",
    ]
    if modified:
        # List at most MAX_SHOW names and fold the rest into a "+N more" tail.
        MAX_SHOW = 5
        preview = ", ".join(modified[:MAX_SHOW])
        overflow = len(modified) - MAX_SHOW
        if overflow > 0:
            preview += f", +{overflow} more"
        summary.append(f"{len(modified)} user-modified (kept): {preview}")
    if cleaned:
        summary.append(f"{len(cleaned)} cleaned from manifest")
    if backfilled:
        summary.append(f"{len(backfilled)} official optional backfilled")

    print(f"\nDone: {', '.join(summary)}. {result['total_bundled']} total bundled.")