fix(skills,pairing): path traversal guard in uninstall, lock list_pending, hash file paths

- skills_hub: validate that uninstall_skill's install_path resolves
  inside SKILLS_DIR before calling shutil.rmtree, preventing recursive
  deletion of arbitrary directories via poisoned lock.json entries
- skills_hub: include file paths (not just contents) in
  bundle_content_hash so swapping filenames between files changes the
  hash, strengthening update-detection integrity
- pairing: wrap list_pending() in self._lock so _cleanup_expired() file
  writes don't race with concurrent generate_code()/approve_code() calls

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
aaronagent 2026-04-09 22:29:23 +08:00 committed by Teknium
parent 8cf977c8b1
commit b82608a6f5
2 changed files with 32 additions and 20 deletions

View file

@ -287,26 +287,27 @@ class PairingStore:
can see them age out without crashing on a missing ``hash`` field.
"""
results = []
platforms = [platform] if platform else self._all_platforms("pending")
for p in platforms:
self._cleanup_expired(p)
pending = self._load_json(self._pending_path(p))
for entry_id, info in pending.items():
if not isinstance(info, dict):
continue
created_at = info.get("created_at")
if not isinstance(created_at, (int, float)):
continue
age_min = int((time.time() - created_at) / 60)
hash_val = info.get("hash")
code_display = hash_val[:8] if isinstance(hash_val, str) else "legacy"
results.append({
"platform": p,
"code": code_display,
"user_id": info.get("user_id", ""),
"user_name": info.get("user_name", ""),
"age_minutes": age_min,
})
with self._lock:
platforms = [platform] if platform else self._all_platforms("pending")
for p in platforms:
self._cleanup_expired(p)
pending = self._load_json(self._pending_path(p))
for entry_id, info in pending.items():
if not isinstance(info, dict):
continue
created_at = info.get("created_at")
if not isinstance(created_at, (int, float)):
continue
age_min = int((time.time() - created_at) / 60)
hash_val = info.get("hash")
code_display = hash_val[:8] if isinstance(hash_val, str) else "legacy"
results.append({
"platform": p,
"code": code_display,
"user_id": info.get("user_id", ""),
"user_name": info.get("user_name", ""),
"age_minutes": age_min,
})
return results
def clear_pending(self, platform: str = None) -> int:

View file

@ -3000,6 +3000,13 @@ def uninstall_skill(skill_name: str) -> Tuple[bool, str]:
return False, f"'{skill_name}' is not a hub-installed skill (may be a builtin)"
install_path = SKILLS_DIR / entry["install_path"]
# Prevent path traversal from poisoned lock.json entries
try:
resolved = install_path.resolve()
if not resolved.is_relative_to(SKILLS_DIR.resolve()):
return False, f"Refusing to remove '{entry['install_path']}': resolves outside skills directory"
except (ValueError, OSError):
return False, f"Refusing to remove '{entry['install_path']}': path resolution failed"
if install_path.exists():
shutil.rmtree(install_path)
@ -3013,6 +3020,10 @@ def bundle_content_hash(bundle: SkillBundle) -> str:
"""Compute a deterministic hash for an in-memory skill bundle."""
h = hashlib.sha256()
for rel_path in sorted(bundle.files):
# Include the path so swapping file contents between two paths
# changes the hash (avoids filename-swap evading update detection).
h.update(rel_path.encode("utf-8"))
h.update(b"\x00")
content = bundle.files[rel_path]
if isinstance(content, bytes):
h.update(content)