fix: backfill official optional skill provenance

This commit is contained in:
wysie 2026-05-27 18:32:03 +08:00 committed by kshitij
parent a38e283395
commit f040710d04
4 changed files with 414 additions and 2 deletions

View file

@ -22,11 +22,13 @@ The manifest lives at ~/.hermes/skills/.bundled_manifest.
"""
import hashlib
import json
import logging
import os
import shutil
from pathlib import Path
from hermes_constants import get_bundled_skills_dir, get_hermes_home
from datetime import datetime, timezone
from pathlib import Path, PurePosixPath
from hermes_constants import get_bundled_skills_dir, get_hermes_home, get_optional_skills_dir
from agent.skill_utils import is_excluded_skill_path
from typing import Dict, List, Tuple
from utils import atomic_replace
@ -49,6 +51,11 @@ def _get_bundled_dir() -> Path:
return get_bundled_skills_dir(Path(__file__).parent.parent / "skills")
def _get_optional_dir() -> Path:
"""Locate the official optional-skills/ directory."""
return get_optional_skills_dir(Path(__file__).parent.parent / "optional-skills")
def _read_manifest() -> Dict[str, str]:
"""
Read the manifest as a dict of {skill_name: origin_hash}.
@ -172,6 +179,221 @@ def _dir_hash(directory: Path) -> str:
return hasher.hexdigest()
def _safe_rel_install_path(path: Path, base: Path) -> str:
"""Return a normalized relative POSIX path, rejecting traversal/absolute paths."""
rel = path.relative_to(base)
posix = rel.as_posix()
pure = PurePosixPath(posix)
parts = [part for part in pure.parts if part not in {"", "."}]
if pure.is_absolute() or not parts or any(part == ".." for part in parts):
raise ValueError(f"Unsafe optional skill path: {posix}")
return "/".join(parts)
def _skill_file_list(skill_dir: Path) -> List[str]:
"""List files inside a skill directory in lock-file format."""
files: List[str] = []
for fpath in sorted(skill_dir.rglob("*")):
if fpath.is_file():
files.append(fpath.relative_to(skill_dir).as_posix())
return files
def _content_hash(directory: Path) -> str:
"""Return the same hash style the skills hub lock uses, falling back locally."""
try:
from tools.skills_guard import content_hash
return content_hash(directory)
except Exception:
# Hashing is provenance metadata only; keep sync resilient if guard
# dependencies are unavailable in a packaged/update context.
return _dir_hash(directory)
def _optional_skill_index() -> Dict[str, Tuple[str, str, Path]]:
"""Return official optional skills keyed by folder name and frontmatter name.
Values are ``(folder_name, install_path, source_dir)``. Multiple keys may
point to the same skill so callers can accept either the folder slug used
by the hub lock or the user-facing frontmatter name.
"""
optional_dir = _get_optional_dir()
index: Dict[str, Tuple[str, str, Path]] = {}
if not optional_dir.exists():
return index
for skill_md in sorted(optional_dir.rglob("SKILL.md")):
if is_excluded_skill_path(skill_md):
continue
src = skill_md.parent
try:
install_path = _safe_rel_install_path(src, optional_dir)
except ValueError:
continue
folder_name = src.name
frontmatter_name = _read_skill_name(skill_md, folder_name)
value = (folder_name, install_path, src)
index[folder_name] = value
index[frontmatter_name] = value
return index
def _move_to_restore_backup(path: Path, backup_root: Path) -> str:
"""Move an existing skill directory into a restore backup, preserving rel path."""
rel = path.relative_to(SKILLS_DIR)
target = backup_root / rel
target.parent.mkdir(parents=True, exist_ok=True)
if target.exists():
suffix = 1
while target.with_name(f"{target.name}-{suffix}").exists():
suffix += 1
target = target.with_name(f"{target.name}-{suffix}")
shutil.move(str(path), str(target))
return rel.as_posix()
def restore_official_optional_skill(name: str, *, restore: bool = False) -> dict:
"""Restore one or all official optional skills from repo source.
``restore=False`` only performs exact-match provenance backfill. ``restore=True``
repairs already-mutated/reorganized skills by backing up matching active
copies and copying the official optional source into its canonical path.
"""
index = _optional_skill_index()
if not index:
return {"ok": False, "message": "No official optional skills directory found.", "restored": [], "backfilled": [], "backed_up": []}
targets = sorted(set(index.values()), key=lambda item: item[1]) if name in {"all", "*"} else []
if not targets:
target = index.get(name)
if target is None:
return {"ok": False, "message": f"Official optional skill not found: {name}", "restored": [], "backfilled": [], "backed_up": []}
targets = [target]
restored: List[str] = []
backed_up: List[str] = []
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
backup_root = SKILLS_DIR / ".restore-backups" / f"official-optional-{timestamp}"
for folder_name, install_path, src in targets:
dest = SKILLS_DIR / Path(*install_path.split("/"))
src_hash = _dir_hash(src)
canonical_ok = dest.exists() and _dir_hash(dest) == src_hash
# Find already-active copies of this official skill by frontmatter name
# or folder slug, even if curator moved it into another category.
src_frontmatter = _read_skill_name(src / "SKILL.md", folder_name)
matches: List[Path] = []
if SKILLS_DIR.exists():
for skill_md in sorted(SKILLS_DIR.rglob("SKILL.md")):
if is_excluded_skill_path(skill_md):
continue
candidate = skill_md.parent
try:
candidate.relative_to(SKILLS_DIR)
except ValueError:
continue
candidate_name = _read_skill_name(skill_md, candidate.name)
if candidate == dest:
continue
if candidate.name == folder_name or candidate_name in {folder_name, src_frontmatter}:
matches.append(candidate)
if restore:
for match in matches:
if match.exists():
backed_up.append(_move_to_restore_backup(match, backup_root))
if dest.exists() and not canonical_ok:
backed_up.append(_move_to_restore_backup(dest, backup_root))
if not dest.exists():
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(src, dest)
restored.append(folder_name)
elif not canonical_ok:
continue
backfilled = _backfill_optional_provenance(quiet=True)
return {
"ok": True,
"message": "Official optional skill repair complete.",
"restored": restored,
"backfilled": backfilled,
"backed_up": backed_up,
"backup_dir": str(backup_root) if backed_up else "",
}
def _backfill_optional_provenance(quiet: bool = False) -> List[str]:
"""Mark already-present official optional skills as hub-installed.
This covers the migration case where a skill used to be bundled (or was
manually copied into the active skills tree) and later lives under
optional-skills/. If the active copy is byte-identical to the official
optional source, record official hub provenance without copying or
reinstalling anything. Modified/local skills are left alone.
"""
optional_dir = _get_optional_dir()
if not optional_dir.exists():
return []
lock_path = SKILLS_DIR / ".hub" / "lock.json"
try:
data = json.loads(lock_path.read_text()) if lock_path.exists() else {"version": 1, "installed": {}}
except (json.JSONDecodeError, OSError):
data = {"version": 1, "installed": {}}
installed = data.setdefault("installed", {})
existing_paths = {
entry.get("install_path")
for entry in installed.values()
if isinstance(entry, dict)
}
backfilled: List[str] = []
changed = False
for skill_md in sorted(optional_dir.rglob("SKILL.md")):
if is_excluded_skill_path(skill_md):
continue
src = skill_md.parent
try:
install_path = _safe_rel_install_path(src, optional_dir)
except ValueError as e:
logger.debug("Skipping optional skill with unsafe path %s: %s", src, e)
continue
dest = SKILLS_DIR / Path(*install_path.split("/"))
if not dest.exists() or not dest.is_dir():
continue
if _dir_hash(dest) != _dir_hash(src):
continue
lock_name = src.name
if lock_name in installed or install_path in existing_paths:
continue
timestamp = datetime.now(timezone.utc).isoformat()
installed[lock_name] = {
"source": "official",
"identifier": f"official/{install_path}",
"trust_level": "builtin",
"scan_verdict": "backfilled",
"content_hash": _content_hash(dest),
"install_path": install_path,
"files": _skill_file_list(dest),
"metadata": {"backfilled_from": "optional-skills"},
"installed_at": timestamp,
"updated_at": timestamp,
}
existing_paths.add(install_path)
backfilled.append(lock_name)
changed = True
if not quiet:
print(f" = {lock_name} (official optional provenance backfilled)")
if changed:
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n")
return backfilled
def sync_skills(quiet: bool = False) -> dict:
"""
Sync bundled skills into ~/.hermes/skills/ using the manifest.
@ -185,6 +407,7 @@ def sync_skills(quiet: bool = False) -> dict:
return {
"copied": [], "updated": [], "skipped": 0,
"user_modified": [], "cleaned": [], "total_bundled": 0,
"optional_provenance_backfilled": [],
}
SKILLS_DIR.mkdir(parents=True, exist_ok=True)
@ -305,6 +528,7 @@ def sync_skills(quiet: bool = False) -> dict:
logger.debug("Could not copy %s: %s", desc_md, e)
_write_manifest(manifest)
optional_provenance_backfilled = _backfill_optional_provenance(quiet=quiet)
return {
"copied": copied,
@ -313,6 +537,7 @@ def sync_skills(quiet: bool = False) -> dict:
"user_modified": user_modified,
"cleaned": cleaned,
"total_bundled": len(bundled_skills),
"optional_provenance_backfilled": optional_provenance_backfilled,
}
@ -431,4 +656,6 @@ if __name__ == "__main__":
parts.append(f"{len(names)} user-modified (kept): {shown}")
if result["cleaned"]:
parts.append(f"{len(result['cleaned'])} cleaned from manifest")
if result.get("optional_provenance_backfilled"):
parts.append(f"{len(result['optional_provenance_backfilled'])} official optional backfilled")
print(f"\nDone: {', '.join(parts)}. {result['total_bundled']} total bundled.")