hermes-agent/tools/skill_usage.py
Teknium cc38282b04 feat(cross-platform): psutil for PID/process management + Windows footgun checker
## Why

Hermes supports Linux, macOS, and native Windows, but the codebase grew up
POSIX-first and has accumulated patterns that silently break (or worse,
silently kill!) on Windows:

- `os.kill(pid, 0)` as a liveness probe — on Windows this maps to
  CTRL_C_EVENT and broadcasts Ctrl+C to the target's entire console
  process group (bpo-14484, open since 2012).
- `os.killpg` — doesn't exist on Windows at all (AttributeError).
- `os.setsid` / `os.getuid` / `os.geteuid` — same.
- `signal.SIGKILL` / `signal.SIGHUP` / `signal.SIGUSR1` — module-attr
  errors at runtime on Windows.
- `open(path)` / `open(path, "r")` without explicit encoding= — inherits
  the platform default, which is cp1252/mbcs on Windows (UTF-8 on POSIX),
  causing mojibake round-tripping between hosts.
- `wmic` — removed from Windows 10 21H1+.

This commit does three things:

1. Makes `psutil` a core dependency and migrates critical callsites to it.
2. Adds a grep-based CI gate (`scripts/check-windows-footguns.py`) that
   blocks new instances of any of the above patterns.
3. Fixes every existing instance in the codebase so the baseline is clean.

## What changed

### 1. psutil as a core dependency (pyproject.toml)

Added `psutil>=5.9.0,<8` to core deps. psutil is the canonical
cross-platform answer for "is this PID alive" and "kill this process
tree" — its `pid_exists()` uses `OpenProcess + GetExitCodeProcess` on
Windows (NOT a signal call), and its `Process.children(recursive=True)`
+ `.kill()` combo replaces `os.killpg()` portably.

### 2. `gateway/status.py::_pid_exists`

Rewrote to call `psutil.pid_exists()` first, falling back to the
hand-rolled ctypes `OpenProcess + WaitForSingleObject` dance on Windows
(and `os.kill(pid, 0)` on POSIX) only if psutil is somehow missing —
e.g. during the scaffold phase of a fresh install before pip finishes.

### 3. `os.killpg` migration to psutil (7 callsites, 5 files)

- `tools/code_execution_tool.py`
- `tools/process_registry.py`
- `tools/tts_tool.py`
- `tools/environments/local.py` (3 sites kept as-is, suppressed with
  `# windows-footgun: ok` — the pgid semantics psutil can't replicate,
  and the calls are already Windows-guarded at the outer branch)
- `gateway/platforms/whatsapp.py`

### 4. `scripts/check-windows-footguns.py` (NEW, 500 lines)

Grep-based checker with 11 rules covering every Windows cross-platform
footgun we've hit so far:

1. `os.kill(pid, 0)` — the silent killer
2. `os.setsid` without guard
3. `os.killpg` (recommends psutil)
4. `os.getuid` / `os.geteuid` / `os.getgid`
5. `os.fork`
6. `signal.SIGKILL`
7. `signal.SIGHUP/SIGUSR1/SIGUSR2/SIGALRM/SIGCHLD/SIGPIPE/SIGQUIT`
8. `subprocess` shebang script invocation
9. `wmic` without `shutil.which` guard
10. Hardcoded `~/Desktop` (OneDrive trap)
11. `asyncio.add_signal_handler` without try/except
12. `open()` without `encoding=` on text mode

Features:
- Triple-quoted-docstring aware (won't flag prose inside docstrings)
- Trailing-comment aware (won't flag mentions in `# os.kill(pid, 0)` comments)
- Guard-hint aware (skips lines with `hasattr(os, ...)`,
  `shutil.which(...)`, `if platform.system() != 'Windows'`, etc.)
- Inline suppression with `# windows-footgun: ok — <reason>`
- `--list` to print all rules with fixes
- `--all` / `--diff <ref>` / staged-files (default) modes
- Scans 380 files in under 2 seconds

### 5. CI integration

A GitHub Actions workflow that runs the checker on every PR and push is
staged at `/tmp/hermes-stash/windows-footguns.yml` — not included in this
commit because the GH token on the push machine lacks `workflow` scope.
A maintainer with `workflow` permissions should add it as
`.github/workflows/windows-footguns.yml` in a follow-up. Content:

```yaml
name: Windows footgun check
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.11"}
      - run: python scripts/check-windows-footguns.py --all
```

### 6. CONTRIBUTING.md — "Cross-Platform Compatibility" expansion

Expanded from 5 to 16 rules, each with message, example, and fix.
Recommends psutil as the preferred API for PID / process-tree operations.

### 7. Baseline cleanup (91 → 0 findings)

- 14 `open()` sites → added `encoding='utf-8'` (internal logs/caches) or
  `encoding='utf-8-sig'` (user-editable files that Notepad may BOM)
- 23 POSIX-only callsites in systemd helpers, pty_bridge, and plugin
  tool subprocess management → annotated with
  `# windows-footgun: ok — <reason>`
- 7 `os.killpg` sites → migrated to psutil (see §3 above)

## Verification

```
$ python scripts/check-windows-footguns.py --all
✓ No Windows footguns found (380 file(s) scanned).

$ python -c "from gateway.status import _pid_exists; import os
> print('self:', _pid_exists(os.getpid())); print('bogus:', _pid_exists(999999))"
self: True
bogus: False
```

Proof-of-repro that `os.kill(pid, 0)` was actually killing processes
before this fix — see commit `1cbe39914` and bpo-14484. This commit
removes the last hand-rolled ctypes path from the hot liveness-check
path and defers to the best-maintained cross-platform answer.
2026-05-08 14:27:40 -07:00

609 lines
21 KiB
Python

"""Skill usage telemetry + provenance tracking for the Curator feature.
Tracks per-skill usage metadata in a sidecar JSON file (~/.hermes/skills/.usage.json)
keyed by skill name. Counters are bumped by the existing skill tools (skill_view,
skill_manage); the curator orchestrator reads the derived activity timestamp to
decide lifecycle transitions.
Design notes:
- Sidecar, not frontmatter. Keeps operational telemetry out of user-authored
SKILL.md content and avoids conflict pressure for bundled/hub skills.
- Atomic writes via tempfile + os.replace (same pattern as .bundled_manifest).
- All counter bumps are best-effort: failures log at DEBUG and return silently.
A broken sidecar never breaks the underlying tool call.
- Provenance filter: curator-managed skills are explicitly marked when
created through skill_manage. Bundled / hub-installed skills stay
off-limits, and manually authored skills are not inferred from location.
Lifecycle states:
active -> default
stale -> unused > stale_after_days (config)
archived -> unused > archive_after_days (config); moved to .archive/
pinned -> opt-out from auto transitions (boolean flag, orthogonal to state)
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# fcntl is Unix-only; on Windows use msvcrt for file locking.
msvcrt = None
try:
import fcntl
except ImportError: # pragma: no cover - platform-specific fallback
fcntl = None
try:
import msvcrt
except ImportError:
pass
STATE_ACTIVE = "active"
STATE_STALE = "stale"
STATE_ARCHIVED = "archived"
_VALID_STATES = {STATE_ACTIVE, STATE_STALE, STATE_ARCHIVED}
def _skills_dir() -> Path:
return get_hermes_home() / "skills"
def _usage_file() -> Path:
return _skills_dir() / ".usage.json"
@contextmanager
def _usage_file_lock():
"""Serialize .usage.json read-modify-write cycles across processes."""
lock_path = _usage_file().with_suffix(".json.lock")
lock_path.parent.mkdir(parents=True, exist_ok=True)
if fcntl is None and msvcrt is None:
yield
return
if msvcrt and (not lock_path.exists() or lock_path.stat().st_size == 0):
lock_path.write_text(" ", encoding="utf-8")
fd = open(lock_path, "r+" if msvcrt else "a+", encoding="utf-8")
try:
if fcntl:
fcntl.flock(fd, fcntl.LOCK_EX)
else:
fd.seek(0)
msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1)
yield
finally:
if fcntl:
fcntl.flock(fd, fcntl.LOCK_UN)
elif msvcrt:
try:
fd.seek(0)
msvcrt.locking(fd.fileno(), msvcrt.LK_UNLCK, 1)
except (OSError, IOError):
pass
fd.close()
def _archive_dir() -> Path:
return _skills_dir() / ".archive"
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _parse_iso_timestamp(value: Any) -> Optional[datetime]:
"""Parse an ISO timestamp defensively for activity comparisons."""
if not value:
return None
try:
parsed = datetime.fromisoformat(str(value))
except (TypeError, ValueError):
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
def latest_activity_at(record: Dict[str, Any]) -> Optional[str]:
"""Return the newest actual activity timestamp for a usage record.
"Activity" means a skill was used, viewed, or patched. Creation time is
intentionally excluded so callers can still distinguish never-active skills;
lifecycle code can fall back to ``created_at`` as its own anchor.
"""
latest_dt: Optional[datetime] = None
latest_raw: Optional[str] = None
for key in ("last_used_at", "last_viewed_at", "last_patched_at"):
raw = record.get(key)
dt = _parse_iso_timestamp(raw)
if dt is None:
continue
if latest_dt is None or dt > latest_dt:
latest_dt = dt
latest_raw = str(raw)
return latest_raw
def activity_count(record: Dict[str, Any]) -> int:
"""Return the total observed activity count across use/view/patch events."""
total = 0
for key in ("use_count", "view_count", "patch_count"):
try:
total += int(record.get(key) or 0)
except (TypeError, ValueError):
continue
return total
# ---------------------------------------------------------------------------
# Provenance — which skills are agent-created (and thus eligible for curation)
# ---------------------------------------------------------------------------
def _read_bundled_manifest_names() -> Set[str]:
"""Return the set of skill names that were seeded from the bundled repo.
Reads ~/.hermes/skills/.bundled_manifest (format: "name:hash" per line).
Returns empty set if the file is missing or unreadable.
"""
manifest = _skills_dir() / ".bundled_manifest"
if not manifest.exists():
return set()
names: Set[str] = set()
try:
for line in manifest.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
name = line.split(":", 1)[0].strip()
if name:
names.add(name)
except OSError as e:
logger.debug("Failed to read bundled manifest: %s", e)
return names
def _read_hub_installed_names() -> Set[str]:
"""Return the set of skill names installed via the Skills Hub.
Reads ~/.hermes/skills/.hub/lock.json (see tools/skills_hub.py :: HubLockFile).
"""
lock_path = _skills_dir() / ".hub" / "lock.json"
if not lock_path.exists():
return set()
try:
data = json.loads(lock_path.read_text(encoding="utf-8"))
if isinstance(data, dict):
installed = data.get("installed") or {}
if isinstance(installed, dict):
names = {str(k) for k in installed.keys()}
skills_dir = _skills_dir()
for entry in installed.values():
if not isinstance(entry, dict):
continue
install_path = entry.get("install_path")
if not isinstance(install_path, str) or not install_path.strip():
continue
skill_dir = Path(install_path)
if not skill_dir.is_absolute():
skill_dir = skills_dir / skill_dir
try:
resolved = skill_dir.resolve()
resolved.relative_to(skills_dir.resolve())
except (OSError, ValueError):
continue
skill_md = resolved / "SKILL.md"
if skill_md.exists():
names.add(_read_skill_name(skill_md, fallback=resolved.name))
return names
except (OSError, json.JSONDecodeError) as e:
logger.debug("Failed to read hub lock file: %s", e)
return set()
def list_agent_created_skill_names() -> List[str]:
"""Enumerate skills explicitly authored by the agent.
The curator operates exclusively on this set. Skills are only eligible
after ``skill_manage(action="create")`` marks them in ``.usage.json``;
manually authored skills must not be inferred from filesystem location.
Bundled / hub skills are maintained by their upstream sources and must
never be pruned here.
"""
base = _skills_dir()
if not base.exists():
return []
bundled = _read_bundled_manifest_names()
hub = _read_hub_installed_names()
off_limits = bundled | hub
usage = load_usage()
names: List[str] = []
# Top-level SKILL.md files (flat layout) AND nested category/skill/SKILL.md
for skill_md in base.rglob("SKILL.md"):
# Skip anything under .archive or .hub
try:
rel = skill_md.relative_to(base)
except ValueError:
continue
parts = rel.parts
if parts and (parts[0].startswith(".") or parts[0] == "node_modules"):
continue
name = _read_skill_name(skill_md, fallback=skill_md.parent.name)
if name in off_limits:
continue
if not _is_curator_managed_record(usage.get(name)):
continue
names.append(name)
return sorted(set(names))
def list_archived_skill_names() -> List[str]:
"""Enumerate skills in ``~/.hermes/skills/.archive/``.
Archive layout is flat (``.archive/<skill>/``) as set by ``archive_skill``,
so the directory name is the skill name. Used by ``hermes curator
list-archived`` to help users pass a name to ``hermes curator restore``.
"""
archive_root = _archive_dir()
if not archive_root.exists():
return []
return sorted({p.name for p in archive_root.iterdir() if p.is_dir()})
def _read_skill_name(skill_md: Path, fallback: str) -> str:
"""Parse the `name:` field from a SKILL.md YAML frontmatter."""
try:
text = skill_md.read_text(encoding="utf-8", errors="replace")[:4000]
except OSError:
return fallback
in_frontmatter = False
for line in text.split("\n"):
stripped = line.strip()
if stripped == "---":
if in_frontmatter:
break
in_frontmatter = True
continue
if in_frontmatter and stripped.startswith("name:"):
value = stripped.split(":", 1)[1].strip().strip("\"'")
if value:
return value
return fallback
def is_agent_created(skill_name: str) -> bool:
"""Whether *skill_name* is neither bundled nor hub-installed."""
off_limits = _read_bundled_manifest_names() | _read_hub_installed_names()
return skill_name not in off_limits
def _is_curator_managed_record(record: Any) -> bool:
"""Return True when a usage record opts a skill into curator management."""
if not isinstance(record, dict):
return False
return record.get("created_by") == "agent" or record.get("agent_created") is True
# ---------------------------------------------------------------------------
# Sidecar I/O
# ---------------------------------------------------------------------------
def _empty_record() -> Dict[str, Any]:
return {
"created_by": None,
"use_count": 0,
"view_count": 0,
"last_used_at": None,
"last_viewed_at": None,
"patch_count": 0,
"last_patched_at": None,
"created_at": _now_iso(),
"state": STATE_ACTIVE,
"pinned": False,
"archived_at": None,
}
def load_usage() -> Dict[str, Dict[str, Any]]:
"""Read the entire .usage.json map. Returns empty dict on missing/corrupt."""
path = _usage_file()
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as e:
logger.debug("Failed to read %s: %s", path, e)
return {}
if not isinstance(data, dict):
return {}
# Defensive: coerce any non-dict values to a fresh empty record
clean: Dict[str, Dict[str, Any]] = {}
for k, v in data.items():
if isinstance(v, dict):
clean[str(k)] = v
return clean
def save_usage(data: Dict[str, Dict[str, Any]]) -> None:
"""Write the usage map atomically. Best-effort — errors are logged, not raised."""
path = _usage_file()
try:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(
dir=str(path.parent), prefix=".usage_", suffix=".tmp"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=True, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
except Exception as e:
logger.debug("Failed to write %s: %s", path, e, exc_info=True)
def get_record(skill_name: str) -> Dict[str, Any]:
"""Return the record for *skill_name*, creating a fresh one if missing."""
data = load_usage()
rec = data.get(skill_name)
if not isinstance(rec, dict):
return _empty_record()
# Backfill any missing keys so callers don't need to handle old files
base = _empty_record()
for k, v in base.items():
rec.setdefault(k, v)
return rec
def _mutate(skill_name: str, mutator) -> None:
"""Load, apply *mutator(record)* in place, save. Best-effort.
Bundled and hub-installed skills are NEVER recorded in the sidecar.
Local manual skills may still accrue usage telemetry, but they only
become curator-managed when ``created_by`` is explicitly marked.
"""
if not skill_name:
return
try:
if not is_agent_created(skill_name):
return
with _usage_file_lock():
data = load_usage()
rec = data.get(skill_name)
if not isinstance(rec, dict):
rec = _empty_record()
mutator(rec)
data[skill_name] = rec
save_usage(data)
except Exception as e:
logger.debug("skill_usage._mutate(%s) failed: %s", skill_name, e, exc_info=True)
# ---------------------------------------------------------------------------
# Public counter-bump helpers
# ---------------------------------------------------------------------------
def bump_view(skill_name: str) -> None:
"""Bump view_count and last_viewed_at. Called from skill_view()."""
def _apply(rec: Dict[str, Any]) -> None:
rec["view_count"] = int(rec.get("view_count") or 0) + 1
rec["last_viewed_at"] = _now_iso()
_mutate(skill_name, _apply)
def bump_use(skill_name: str) -> None:
"""Bump use_count and last_used_at. Called when a skill is actively used
(e.g. loaded into the prompt path or referenced from an assistant turn)."""
def _apply(rec: Dict[str, Any]) -> None:
rec["use_count"] = int(rec.get("use_count") or 0) + 1
rec["last_used_at"] = _now_iso()
_mutate(skill_name, _apply)
def bump_patch(skill_name: str) -> None:
"""Bump patch_count and last_patched_at. Called from skill_manage (patch/edit)."""
def _apply(rec: Dict[str, Any]) -> None:
rec["patch_count"] = int(rec.get("patch_count") or 0) + 1
rec["last_patched_at"] = _now_iso()
_mutate(skill_name, _apply)
def mark_agent_created(skill_name: str) -> None:
"""Opt a skill created by skill_manage into curator management.
Viewing or invoking a manually authored skill may still create telemetry,
but only this explicit marker makes it eligible for automatic curation.
"""
def _apply(rec: Dict[str, Any]) -> None:
rec["created_by"] = "agent"
_mutate(skill_name, _apply)
def set_state(skill_name: str, state: str) -> None:
"""Set lifecycle state. No-op if *state* is invalid."""
if state not in _VALID_STATES:
logger.debug("set_state: invalid state %r for %s", state, skill_name)
return
def _apply(rec: Dict[str, Any]) -> None:
rec["state"] = state
if state == STATE_ARCHIVED:
rec["archived_at"] = _now_iso()
elif state == STATE_ACTIVE:
rec["archived_at"] = None
_mutate(skill_name, _apply)
def set_pinned(skill_name: str, pinned: bool) -> None:
def _apply(rec: Dict[str, Any]) -> None:
rec["pinned"] = bool(pinned)
_mutate(skill_name, _apply)
def forget(skill_name: str) -> None:
"""Drop a skill's usage entry entirely. Called when the skill is deleted."""
if not skill_name:
return
try:
with _usage_file_lock():
data = load_usage()
if skill_name in data:
del data[skill_name]
save_usage(data)
except Exception as e:
logger.debug("skill_usage.forget(%s) failed: %s", skill_name, e, exc_info=True)
# ---------------------------------------------------------------------------
# Archive / restore
# ---------------------------------------------------------------------------
def archive_skill(skill_name: str) -> Tuple[bool, str]:
"""Move an agent-created skill directory to ~/.hermes/skills/.archive/.
Returns (ok, message). Never archives bundled or hub skills — callers are
responsible for checking provenance, but we double-check here as a safety net.
"""
if not is_agent_created(skill_name):
return False, f"skill '{skill_name}' is bundled or hub-installed; never archive"
skill_dir = _find_skill_dir(skill_name)
if skill_dir is None:
return False, f"skill '{skill_name}' not found"
archive_root = _archive_dir()
try:
archive_root.mkdir(parents=True, exist_ok=True)
except OSError as e:
return False, f"failed to create archive dir: {e}"
# Flatten any category nesting into a single ".archive/<skill>/" so restores
# are simple. If a collision exists, append a timestamp.
dest = archive_root / skill_dir.name
if dest.exists():
dest = archive_root / f"{skill_dir.name}-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}"
try:
skill_dir.rename(dest)
except OSError as e:
# Cross-device — fall back to shutil.move
import shutil
try:
shutil.move(str(skill_dir), str(dest))
except Exception as e2:
return False, f"failed to archive: {e2}"
set_state(skill_name, STATE_ARCHIVED)
return True, f"archived to {dest}"
def restore_skill(skill_name: str) -> Tuple[bool, str]:
"""Move an archived skill back to ~/.hermes/skills/. Restores to the flat
top-level layout; original category nesting is NOT reconstructed.
Refuses to restore under a name that now collides with a bundled or
hub-installed skill — that would shadow the upstream version.
"""
# If a bundled or hub skill has since been installed under the same
# name, refuse to restore rather than shadow it.
if not is_agent_created(skill_name):
return False, (
f"skill '{skill_name}' is now bundled or hub-installed; "
"restore would shadow the upstream version"
)
archive_root = _archive_dir()
if not archive_root.exists():
return False, "no archive directory"
# Try exact name match first, then any prefix match (for timestamped dupes).
# Recursive walk handles nested archive layouts (e.g. .archive/<category>/<skill>/)
# left behind by older archive paths or external imports.
candidates = [p for p in archive_root.rglob("*") if p.is_dir() and p.name == skill_name]
if not candidates:
candidates = sorted(
[p for p in archive_root.rglob("*")
if p.is_dir() and p.name.startswith(f"{skill_name}-")],
reverse=True,
)
if not candidates:
return False, f"skill '{skill_name}' not found in archive"
src = candidates[0]
dest = _skills_dir() / skill_name
if dest.exists():
return False, f"destination already exists: {dest}"
try:
src.rename(dest)
except OSError:
import shutil
try:
shutil.move(str(src), str(dest))
except Exception as e:
return False, f"failed to restore: {e}"
set_state(skill_name, STATE_ACTIVE)
return True, f"restored to {dest}"
def _find_skill_dir(skill_name: str) -> Optional[Path]:
"""Locate the directory for a skill by its frontmatter `name:` field.
Handles both flat (~/.hermes/skills/<skill>/SKILL.md) and category-nested
(~/.hermes/skills/<category>/<skill>/SKILL.md) layouts.
"""
base = _skills_dir()
if not base.exists():
return None
for skill_md in base.rglob("SKILL.md"):
try:
rel = skill_md.relative_to(base)
except ValueError:
continue
if rel.parts and rel.parts[0].startswith("."):
continue
if _read_skill_name(skill_md, fallback=skill_md.parent.name) == skill_name:
return skill_md.parent
return None
# ---------------------------------------------------------------------------
# Reporting — for the curator CLI / slash command
# ---------------------------------------------------------------------------
def agent_created_report() -> List[Dict[str, Any]]:
"""Return a list of {name, state, pinned, last_activity_at, ...}
records for every agent-created skill. Missing usage records are backfilled
with defaults so callers can always index fields."""
data = load_usage()
rows: List[Dict[str, Any]] = []
for name in list_agent_created_skill_names():
rec = data.get(name)
if not isinstance(rec, dict):
rec = _empty_record()
base = _empty_record()
for k, v in base.items():
rec.setdefault(k, v)
row = {"name": name, **rec}
row["last_activity_at"] = latest_activity_at(row)
row["activity_count"] = activity_count(row)
rows.append(row)
return rows