mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-17 09:41:58 +00:00
When display.memory_notifications is set to 'verbose', skill_manage notifications now show meaningful change details instead of just the generic tool message. Before (verbose mode): 💾 📝 Patched SKILL.md in skill 'gogcli' (1 replacement). After (verbose mode): 💾 📝 Skill 'gogcli' patched: "old pitfall text..." → "new pitfall text..." Changes: - skill_manager_tool.py: _patch_skill() now includes old/new string previews (truncated to 200 chars) in the result via '_change' key. _create_skill() and _edit_skill() include skill description from frontmatter for verbose create/edit notifications. - run_agent.py: Background review notification builder now reads the '_change' dict from skill tool results and formats descriptive notifications per action type (patch → old→new diff, create/edit → description preview). Falls back to generic message when _change data is unavailable (backwards compatible). This is especially useful when subagents patch skills, since neither the user nor the parent agent can see what the subagent changed.
1233 lines
46 KiB
Python
1233 lines
46 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Skill Manager Tool -- Agent-Managed Skill Creation & Editing
|
|
|
|
Allows the agent to create, update, and delete skills, turning successful
|
|
approaches into reusable procedural knowledge. New skills are created in
|
|
~/.hermes/skills/. Existing skills (bundled, hub-installed, or user-created)
|
|
can be modified or deleted wherever they live.
|
|
|
|
Skills are the agent's procedural memory: they capture *how to do a specific
|
|
type of task* based on proven experience. General memory (MEMORY.md, USER.md) is
|
|
broad and declarative. Skills are narrow and actionable.
|
|
|
|
Actions:
|
|
create -- Create a new skill (SKILL.md + directory structure)
|
|
edit -- Replace the SKILL.md content of a user skill (full rewrite)
|
|
patch -- Targeted find-and-replace within SKILL.md or any supporting file
|
|
delete -- Remove a user skill entirely
|
|
write_file -- Add/overwrite a supporting file (reference, template, script, asset)
|
|
remove_file-- Remove a supporting file from a user skill
|
|
|
|
Directory layout for user skills:
|
|
~/.hermes/skills/
|
|
├── my-skill/
|
|
│ ├── SKILL.md
|
|
│ ├── references/
|
|
│ ├── templates/
|
|
│ ├── scripts/
|
|
│ └── assets/
|
|
└── category-name/
|
|
└── another-skill/
|
|
└── SKILL.md
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
from hermes_constants import get_hermes_home, display_hermes_home
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
|
from utils import atomic_replace, is_truthy_value
|
|
from hermes_cli.config import cfg_get
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Import security scanner — external hub installs always get scanned;
|
|
# agent-created skills only get scanned when skills.guard_agent_created is on.
|
|
try:
|
|
from tools.skills_guard import scan_skill, should_allow_install, format_scan_report
|
|
_GUARD_AVAILABLE = True
|
|
except ImportError:
|
|
_GUARD_AVAILABLE = False
|
|
|
|
|
|
def _guard_agent_created_enabled() -> bool:
|
|
"""Read skills.guard_agent_created from config (default False).
|
|
|
|
Off by default because the agent can already execute the same code
|
|
paths via terminal() with no gate, so the scan adds friction without
|
|
meaningful security. Users who want belt-and-suspenders can turn it
|
|
on via `hermes config set skills.guard_agent_created true`.
|
|
"""
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
cfg = load_config()
|
|
return is_truthy_value(
|
|
cfg_get(cfg, "skills", "guard_agent_created"),
|
|
default=False,
|
|
)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _security_scan_skill(skill_dir: Path) -> Optional[str]:
|
|
"""Scan a skill directory after write. Returns error string if blocked, else None.
|
|
|
|
No-op when skills.guard_agent_created is disabled (the default).
|
|
"""
|
|
if not _GUARD_AVAILABLE:
|
|
return None
|
|
if not _guard_agent_created_enabled():
|
|
return None
|
|
try:
|
|
result = scan_skill(skill_dir, source="agent-created")
|
|
allowed, reason = should_allow_install(result)
|
|
if allowed is False:
|
|
report = format_scan_report(result)
|
|
return f"Security scan blocked this skill ({reason}):\n{report}"
|
|
if allowed is None:
|
|
# "ask" verdict — for agent-created skills this means dangerous
|
|
# findings were detected. Surface as an error so the agent can
|
|
# retry with the flagged content removed.
|
|
report = format_scan_report(result)
|
|
logger.warning("Agent-created skill blocked (dangerous findings): %s", reason)
|
|
return f"Security scan blocked this skill ({reason}):\n{report}"
|
|
except Exception as e:
|
|
logger.warning("Security scan failed for %s: %s", skill_dir, e, exc_info=True)
|
|
return None
|
|
|
|
import yaml
|
|
|
|
|
|
# All skills live in ~/.hermes/skills/ (single source of truth)
|
|
HERMES_HOME = get_hermes_home()
|
|
SKILLS_DIR = HERMES_HOME / "skills"
|
|
|
|
MAX_NAME_LENGTH = 64
|
|
MAX_DESCRIPTION_LENGTH = 1024
|
|
|
|
|
|
def _containing_skills_root(skill_path: Path) -> Path:
|
|
"""Return the skills root directory (local or external_dirs entry) that
|
|
contains ``skill_path``. Falls back to the local ``SKILLS_DIR`` if no
|
|
match is found (defensive — callers should have located the skill via
|
|
``_find_skill`` first).
|
|
"""
|
|
from agent.skill_utils import get_all_skills_dirs
|
|
|
|
try:
|
|
resolved = skill_path.resolve()
|
|
except OSError:
|
|
resolved = skill_path
|
|
|
|
for root in get_all_skills_dirs():
|
|
try:
|
|
resolved.relative_to(root.resolve())
|
|
return root
|
|
except (ValueError, OSError):
|
|
continue
|
|
return SKILLS_DIR
|
|
|
|
|
|
def _is_path_redirect(path: Path) -> bool:
|
|
"""True when ``path`` is a symlink or (on Windows) a directory junction.
|
|
|
|
Either form lets a poisoned skills tree redirect a subsequent
|
|
``shutil.rmtree`` to content outside the skills root. ``is_junction``
|
|
only exists on Python 3.12+ Windows; gate with ``hasattr``.
|
|
"""
|
|
try:
|
|
return path.is_symlink() or (hasattr(path, "is_junction") and path.is_junction())
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def _validate_delete_target(skill_dir: Path) -> Optional[str]:
|
|
"""Last-line guard before ``shutil.rmtree(skill_dir)`` in ``_delete_skill``.
|
|
|
|
``_find_skill`` already restricts ``skill_dir`` to a real ``SKILL.md``
|
|
parent discovered by walking the skills roots, so the agent cannot inject
|
|
an arbitrary path the way Kilo Code's HTTP endpoint could (their issue
|
|
#11227: a built-in-skill sentinel resolved to the server cwd and a
|
|
recursive delete wiped the user's entire working directory). This is the
|
|
matching defense-in-depth for our agent-facing ``skill_manage`` delete
|
|
path: even if discovery or a poisoned tree hands us a bad directory, never
|
|
recursively delete
|
|
|
|
1. a path that is not strictly *inside* one of the known skills roots,
|
|
2. a skills root itself (would wipe every installed skill), or
|
|
3. a directory reached via a symlink / junction (``rmtree`` would follow
|
|
it into content outside the skills tree).
|
|
|
|
Returns an error string to refuse on, or ``None`` when the delete is safe.
|
|
"""
|
|
from agent.skill_utils import get_all_skills_dirs
|
|
|
|
# (3) Reject symlink/junction redirects on the skill directory itself.
|
|
if _is_path_redirect(skill_dir):
|
|
return (
|
|
f"Refusing to delete '{skill_dir}': the skill directory is a "
|
|
f"symlink/junction. Remove the link target manually if intended."
|
|
)
|
|
|
|
try:
|
|
resolved = skill_dir.resolve()
|
|
except OSError as exc:
|
|
return f"Refusing to delete '{skill_dir}': could not resolve path ({exc})."
|
|
|
|
roots = []
|
|
for root in get_all_skills_dirs():
|
|
try:
|
|
roots.append(root.resolve())
|
|
except OSError:
|
|
continue
|
|
|
|
for root in roots:
|
|
# (2) Never rmtree a skills root itself.
|
|
if resolved == root:
|
|
return (
|
|
f"Refusing to delete '{skill_dir}': resolves to the skills root "
|
|
f"itself, which would remove every installed skill."
|
|
)
|
|
# (1) Must be strictly inside a known root.
|
|
try:
|
|
rel = resolved.relative_to(root)
|
|
except ValueError:
|
|
continue
|
|
if rel.parts: # at least one component below the root
|
|
return None
|
|
|
|
return (
|
|
f"Refusing to delete '{skill_dir}': path does not resolve inside any "
|
|
f"known skills root."
|
|
)
|
|
|
|
|
|
def _pinned_guard(name: str) -> Optional[str]:
|
|
"""Return a refusal message if *name* is pinned, else None.
|
|
|
|
Pin protects a skill from **deletion** — both the curator's auto-archive
|
|
passes and the agent's ``skill_manage(action="delete")`` tool call. The
|
|
agent can still patch/edit pinned skills; pin only guards against
|
|
irrecoverable loss, not against content evolution.
|
|
|
|
Best-effort: if the sidecar is unreadable we let the delete through
|
|
rather than block on a broken telemetry file.
|
|
"""
|
|
try:
|
|
from tools import skill_usage
|
|
rec = skill_usage.get_record(name)
|
|
if rec.get("pinned"):
|
|
return (
|
|
f"Skill '{name}' is pinned and cannot be deleted by "
|
|
f"skill_manage. Ask the user to run "
|
|
f"`hermes curator unpin {name}` if they want to delete it. "
|
|
f"Patches and edits are allowed on pinned skills; only "
|
|
f"deletion is blocked."
|
|
)
|
|
except Exception:
|
|
logger.debug("pinned-guard lookup failed for %s", name, exc_info=True)
|
|
return None
|
|
|
|
|
|
MAX_SKILL_CONTENT_CHARS = 100_000 # ~36k tokens at 2.75 chars/token
|
|
MAX_SKILL_FILE_BYTES = 1_048_576 # 1 MiB per supporting file
|
|
|
|
# Characters allowed in skill names (filesystem-safe, URL-friendly)
|
|
VALID_NAME_RE = re.compile(r'^[a-z0-9][a-z0-9._-]*$')
|
|
|
|
# Subdirectories allowed for write_file/remove_file
|
|
ALLOWED_SUBDIRS = {"references", "templates", "scripts", "assets"}
|
|
|
|
|
|
# =============================================================================
|
|
# Validation helpers
|
|
# =============================================================================
|
|
|
|
def _validate_name(name: str) -> Optional[str]:
|
|
"""Validate a skill name. Returns error message or None if valid."""
|
|
if not name:
|
|
return "Skill name is required."
|
|
if len(name) > MAX_NAME_LENGTH:
|
|
return f"Skill name exceeds {MAX_NAME_LENGTH} characters."
|
|
if not VALID_NAME_RE.match(name):
|
|
return (
|
|
f"Invalid skill name '{name}'. Use lowercase letters, numbers, "
|
|
f"hyphens, dots, and underscores. Must start with a letter or digit."
|
|
)
|
|
return None
|
|
|
|
|
|
def _validate_category(category: Optional[str]) -> Optional[str]:
|
|
"""Validate an optional category name used as a single directory segment."""
|
|
if category is None:
|
|
return None
|
|
if not isinstance(category, str):
|
|
return "Category must be a string."
|
|
|
|
category = category.strip()
|
|
if not category:
|
|
return None
|
|
if "/" in category or "\\" in category:
|
|
return (
|
|
f"Invalid category '{category}'. Use lowercase letters, numbers, "
|
|
"hyphens, dots, and underscores. Categories must be a single directory name."
|
|
)
|
|
if len(category) > MAX_NAME_LENGTH:
|
|
return f"Category exceeds {MAX_NAME_LENGTH} characters."
|
|
if not VALID_NAME_RE.match(category):
|
|
return (
|
|
f"Invalid category '{category}'. Use lowercase letters, numbers, "
|
|
"hyphens, dots, and underscores. Categories must be a single directory name."
|
|
)
|
|
return None
|
|
|
|
|
|
def _validate_frontmatter(content: str) -> Optional[str]:
|
|
"""
|
|
Validate that SKILL.md content has proper frontmatter with required fields.
|
|
Returns error message or None if valid.
|
|
"""
|
|
if not content.strip():
|
|
return "Content cannot be empty."
|
|
|
|
if not content.startswith("---"):
|
|
return "SKILL.md must start with YAML frontmatter (---). See existing skills for format."
|
|
|
|
end_match = re.search(r'\n---\s*\n', content[3:])
|
|
if not end_match:
|
|
return "SKILL.md frontmatter is not closed. Ensure you have a closing '---' line."
|
|
|
|
yaml_content = content[3:end_match.start() + 3]
|
|
|
|
try:
|
|
parsed = yaml.safe_load(yaml_content)
|
|
except yaml.YAMLError as e:
|
|
return f"YAML frontmatter parse error: {e}"
|
|
|
|
if not isinstance(parsed, dict):
|
|
return "Frontmatter must be a YAML mapping (key: value pairs)."
|
|
|
|
if "name" not in parsed:
|
|
return "Frontmatter must include 'name' field."
|
|
if "description" not in parsed:
|
|
return "Frontmatter must include 'description' field."
|
|
if len(str(parsed["description"])) > MAX_DESCRIPTION_LENGTH:
|
|
return f"Description exceeds {MAX_DESCRIPTION_LENGTH} characters."
|
|
|
|
body = content[end_match.end() + 3:].strip()
|
|
if not body:
|
|
return "SKILL.md must have content after the frontmatter (instructions, procedures, etc.)."
|
|
|
|
return None
|
|
|
|
|
|
def _validate_content_size(content: str, label: str = "SKILL.md") -> Optional[str]:
|
|
"""Check that content doesn't exceed the character limit for agent writes.
|
|
|
|
Returns an error message or None if within bounds.
|
|
"""
|
|
if len(content) > MAX_SKILL_CONTENT_CHARS:
|
|
return (
|
|
f"{label} content is {len(content):,} characters "
|
|
f"(limit: {MAX_SKILL_CONTENT_CHARS:,}). "
|
|
f"Consider splitting into a smaller SKILL.md with supporting files "
|
|
f"in references/ or templates/."
|
|
)
|
|
return None
|
|
|
|
|
|
def _resolve_skill_dir(name: str, category: str = None) -> Path:
|
|
"""Build the directory path for a new skill, optionally under a category."""
|
|
if category:
|
|
return SKILLS_DIR / category / name
|
|
return SKILLS_DIR / name
|
|
|
|
|
|
def _find_skill(name: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Find a skill by name across all skill directories.
|
|
|
|
Searches the local skills dir (~/.hermes/skills/) first, then any
|
|
external dirs configured via skills.external_dirs. Returns
|
|
{"path": Path} or None.
|
|
"""
|
|
from agent.skill_utils import get_all_skills_dirs, is_excluded_skill_path
|
|
for skills_dir in get_all_skills_dirs():
|
|
if not skills_dir.exists():
|
|
continue
|
|
for skill_md in skills_dir.rglob("SKILL.md"):
|
|
if is_excluded_skill_path(skill_md):
|
|
continue
|
|
if skill_md.parent.name == name:
|
|
return {"path": skill_md.parent}
|
|
return None
|
|
|
|
|
|
def _find_skill_in_other_profiles(name: str) -> List[Tuple[str, Path]]:
|
|
"""Look for ``name`` under SKILL.md across OTHER Hermes profiles.
|
|
|
|
Returns a list of ``(profile_name, skill_dir)`` pairs. Used to make
|
|
the "Skill X not found" error explain when the user is editing the
|
|
wrong profile. Empty list when no other profile has the skill (or
|
|
when profile discovery fails — fail-quiet, the caller falls back to
|
|
the plain "not found" error).
|
|
"""
|
|
matches: List[Tuple[str, Path]] = []
|
|
try:
|
|
from hermes_constants import get_default_hermes_root
|
|
from agent.skill_utils import is_excluded_skill_path
|
|
except Exception:
|
|
return matches
|
|
|
|
try:
|
|
root = get_default_hermes_root()
|
|
except Exception:
|
|
return matches
|
|
|
|
# Collect (profile_name, skills_dir) for every profile EXCEPT the
|
|
# one whose SKILLS_DIR we already searched in _find_skill().
|
|
active_dir = SKILLS_DIR.resolve() if SKILLS_DIR.exists() else SKILLS_DIR
|
|
candidates: List[Tuple[str, Path]] = []
|
|
|
|
# Default profile (~/.hermes/skills) — only consider when active is non-default.
|
|
default_skills = root / "skills"
|
|
try:
|
|
if default_skills.resolve() != active_dir:
|
|
candidates.append(("default", default_skills))
|
|
except (OSError, RuntimeError):
|
|
pass
|
|
|
|
# All named profiles (~/.hermes/profiles/*/skills)
|
|
profiles_root = root / "profiles"
|
|
if profiles_root.is_dir():
|
|
try:
|
|
for entry in profiles_root.iterdir():
|
|
if not entry.is_dir():
|
|
continue
|
|
pskills = entry / "skills"
|
|
try:
|
|
if pskills.resolve() == active_dir:
|
|
continue
|
|
except (OSError, RuntimeError):
|
|
continue
|
|
candidates.append((entry.name, pskills))
|
|
except OSError:
|
|
pass
|
|
|
|
for profile_name, skills_dir in candidates:
|
|
if not skills_dir.is_dir():
|
|
continue
|
|
try:
|
|
for skill_md in skills_dir.rglob("SKILL.md"):
|
|
if is_excluded_skill_path(skill_md):
|
|
continue
|
|
if skill_md.parent.name == name:
|
|
matches.append((profile_name, skill_md.parent))
|
|
break # one match per profile is enough
|
|
except OSError:
|
|
continue
|
|
return matches
|
|
|
|
|
|
def _skill_not_found_error(name: str, suffix: str = "") -> str:
|
|
"""Build a "skill not found" error that names other profiles holding
|
|
the same skill, so the agent can recognize a profile-scoping mistake.
|
|
|
|
``suffix`` is appended after the cross-profile hint if present
|
|
(e.g. ``" Create it first with action='create'."``).
|
|
"""
|
|
from agent.file_safety import _resolve_active_profile_name
|
|
active = _resolve_active_profile_name()
|
|
base = f"Skill '{name}' not found in active profile '{active}'."
|
|
|
|
others = _find_skill_in_other_profiles(name)
|
|
if others:
|
|
if len(others) == 1:
|
|
other_profile, other_path = others[0]
|
|
base += (
|
|
f" A skill by that name exists in profile "
|
|
f"'{other_profile}' ({other_path}). To edit a skill in "
|
|
f"another profile, switch profiles (`hermes -p "
|
|
f"{other_profile}`) or operate via explicit file tools "
|
|
f"with ``cross_profile=True``."
|
|
)
|
|
else:
|
|
names = ", ".join(f"'{p}'" for p, _ in others)
|
|
base += (
|
|
f" Skills by that name exist in other profiles: {names}. "
|
|
f"Switch profiles (`hermes -p <name>`) to edit there, or "
|
|
f"operate via explicit file tools with ``cross_profile=True``."
|
|
)
|
|
else:
|
|
base += " Use skills_list() to see available skills."
|
|
|
|
if suffix:
|
|
base += suffix
|
|
return base
|
|
|
|
|
|
def _validate_file_path(file_path: str) -> Optional[str]:
|
|
"""
|
|
Validate a file path for write_file/remove_file.
|
|
Must be under an allowed subdirectory and not escape the skill dir.
|
|
"""
|
|
from tools.path_security import has_traversal_component
|
|
|
|
if not file_path:
|
|
return "file_path is required."
|
|
|
|
normalized = Path(file_path)
|
|
|
|
# Prevent path traversal (checked before any allow-listing so the SKILL.md
|
|
# exception below can never be reached by a traversal-laden path).
|
|
if has_traversal_component(file_path):
|
|
return "Path traversal ('..') is not allowed."
|
|
|
|
# SKILL.md is the canonical skill file and lives at the skill root, not
|
|
# under an allowed subdirectory. Accept its two natural spellings —
|
|
# 'SKILL.md' and '<skill-name>/SKILL.md' — so callers can target the main
|
|
# file. The traversal guard above still applies, so this can't escape.
|
|
if normalized.parts and normalized.name == "SKILL.md":
|
|
if len(normalized.parts) == 1 or len(normalized.parts) == 2:
|
|
return None
|
|
|
|
# Must be under an allowed subdirectory
|
|
if not normalized.parts or normalized.parts[0] not in ALLOWED_SUBDIRS:
|
|
allowed = ", ".join(sorted(ALLOWED_SUBDIRS))
|
|
return f"File must be under one of: {allowed}. Got: '{file_path}'"
|
|
|
|
# Must have a filename (not just a directory)
|
|
if len(normalized.parts) < 2:
|
|
return f"Provide a file path, not just a directory. Example: '{normalized.parts[0]}/myfile.md'"
|
|
|
|
return None
|
|
|
|
|
|
def _resolve_skill_target(skill_dir: Path, file_path: str) -> Tuple[Optional[Path], Optional[str]]:
|
|
"""Resolve a supporting-file path and ensure it stays within the skill directory."""
|
|
from tools.path_security import validate_within_dir
|
|
|
|
target = skill_dir / file_path
|
|
error = validate_within_dir(target, skill_dir)
|
|
if error:
|
|
return None, error
|
|
return target, None
|
|
|
|
|
|
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
|
|
"""
|
|
Atomically write text content to a file.
|
|
|
|
Uses a temporary file in the same directory and os.replace() to ensure
|
|
the target file is never left in a partially-written state if the process
|
|
crashes or is interrupted.
|
|
|
|
Args:
|
|
file_path: Target file path
|
|
content: Content to write
|
|
encoding: Text encoding (default: utf-8)
|
|
"""
|
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
fd, temp_path = tempfile.mkstemp(
|
|
dir=str(file_path.parent),
|
|
prefix=f".{file_path.name}.tmp.",
|
|
suffix="",
|
|
)
|
|
try:
|
|
with os.fdopen(fd, "w", encoding=encoding) as f:
|
|
f.write(content)
|
|
atomic_replace(temp_path, file_path)
|
|
except Exception:
|
|
# Clean up temp file on error
|
|
try:
|
|
os.unlink(temp_path)
|
|
except OSError:
|
|
logger.error("Failed to remove temporary file %s during atomic write", temp_path, exc_info=True)
|
|
raise
|
|
|
|
|
|
# =============================================================================
|
|
# Core actions
|
|
# =============================================================================
|
|
|
|
def _create_skill(name: str, content: str, category: str = None) -> Dict[str, Any]:
|
|
"""Create a new user skill with SKILL.md content."""
|
|
# Validate name
|
|
err = _validate_name(name)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
err = _validate_category(category)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
# Validate content
|
|
err = _validate_frontmatter(content)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
err = _validate_content_size(content)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
# Check for name collisions across all directories
|
|
existing = _find_skill(name)
|
|
if existing:
|
|
return {
|
|
"success": False,
|
|
"error": f"A skill named '{name}' already exists at {existing['path']}."
|
|
}
|
|
|
|
# Create the skill directory
|
|
skill_dir = _resolve_skill_dir(name, category)
|
|
skill_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Write SKILL.md atomically
|
|
skill_md = skill_dir / "SKILL.md"
|
|
_atomic_write_text(skill_md, content)
|
|
|
|
# Security scan — roll back on block
|
|
scan_error = _security_scan_skill(skill_dir)
|
|
if scan_error:
|
|
shutil.rmtree(skill_dir, ignore_errors=True)
|
|
return {"success": False, "error": scan_error}
|
|
|
|
# Extract description from frontmatter for verbose notifications
|
|
_desc = ""
|
|
try:
|
|
_fm_end = re.search(r'\n---\s*\n', content[3:])
|
|
if _fm_end:
|
|
_parsed = yaml.safe_load(content[3:_fm_end.start() + 3])
|
|
_desc = str(_parsed.get("description", ""))[:120]
|
|
except Exception:
|
|
pass
|
|
|
|
result = {
|
|
"success": True,
|
|
"message": f"Skill '{name}' created.",
|
|
"path": str(skill_dir.relative_to(SKILLS_DIR)),
|
|
"skill_md": str(skill_md),
|
|
"_change": {"description": _desc},
|
|
}
|
|
if category:
|
|
result["category"] = category
|
|
result["hint"] = (
|
|
"To add reference files, templates, or scripts, use "
|
|
"skill_manage(action='write_file', name='{}', file_path='references/example.md', file_content='...')".format(name)
|
|
)
|
|
return result
|
|
|
|
|
|
def _edit_skill(name: str, content: str) -> Dict[str, Any]:
|
|
"""Replace the SKILL.md of any existing skill (full rewrite)."""
|
|
err = _validate_frontmatter(content)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
err = _validate_content_size(content)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
existing = _find_skill(name)
|
|
if not existing:
|
|
return {"success": False, "error": _skill_not_found_error(name)}
|
|
|
|
skill_md = existing["path"] / "SKILL.md"
|
|
# Back up original content for rollback
|
|
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
|
|
_atomic_write_text(skill_md, content)
|
|
|
|
# Security scan — roll back on block
|
|
scan_error = _security_scan_skill(existing["path"])
|
|
if scan_error:
|
|
if original_content is not None:
|
|
_atomic_write_text(skill_md, original_content)
|
|
return {"success": False, "error": scan_error}
|
|
|
|
# Extract description from new content for verbose notifications
|
|
_desc = ""
|
|
try:
|
|
_fm_end = re.search(r'\n---\s*\n', content[3:])
|
|
if _fm_end:
|
|
_parsed = yaml.safe_load(content[3:_fm_end.start() + 3])
|
|
_desc = str(_parsed.get("description", ""))[:120]
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Skill '{name}' updated (full rewrite).",
|
|
"path": str(existing["path"]),
|
|
"_change": {"description": _desc},
|
|
}
|
|
|
|
|
|
def _patch_skill(
|
|
name: str,
|
|
old_string: str,
|
|
new_string: str,
|
|
file_path: str = None,
|
|
replace_all: bool = False,
|
|
) -> Dict[str, Any]:
|
|
"""Targeted find-and-replace within a skill file.
|
|
|
|
Defaults to SKILL.md. Use file_path to patch a supporting file instead.
|
|
Requires a unique match unless replace_all is True.
|
|
"""
|
|
if not old_string:
|
|
return {"success": False, "error": "old_string is required for 'patch'."}
|
|
if new_string is None:
|
|
return {"success": False, "error": "new_string is required for 'patch'. Use an empty string to delete matched text."}
|
|
|
|
existing = _find_skill(name)
|
|
if not existing:
|
|
return {"success": False, "error": _skill_not_found_error(name)}
|
|
|
|
skill_dir = existing["path"]
|
|
|
|
if file_path:
|
|
# Patching a supporting file
|
|
err = _validate_file_path(file_path)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
target, err = _resolve_skill_target(skill_dir, file_path)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
else:
|
|
# Patching SKILL.md
|
|
target = skill_dir / "SKILL.md"
|
|
|
|
if not target.exists():
|
|
return {"success": False, "error": f"File not found: {target.relative_to(skill_dir)}"}
|
|
|
|
content = target.read_text(encoding="utf-8")
|
|
|
|
# Use the same fuzzy matching engine as the file patch tool.
|
|
# This handles whitespace normalization, indentation differences,
|
|
# escape sequences, and block-anchor matching — saving the agent
|
|
# from exact-match failures on minor formatting mismatches.
|
|
from tools.fuzzy_match import fuzzy_find_and_replace
|
|
|
|
new_content, match_count, _strategy, match_error = fuzzy_find_and_replace(
|
|
content, old_string, new_string, replace_all
|
|
)
|
|
if match_error:
|
|
# Show a short preview of the file so the model can self-correct
|
|
preview = content[:500] + ("..." if len(content) > 500 else "")
|
|
err_msg = match_error
|
|
try:
|
|
from tools.fuzzy_match import format_no_match_hint
|
|
err_msg += format_no_match_hint(match_error, match_count, old_string, content)
|
|
except Exception:
|
|
pass
|
|
return {
|
|
"success": False,
|
|
"error": err_msg,
|
|
"file_preview": preview,
|
|
}
|
|
|
|
# Check size limit on the result
|
|
target_label = "SKILL.md" if not file_path else file_path
|
|
err = _validate_content_size(new_content, label=target_label)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
# If patching SKILL.md, validate frontmatter is still intact
|
|
if not file_path:
|
|
err = _validate_frontmatter(new_content)
|
|
if err:
|
|
return {
|
|
"success": False,
|
|
"error": f"Patch would break SKILL.md structure: {err}",
|
|
}
|
|
|
|
original_content = content # for rollback
|
|
_atomic_write_text(target, new_content)
|
|
|
|
# Security scan — roll back on block
|
|
scan_error = _security_scan_skill(skill_dir)
|
|
if scan_error:
|
|
_atomic_write_text(target, original_content)
|
|
return {"success": False, "error": scan_error}
|
|
|
|
result = {
|
|
"success": True,
|
|
"message": f"Patched {'SKILL.md' if not file_path else file_path} in skill '{name}' ({match_count} replacement{'s' if match_count > 1 else ''}).",
|
|
}
|
|
# Include change previews for verbose notifications
|
|
result["_change"] = {
|
|
"old": old_string[:200] + ("…" if len(old_string) > 200 else ""),
|
|
"new": new_string[:200] + ("…" if len(new_string) > 200 else ""),
|
|
}
|
|
return result
|
|
|
|
|
|
def _delete_skill(name: str, absorbed_into: Optional[str] = None) -> Dict[str, Any]:
|
|
"""Delete a skill.
|
|
|
|
``absorbed_into`` declares intent:
|
|
- ``None`` / missing → caller didn't declare (legacy / non-curator path);
|
|
accepted for backward compat but logs a warning because the curator
|
|
classification pipeline can't tell consolidation from pruning without it.
|
|
- ``""`` (empty) → explicit "truly pruned, no forwarding target".
|
|
- ``"<skill-name>"`` → content was absorbed into that umbrella; the
|
|
target must exist on disk. Validated here so the model can't claim an
|
|
umbrella that doesn't exist.
|
|
"""
|
|
existing = _find_skill(name)
|
|
if not existing:
|
|
return {"success": False, "error": _skill_not_found_error(name)}
|
|
|
|
pinned_err = _pinned_guard(name)
|
|
if pinned_err:
|
|
return {"success": False, "error": pinned_err}
|
|
|
|
# Validate absorbed_into target when declared non-empty
|
|
if absorbed_into is not None and isinstance(absorbed_into, str) and absorbed_into.strip():
|
|
target_name = absorbed_into.strip()
|
|
if target_name == name:
|
|
return {
|
|
"success": False,
|
|
"error": f"absorbed_into='{target_name}' cannot equal the skill being deleted.",
|
|
}
|
|
target = _find_skill(target_name)
|
|
if not target:
|
|
return {
|
|
"success": False,
|
|
"error": (
|
|
f"absorbed_into='{target_name}' does not exist. "
|
|
f"Create or patch the umbrella skill first, then retry the delete."
|
|
),
|
|
}
|
|
|
|
skill_dir = existing["path"]
|
|
skills_root = _containing_skills_root(skill_dir)
|
|
|
|
# Defense-in-depth before the recursive delete (port of Kilo Code #11240).
|
|
unsafe = _validate_delete_target(skill_dir)
|
|
if unsafe:
|
|
return {"success": False, "error": unsafe}
|
|
|
|
shutil.rmtree(skill_dir)
|
|
|
|
# Clean up empty category directories (don't remove the skills root itself)
|
|
parent = skill_dir.parent
|
|
if parent != skills_root and parent.exists() and not any(parent.iterdir()):
|
|
parent.rmdir()
|
|
|
|
message = f"Skill '{name}' deleted."
|
|
if absorbed_into is not None and isinstance(absorbed_into, str) and absorbed_into.strip():
|
|
message += f" Content absorbed into '{absorbed_into.strip()}'."
|
|
|
|
return {
|
|
"success": True,
|
|
"message": message,
|
|
}
|
|
|
|
|
|
def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
|
|
"""Add or overwrite a supporting file within any skill directory."""
|
|
err = _validate_file_path(file_path)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
if not file_content and file_content != "":
|
|
return {"success": False, "error": "file_content is required."}
|
|
|
|
# Check size limits
|
|
content_bytes = len(file_content.encode("utf-8"))
|
|
if content_bytes > MAX_SKILL_FILE_BYTES:
|
|
return {
|
|
"success": False,
|
|
"error": (
|
|
f"File content is {content_bytes:,} bytes "
|
|
f"(limit: {MAX_SKILL_FILE_BYTES:,} bytes / 1 MiB). "
|
|
f"Consider splitting into smaller files."
|
|
),
|
|
}
|
|
err = _validate_content_size(file_content, label=file_path)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
existing = _find_skill(name)
|
|
if not existing:
|
|
return {"success": False, "error": _skill_not_found_error(name, " Create it first with action='create'.")}
|
|
|
|
target, err = _resolve_skill_target(existing["path"], file_path)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
# Back up for rollback
|
|
original_content = target.read_text(encoding="utf-8") if target.exists() else None
|
|
_atomic_write_text(target, file_content)
|
|
|
|
# Security scan — roll back on block
|
|
scan_error = _security_scan_skill(existing["path"])
|
|
if scan_error:
|
|
if original_content is not None:
|
|
_atomic_write_text(target, original_content)
|
|
else:
|
|
target.unlink(missing_ok=True)
|
|
return {"success": False, "error": scan_error}
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"File '{file_path}' written to skill '{name}'.",
|
|
"path": str(target),
|
|
}
|
|
|
|
|
|
def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
|
|
"""Remove a supporting file from any skill directory."""
|
|
err = _validate_file_path(file_path)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
|
|
existing = _find_skill(name)
|
|
if not existing:
|
|
return {"success": False, "error": _skill_not_found_error(name)}
|
|
|
|
skill_dir = existing["path"]
|
|
|
|
target, err = _resolve_skill_target(skill_dir, file_path)
|
|
if err:
|
|
return {"success": False, "error": err}
|
|
if not target.exists():
|
|
# List what's actually there for the model to see
|
|
available = []
|
|
for subdir in ALLOWED_SUBDIRS:
|
|
d = skill_dir / subdir
|
|
if d.exists():
|
|
for f in d.rglob("*"):
|
|
if f.is_file():
|
|
available.append(str(f.relative_to(skill_dir)))
|
|
return {
|
|
"success": False,
|
|
"error": f"File '{file_path}' not found in skill '{name}'.",
|
|
"available_files": available if available else None,
|
|
}
|
|
|
|
target.unlink()
|
|
|
|
# Clean up empty subdirectories
|
|
parent = target.parent
|
|
if parent != skill_dir and parent.exists() and not any(parent.iterdir()):
|
|
parent.rmdir()
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"File '{file_path}' removed from skill '{name}'.",
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Main entry point
|
|
# =============================================================================
|
|
|
|
# ContextVar bypass: set while replaying an already-approved staged skill write
|
|
# so skill_manage() does not re-gate (and re-stage) it.
|
|
import contextvars as _ctxvars
|
|
_skill_gate_bypass: "_ctxvars.ContextVar[bool]" = _ctxvars.ContextVar(
|
|
"skill_gate_bypass", default=False
|
|
)
|
|
|
|
|
|
def _apply_skill_write_gate(action, name, **payload_kwargs):
|
|
"""Evaluate the skill write gate. Returns a JSON tool-result string when the
|
|
write should NOT proceed (blocked or staged), or None to perform the real
|
|
write. Bypassed during approved-pending replay.
|
|
"""
|
|
if action not in {"create", "edit", "patch", "delete", "write_file", "remove_file"}:
|
|
return None
|
|
if _skill_gate_bypass.get():
|
|
return None
|
|
|
|
try:
|
|
from tools import write_approval as wa
|
|
except Exception:
|
|
return None # fail open
|
|
|
|
decision = wa.evaluate_gate(wa.SKILLS)
|
|
if decision.allow:
|
|
return None
|
|
if decision.blocked:
|
|
return tool_error(decision.message, success=False)
|
|
|
|
# stage — record the full skill_manage kwargs so approval can replay it.
|
|
payload = {"action": action, "name": name}
|
|
payload.update({k: v for k, v in payload_kwargs.items() if v is not None})
|
|
gist = wa.skill_gist(
|
|
action, name,
|
|
content=payload_kwargs.get("content") or "",
|
|
file_path=payload_kwargs.get("file_path") or "",
|
|
old_string=payload_kwargs.get("old_string") or "",
|
|
new_string=payload_kwargs.get("new_string") or "",
|
|
)
|
|
record = wa.stage_write(wa.SKILLS, payload, summary=gist, origin=wa.current_origin())
|
|
return json.dumps(
|
|
{"success": True, "staged": True, "pending_id": record["id"],
|
|
"gist": gist, "message": decision.message},
|
|
ensure_ascii=False,
|
|
)
|
|
|
|
|
|
def apply_skill_pending(payload: Dict[str, Any]) -> str:
|
|
"""Replay a staged skill write, bypassing the gate. Returns the tool result
|
|
JSON string. Called by the /skills approve handler.
|
|
"""
|
|
token = _skill_gate_bypass.set(True)
|
|
try:
|
|
return skill_manage(
|
|
action=payload.get("action", ""),
|
|
name=payload.get("name", ""),
|
|
content=payload.get("content"),
|
|
category=payload.get("category"),
|
|
file_path=payload.get("file_path"),
|
|
file_content=payload.get("file_content"),
|
|
old_string=payload.get("old_string"),
|
|
new_string=payload.get("new_string"),
|
|
replace_all=payload.get("replace_all", False),
|
|
absorbed_into=payload.get("absorbed_into"),
|
|
)
|
|
finally:
|
|
_skill_gate_bypass.reset(token)
|
|
|
|
|
|
def skill_manage(
|
|
action: str,
|
|
name: str,
|
|
content: str = None,
|
|
category: str = None,
|
|
file_path: str = None,
|
|
file_content: str = None,
|
|
old_string: str = None,
|
|
new_string: str = None,
|
|
replace_all: bool = False,
|
|
absorbed_into: str = None,
|
|
) -> str:
|
|
"""
|
|
Manage user-created skills. Dispatches to the appropriate action handler.
|
|
|
|
Returns JSON string with results.
|
|
"""
|
|
# Approval gate: when on, stages the write for review (skills are too large
|
|
# to review inline, so they always stage regardless of origin); when off
|
|
# (default) passes straight through. The gate is bypassed when this call is
|
|
# itself replaying an already-approved staged write (_skill_apply_pending).
|
|
gate_result = _apply_skill_write_gate(
|
|
action, name, content=content, category=category,
|
|
file_path=file_path, file_content=file_content,
|
|
old_string=old_string, new_string=new_string,
|
|
replace_all=replace_all, absorbed_into=absorbed_into,
|
|
)
|
|
if gate_result is not None:
|
|
return gate_result
|
|
|
|
if action == "create":
|
|
if not content:
|
|
return tool_error("content is required for 'create'. Provide the full SKILL.md text (frontmatter + body).", success=False)
|
|
result = _create_skill(name, content, category)
|
|
|
|
elif action == "edit":
|
|
if not content:
|
|
return tool_error("content is required for 'edit'. Provide the full updated SKILL.md text.", success=False)
|
|
result = _edit_skill(name, content)
|
|
|
|
elif action == "patch":
|
|
if not old_string:
|
|
return tool_error("old_string is required for 'patch'. Provide the text to find.", success=False)
|
|
if new_string is None:
|
|
return tool_error("new_string is required for 'patch'. Use empty string to delete matched text.", success=False)
|
|
result = _patch_skill(name, old_string, new_string, file_path, replace_all)
|
|
|
|
elif action == "delete":
|
|
result = _delete_skill(name, absorbed_into=absorbed_into)
|
|
|
|
elif action == "write_file":
|
|
if not file_path:
|
|
return tool_error("file_path is required for 'write_file'. Example: 'references/api-guide.md'", success=False)
|
|
if file_content is None:
|
|
return tool_error("file_content is required for 'write_file'.", success=False)
|
|
result = _write_file(name, file_path, file_content)
|
|
|
|
elif action == "remove_file":
|
|
if not file_path:
|
|
return tool_error("file_path is required for 'remove_file'.", success=False)
|
|
result = _remove_file(name, file_path)
|
|
|
|
else:
|
|
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"}
|
|
|
|
if result.get("success"):
|
|
try:
|
|
from agent.prompt_builder import clear_skills_system_prompt_cache
|
|
clear_skills_system_prompt_cache(clear_snapshot=True)
|
|
except Exception:
|
|
pass
|
|
# Curator telemetry: bump patch_count on edit/patch/write_file (the actions
|
|
# that mutate an existing skill's guidance), drop the record on delete.
|
|
# Only mark a skill as agent-created when the background self-improvement
|
|
# review fork creates it — foreground `skill_manage(create)` calls are
|
|
# user-directed, and those skills belong to the user (the curator must
|
|
# not touch them). Best-effort; telemetry failures never break the tool.
|
|
try:
|
|
from tools.skill_usage import bump_patch, forget, mark_agent_created
|
|
from tools.skill_provenance import is_background_review
|
|
if action == "create":
|
|
if is_background_review():
|
|
mark_agent_created(name)
|
|
elif action in {"patch", "edit", "write_file", "remove_file"}:
|
|
bump_patch(name)
|
|
elif action == "delete":
|
|
forget(name)
|
|
except Exception:
|
|
pass
|
|
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
|
|
# =============================================================================
|
|
# OpenAI Function-Calling Schema
|
|
# =============================================================================
|
|
|
|
SKILL_MANAGE_SCHEMA = {
|
|
"name": "skill_manage",
|
|
"description": (
|
|
"Manage skills (create, update, delete). Skills are your procedural "
|
|
"memory — reusable approaches for recurring task types. "
|
|
f"New skills go to {display_hermes_home()}/skills/; existing skills can be modified wherever they live.\n\n"
|
|
"Actions: create (full SKILL.md + optional category), "
|
|
"patch (old_string/new_string — preferred for fixes), "
|
|
"edit (full SKILL.md rewrite — major overhauls only), "
|
|
"delete, write_file, remove_file.\n\n"
|
|
"On delete, pass `absorbed_into=<umbrella>` when you're merging this "
|
|
"skill's content into another one, or `absorbed_into=\"\"` when you're "
|
|
"pruning it with no forwarding target. This lets the curator tell "
|
|
"consolidation from pruning without guessing, so downstream consumers "
|
|
"(cron jobs that reference the old skill name, etc.) get updated "
|
|
"correctly. The target you name in `absorbed_into` must already "
|
|
"exist — create/patch the umbrella first, then delete.\n\n"
|
|
"Create when: complex task succeeded (5+ calls), errors overcome, "
|
|
"user-corrected approach worked, non-trivial workflow discovered, "
|
|
"or user asks you to remember a procedure.\n"
|
|
"Update when: instructions stale/wrong, OS-specific failures, "
|
|
"missing steps or pitfalls found during use. "
|
|
"If you used a skill and hit issues not covered by it, patch it immediately.\n\n"
|
|
"After difficult/iterative tasks, offer to save as a skill. "
|
|
"Skip for simple one-offs. Confirm with user before creating/deleting.\n\n"
|
|
"Good skills: trigger conditions, numbered steps with exact commands, "
|
|
"pitfalls section, verification steps. Use skill_view() to see format examples.\n\n"
|
|
"Pinned skills are protected from deletion only — skill_manage(action='delete') "
|
|
"will refuse with a message pointing the user to `hermes curator unpin <name>`. "
|
|
"Patches and edits go through on pinned skills so you can still improve them as "
|
|
"pitfalls come up; pin only guards against irrecoverable loss."
|
|
),
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"action": {
|
|
"type": "string",
|
|
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file"],
|
|
"description": "The action to perform."
|
|
},
|
|
"name": {
|
|
"type": "string",
|
|
"description": (
|
|
"Skill name (lowercase, hyphens/underscores, max 64 chars). "
|
|
"Must match an existing skill for patch/edit/delete/write_file/remove_file."
|
|
)
|
|
},
|
|
"content": {
|
|
"type": "string",
|
|
"description": (
|
|
"Full SKILL.md content (YAML frontmatter + markdown body). "
|
|
"Required for 'create' and 'edit'. For 'edit', read the skill "
|
|
"first with skill_view() and provide the complete updated text."
|
|
)
|
|
},
|
|
"old_string": {
|
|
"type": "string",
|
|
"description": (
|
|
"Text to find in the file (required for 'patch'). Must be unique "
|
|
"unless replace_all=true. Include enough surrounding context to "
|
|
"ensure uniqueness."
|
|
)
|
|
},
|
|
"new_string": {
|
|
"type": "string",
|
|
"description": (
|
|
"Replacement text (required for 'patch'). Can be empty string "
|
|
"to delete the matched text."
|
|
)
|
|
},
|
|
"replace_all": {
|
|
"type": "boolean",
|
|
"description": "For 'patch': replace all occurrences instead of requiring a unique match (default: false)."
|
|
},
|
|
"category": {
|
|
"type": "string",
|
|
"description": (
|
|
"Optional category/domain for organizing the skill (e.g., 'devops', "
|
|
"'data-science', 'mlops'). Creates a subdirectory grouping. "
|
|
"Only used with 'create'."
|
|
)
|
|
},
|
|
"file_path": {
|
|
"type": "string",
|
|
"description": (
|
|
"Path to a supporting file within the skill directory. "
|
|
"For 'write_file'/'remove_file': required, must be under references/, "
|
|
"templates/, scripts/, or assets/. "
|
|
"For 'patch': optional, defaults to SKILL.md if omitted."
|
|
)
|
|
},
|
|
"file_content": {
|
|
"type": "string",
|
|
"description": "Content for the file. Required for 'write_file'."
|
|
},
|
|
"absorbed_into": {
|
|
"type": "string",
|
|
"description": (
|
|
"For 'delete' only — declares intent so the curator can "
|
|
"tell consolidation from pruning without guessing. "
|
|
"Pass the umbrella skill name when this skill's content "
|
|
"was merged into another (the target must already exist). "
|
|
"Pass an empty string when the skill is truly stale and "
|
|
"being pruned with no forwarding target. Omitting the arg "
|
|
"on delete is supported for backward compatibility but "
|
|
"downstream tooling (e.g. cron-job skill reference "
|
|
"rewriting) will have to guess at intent."
|
|
)
|
|
},
|
|
},
|
|
"required": ["action", "name"],
|
|
},
|
|
}
|
|
|
|
|
|
# --- Registry ---
|
|
from tools.registry import registry, tool_error
|
|
|
|
registry.register(
|
|
name="skill_manage",
|
|
toolset="skills",
|
|
schema=SKILL_MANAGE_SCHEMA,
|
|
handler=lambda args, **kw: skill_manage(
|
|
action=args.get("action", ""),
|
|
name=args.get("name", ""),
|
|
content=args.get("content"),
|
|
category=args.get("category"),
|
|
file_path=args.get("file_path"),
|
|
file_content=args.get("file_content"),
|
|
old_string=args.get("old_string"),
|
|
new_string=args.get("new_string"),
|
|
replace_all=args.get("replace_all", False),
|
|
absorbed_into=args.get("absorbed_into")),
|
|
emoji="📝",
|
|
)
|