mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Expands the plugin interface so slash command handlers can dispatch tool calls through the registry with parent agent context wired up automatically. This is the public API for plugins that need to orchestrate tools like delegate_task — they call ctx.dispatch_tool() instead of reaching into framework internals. The parent agent is resolved lazily from _cli_ref when available (CLI mode) and omitted in gateway mode (tools degrade gracefully). Enables the hermes-deliver-plugin pattern where /deliver and /fanout slash commands spawn subagents via delegate_task without touching the agent conversation loop. 7 new tests covering: registry delegation, parent_agent injection from cli_ref, gateway mode (no cli_ref), uninitialized agent, explicit parent_agent override, kwargs forwarding, return value passthrough.
843 lines
31 KiB
Python
843 lines
31 KiB
Python
"""
|
||
Hermes Plugin System
|
||
====================
|
||
|
||
Discovers, loads, and manages plugins from three sources:
|
||
|
||
1. **User plugins** – ``~/.hermes/plugins/<name>/``
|
||
2. **Project plugins** – ``./.hermes/plugins/<name>/`` (opt-in via
|
||
``HERMES_ENABLE_PROJECT_PLUGINS``)
|
||
3. **Pip plugins** – packages that expose the ``hermes_agent.plugins``
|
||
entry-point group.
|
||
|
||
Each directory plugin must contain a ``plugin.yaml`` manifest **and** an
|
||
``__init__.py`` with a ``register(ctx)`` function.
|
||
|
||
Lifecycle hooks
|
||
---------------
|
||
Plugins may register callbacks for any of the hooks in ``VALID_HOOKS``.
|
||
The agent core calls ``invoke_hook(name, **kwargs)`` at the appropriate
|
||
points.
|
||
|
||
Tool registration
|
||
-----------------
|
||
``PluginContext.register_tool()`` delegates to ``tools.registry.register()``
|
||
so plugin-defined tools appear alongside the built-in tools.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import importlib
|
||
import importlib.metadata
|
||
import importlib.util
|
||
import logging
|
||
import sys
|
||
import types
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import Any, Callable, Dict, List, Optional, Set, Union
|
||
|
||
from hermes_constants import get_hermes_home
|
||
from utils import env_var_enabled
|
||
|
||
try:
|
||
import yaml
|
||
except ImportError: # pragma: no cover – yaml is optional at import time
|
||
yaml = None # type: ignore[assignment]
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Constants
|
||
# ---------------------------------------------------------------------------
|
||
|
||
VALID_HOOKS: Set[str] = {
|
||
"pre_tool_call",
|
||
"post_tool_call",
|
||
"pre_llm_call",
|
||
"post_llm_call",
|
||
"pre_api_request",
|
||
"post_api_request",
|
||
"on_session_start",
|
||
"on_session_end",
|
||
"on_session_finalize",
|
||
"on_session_reset",
|
||
}
|
||
|
||
ENTRY_POINTS_GROUP = "hermes_agent.plugins"
|
||
|
||
_NS_PARENT = "hermes_plugins"
|
||
|
||
|
||
def _env_enabled(name: str) -> bool:
|
||
"""Return True when an env var is set to a truthy opt-in value."""
|
||
return env_var_enabled(name)
|
||
|
||
|
||
def _get_disabled_plugins() -> set:
|
||
"""Read the disabled plugins list from config.yaml."""
|
||
try:
|
||
from hermes_cli.config import load_config
|
||
config = load_config()
|
||
disabled = config.get("plugins", {}).get("disabled", [])
|
||
return set(disabled) if isinstance(disabled, list) else set()
|
||
except Exception:
|
||
return set()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Data classes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
|
||
class PluginManifest:
|
||
"""Parsed representation of a plugin.yaml manifest."""
|
||
|
||
name: str
|
||
version: str = ""
|
||
description: str = ""
|
||
author: str = ""
|
||
requires_env: List[Union[str, Dict[str, Any]]] = field(default_factory=list)
|
||
provides_tools: List[str] = field(default_factory=list)
|
||
provides_hooks: List[str] = field(default_factory=list)
|
||
source: str = "" # "user", "project", or "entrypoint"
|
||
path: Optional[str] = None
|
||
|
||
|
||
@dataclass
|
||
class LoadedPlugin:
|
||
"""Runtime state for a single loaded plugin."""
|
||
|
||
manifest: PluginManifest
|
||
module: Optional[types.ModuleType] = None
|
||
tools_registered: List[str] = field(default_factory=list)
|
||
hooks_registered: List[str] = field(default_factory=list)
|
||
commands_registered: List[str] = field(default_factory=list)
|
||
enabled: bool = False
|
||
error: Optional[str] = None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# PluginContext – handed to each plugin's ``register()`` function
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class PluginContext:
|
||
"""Facade given to plugins so they can register tools and hooks."""
|
||
|
||
def __init__(self, manifest: PluginManifest, manager: "PluginManager"):
|
||
self.manifest = manifest
|
||
self._manager = manager
|
||
|
||
# -- tool registration --------------------------------------------------
|
||
|
||
def register_tool(
|
||
self,
|
||
name: str,
|
||
toolset: str,
|
||
schema: dict,
|
||
handler: Callable,
|
||
check_fn: Callable | None = None,
|
||
requires_env: list | None = None,
|
||
is_async: bool = False,
|
||
description: str = "",
|
||
emoji: str = "",
|
||
) -> None:
|
||
"""Register a tool in the global registry **and** track it as plugin-provided."""
|
||
from tools.registry import registry
|
||
|
||
registry.register(
|
||
name=name,
|
||
toolset=toolset,
|
||
schema=schema,
|
||
handler=handler,
|
||
check_fn=check_fn,
|
||
requires_env=requires_env,
|
||
is_async=is_async,
|
||
description=description,
|
||
emoji=emoji,
|
||
)
|
||
self._manager._plugin_tool_names.add(name)
|
||
logger.debug("Plugin %s registered tool: %s", self.manifest.name, name)
|
||
|
||
# -- message injection --------------------------------------------------
|
||
|
||
def inject_message(self, content: str, role: str = "user") -> bool:
|
||
"""Inject a message into the active conversation.
|
||
|
||
If the agent is idle (waiting for user input), this starts a new turn.
|
||
If the agent is running, this interrupts and injects the message.
|
||
|
||
This enables plugins (e.g. remote control viewers, messaging bridges)
|
||
to send messages into the conversation from external sources.
|
||
|
||
Returns True if the message was queued successfully.
|
||
"""
|
||
cli = self._manager._cli_ref
|
||
if cli is None:
|
||
logger.warning("inject_message: no CLI reference (not available in gateway mode)")
|
||
return False
|
||
|
||
msg = content if role == "user" else f"[{role}] {content}"
|
||
|
||
if getattr(cli, "_agent_running", False):
|
||
# Agent is mid-turn — interrupt with the message
|
||
cli._interrupt_queue.put(msg)
|
||
else:
|
||
# Agent is idle — queue as next input
|
||
cli._pending_input.put(msg)
|
||
return True
|
||
|
||
# -- CLI command registration --------------------------------------------
|
||
|
||
def register_cli_command(
|
||
self,
|
||
name: str,
|
||
help: str,
|
||
setup_fn: Callable,
|
||
handler_fn: Callable | None = None,
|
||
description: str = "",
|
||
) -> None:
|
||
"""Register a CLI subcommand (e.g. ``hermes honcho ...``).
|
||
|
||
The *setup_fn* receives an argparse subparser and should add any
|
||
arguments/sub-subparsers. If *handler_fn* is provided it is set
|
||
as the default dispatch function via ``set_defaults(func=...)``."""
|
||
self._manager._cli_commands[name] = {
|
||
"name": name,
|
||
"help": help,
|
||
"description": description,
|
||
"setup_fn": setup_fn,
|
||
"handler_fn": handler_fn,
|
||
"plugin": self.manifest.name,
|
||
}
|
||
logger.debug("Plugin %s registered CLI command: %s", self.manifest.name, name)
|
||
|
||
# -- slash command registration -------------------------------------------
|
||
|
||
def register_command(
|
||
self,
|
||
name: str,
|
||
handler: Callable,
|
||
description: str = "",
|
||
) -> None:
|
||
"""Register a slash command (e.g. ``/lcm``) available in CLI and gateway sessions.
|
||
|
||
The handler signature is ``fn(raw_args: str) -> str | None``.
|
||
It may also be an async callable — the gateway dispatch handles both.
|
||
|
||
Unlike ``register_cli_command()`` (which creates ``hermes <subcommand>``
|
||
terminal commands), this registers in-session slash commands that users
|
||
invoke during a conversation.
|
||
|
||
Names conflicting with built-in commands are rejected with a warning.
|
||
"""
|
||
clean = name.lower().strip().lstrip("/").replace(" ", "-")
|
||
if not clean:
|
||
logger.warning(
|
||
"Plugin '%s' tried to register a command with an empty name.",
|
||
self.manifest.name,
|
||
)
|
||
return
|
||
|
||
# Reject if it conflicts with a built-in command
|
||
try:
|
||
from hermes_cli.commands import resolve_command
|
||
if resolve_command(clean) is not None:
|
||
logger.warning(
|
||
"Plugin '%s' tried to register command '/%s' which conflicts "
|
||
"with a built-in command. Skipping.",
|
||
self.manifest.name, clean,
|
||
)
|
||
return
|
||
except Exception:
|
||
pass # If commands module isn't available, skip the check
|
||
|
||
self._manager._plugin_commands[clean] = {
|
||
"handler": handler,
|
||
"description": description or "Plugin command",
|
||
"plugin": self.manifest.name,
|
||
}
|
||
logger.debug("Plugin %s registered command: /%s", self.manifest.name, clean)
|
||
|
||
# -- tool dispatch -------------------------------------------------------
|
||
|
||
def dispatch_tool(self, tool_name: str, args: dict, **kwargs) -> str:
|
||
"""Dispatch a tool call through the registry, with parent agent context.
|
||
|
||
This is the public interface for plugin slash commands that need to call
|
||
tools like ``delegate_task`` without reaching into framework internals.
|
||
The parent agent (if available) is resolved automatically — plugins never
|
||
need to access the agent directly.
|
||
|
||
Args:
|
||
tool_name: Registry name of the tool (e.g. ``"delegate_task"``).
|
||
args: Tool arguments dict (same as what the model would pass).
|
||
**kwargs: Extra keyword args forwarded to the registry dispatch.
|
||
|
||
Returns:
|
||
JSON string from the tool handler (same format as model tool calls).
|
||
"""
|
||
from tools.registry import registry
|
||
|
||
# Wire up parent agent context when available (CLI mode).
|
||
# In gateway mode _cli_ref is None — tools degrade gracefully
|
||
# (workspace hints fall back to TERMINAL_CWD, no spinner).
|
||
if "parent_agent" not in kwargs:
|
||
cli = self._manager._cli_ref
|
||
agent = getattr(cli, "agent", None) if cli else None
|
||
if agent is not None:
|
||
kwargs["parent_agent"] = agent
|
||
|
||
return registry.dispatch(tool_name, args, **kwargs)
|
||
|
||
# -- context engine registration -----------------------------------------
|
||
|
||
def register_context_engine(self, engine) -> None:
|
||
"""Register a context engine to replace the built-in ContextCompressor.
|
||
|
||
Only one context engine plugin is allowed. If a second plugin tries
|
||
to register one, it is rejected with a warning.
|
||
|
||
The engine must be an instance of ``agent.context_engine.ContextEngine``.
|
||
"""
|
||
if self._manager._context_engine is not None:
|
||
logger.warning(
|
||
"Plugin '%s' tried to register a context engine, but one is "
|
||
"already registered. Only one context engine plugin is allowed.",
|
||
self.manifest.name,
|
||
)
|
||
return
|
||
# Defer the import to avoid circular deps at module level
|
||
from agent.context_engine import ContextEngine
|
||
if not isinstance(engine, ContextEngine):
|
||
logger.warning(
|
||
"Plugin '%s' tried to register a context engine that does not "
|
||
"inherit from ContextEngine. Ignoring.",
|
||
self.manifest.name,
|
||
)
|
||
return
|
||
self._manager._context_engine = engine
|
||
logger.info(
|
||
"Plugin '%s' registered context engine: %s",
|
||
self.manifest.name, engine.name,
|
||
)
|
||
|
||
# -- hook registration --------------------------------------------------
|
||
|
||
def register_hook(self, hook_name: str, callback: Callable) -> None:
|
||
"""Register a lifecycle hook callback.
|
||
|
||
Unknown hook names produce a warning but are still stored so
|
||
forward-compatible plugins don't break.
|
||
"""
|
||
if hook_name not in VALID_HOOKS:
|
||
logger.warning(
|
||
"Plugin '%s' registered unknown hook '%s' "
|
||
"(valid: %s)",
|
||
self.manifest.name,
|
||
hook_name,
|
||
", ".join(sorted(VALID_HOOKS)),
|
||
)
|
||
self._manager._hooks.setdefault(hook_name, []).append(callback)
|
||
logger.debug("Plugin %s registered hook: %s", self.manifest.name, hook_name)
|
||
|
||
# -- skill registration -------------------------------------------------
|
||
|
||
def register_skill(
|
||
self,
|
||
name: str,
|
||
path: Path,
|
||
description: str = "",
|
||
) -> None:
|
||
"""Register a read-only skill provided by this plugin.
|
||
|
||
The skill becomes resolvable as ``'<plugin_name>:<name>'`` via
|
||
``skill_view()``. It does **not** enter the flat
|
||
``~/.hermes/skills/`` tree and is **not** listed in the system
|
||
prompt's ``<available_skills>`` index — plugin skills are
|
||
opt-in explicit loads only.
|
||
|
||
Raises:
|
||
ValueError: if *name* contains ``':'`` or invalid characters.
|
||
FileNotFoundError: if *path* does not exist.
|
||
"""
|
||
from agent.skill_utils import _NAMESPACE_RE
|
||
|
||
if ":" in name:
|
||
raise ValueError(
|
||
f"Skill name '{name}' must not contain ':' "
|
||
f"(the namespace is derived from the plugin name "
|
||
f"'{self.manifest.name}' automatically)."
|
||
)
|
||
if not name or not _NAMESPACE_RE.match(name):
|
||
raise ValueError(
|
||
f"Invalid skill name '{name}'. Must match [a-zA-Z0-9_-]+."
|
||
)
|
||
if not path.exists():
|
||
raise FileNotFoundError(f"SKILL.md not found at {path}")
|
||
|
||
qualified = f"{self.manifest.name}:{name}"
|
||
self._manager._plugin_skills[qualified] = {
|
||
"path": path,
|
||
"plugin": self.manifest.name,
|
||
"bare_name": name,
|
||
"description": description,
|
||
}
|
||
logger.debug(
|
||
"Plugin %s registered skill: %s",
|
||
self.manifest.name, qualified,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# PluginManager
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class PluginManager:
|
||
"""Central manager that discovers, loads, and invokes plugins."""
|
||
|
||
def __init__(self) -> None:
|
||
self._plugins: Dict[str, LoadedPlugin] = {}
|
||
self._hooks: Dict[str, List[Callable]] = {}
|
||
self._plugin_tool_names: Set[str] = set()
|
||
self._cli_commands: Dict[str, dict] = {}
|
||
self._context_engine = None # Set by a plugin via register_context_engine()
|
||
self._plugin_commands: Dict[str, dict] = {} # Slash commands registered by plugins
|
||
self._discovered: bool = False
|
||
self._cli_ref = None # Set by CLI after plugin discovery
|
||
# Plugin skill registry: qualified name → metadata dict.
|
||
self._plugin_skills: Dict[str, Dict[str, Any]] = {}
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Public
|
||
# -----------------------------------------------------------------------
|
||
|
||
def discover_and_load(self) -> None:
|
||
"""Scan all plugin sources and load each plugin found."""
|
||
if self._discovered:
|
||
return
|
||
self._discovered = True
|
||
|
||
manifests: List[PluginManifest] = []
|
||
|
||
# 1. User plugins (~/.hermes/plugins/)
|
||
user_dir = get_hermes_home() / "plugins"
|
||
manifests.extend(self._scan_directory(user_dir, source="user"))
|
||
|
||
# 2. Project plugins (./.hermes/plugins/)
|
||
if _env_enabled("HERMES_ENABLE_PROJECT_PLUGINS"):
|
||
project_dir = Path.cwd() / ".hermes" / "plugins"
|
||
manifests.extend(self._scan_directory(project_dir, source="project"))
|
||
|
||
# 3. Pip / entry-point plugins
|
||
manifests.extend(self._scan_entry_points())
|
||
|
||
# Load each manifest (skip user-disabled plugins)
|
||
disabled = _get_disabled_plugins()
|
||
for manifest in manifests:
|
||
if manifest.name in disabled:
|
||
loaded = LoadedPlugin(manifest=manifest, enabled=False)
|
||
loaded.error = "disabled via config"
|
||
self._plugins[manifest.name] = loaded
|
||
logger.debug("Skipping disabled plugin '%s'", manifest.name)
|
||
continue
|
||
self._load_plugin(manifest)
|
||
|
||
if manifests:
|
||
logger.info(
|
||
"Plugin discovery complete: %d found, %d enabled",
|
||
len(self._plugins),
|
||
sum(1 for p in self._plugins.values() if p.enabled),
|
||
)
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Directory scanning
|
||
# -----------------------------------------------------------------------
|
||
|
||
def _scan_directory(self, path: Path, source: str) -> List[PluginManifest]:
|
||
"""Read ``plugin.yaml`` manifests from subdirectories of *path*."""
|
||
manifests: List[PluginManifest] = []
|
||
if not path.is_dir():
|
||
return manifests
|
||
|
||
for child in sorted(path.iterdir()):
|
||
if not child.is_dir():
|
||
continue
|
||
manifest_file = child / "plugin.yaml"
|
||
if not manifest_file.exists():
|
||
manifest_file = child / "plugin.yml"
|
||
if not manifest_file.exists():
|
||
logger.debug("Skipping %s (no plugin.yaml)", child)
|
||
continue
|
||
|
||
try:
|
||
if yaml is None:
|
||
logger.warning("PyYAML not installed – cannot load %s", manifest_file)
|
||
continue
|
||
data = yaml.safe_load(manifest_file.read_text()) or {}
|
||
manifest = PluginManifest(
|
||
name=data.get("name", child.name),
|
||
version=str(data.get("version", "")),
|
||
description=data.get("description", ""),
|
||
author=data.get("author", ""),
|
||
requires_env=data.get("requires_env", []),
|
||
provides_tools=data.get("provides_tools", []),
|
||
provides_hooks=data.get("provides_hooks", []),
|
||
source=source,
|
||
path=str(child),
|
||
)
|
||
manifests.append(manifest)
|
||
except Exception as exc:
|
||
logger.warning("Failed to parse %s: %s", manifest_file, exc)
|
||
|
||
return manifests
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Entry-point scanning
|
||
# -----------------------------------------------------------------------
|
||
|
||
def _scan_entry_points(self) -> List[PluginManifest]:
|
||
"""Check ``importlib.metadata`` for pip-installed plugins."""
|
||
manifests: List[PluginManifest] = []
|
||
try:
|
||
eps = importlib.metadata.entry_points()
|
||
# Python 3.12+ returns a SelectableGroups; earlier returns dict
|
||
if hasattr(eps, "select"):
|
||
group_eps = eps.select(group=ENTRY_POINTS_GROUP)
|
||
elif isinstance(eps, dict):
|
||
group_eps = eps.get(ENTRY_POINTS_GROUP, [])
|
||
else:
|
||
group_eps = [ep for ep in eps if ep.group == ENTRY_POINTS_GROUP]
|
||
|
||
for ep in group_eps:
|
||
manifest = PluginManifest(
|
||
name=ep.name,
|
||
source="entrypoint",
|
||
path=ep.value,
|
||
)
|
||
manifests.append(manifest)
|
||
except Exception as exc:
|
||
logger.debug("Entry-point scan failed: %s", exc)
|
||
|
||
return manifests
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Loading
|
||
# -----------------------------------------------------------------------
|
||
|
||
def _load_plugin(self, manifest: PluginManifest) -> None:
|
||
"""Import a plugin module and call its ``register(ctx)`` function."""
|
||
loaded = LoadedPlugin(manifest=manifest)
|
||
|
||
try:
|
||
if manifest.source in ("user", "project"):
|
||
module = self._load_directory_module(manifest)
|
||
else:
|
||
module = self._load_entrypoint_module(manifest)
|
||
|
||
loaded.module = module
|
||
|
||
# Call register()
|
||
register_fn = getattr(module, "register", None)
|
||
if register_fn is None:
|
||
loaded.error = "no register() function"
|
||
logger.warning("Plugin '%s' has no register() function", manifest.name)
|
||
else:
|
||
ctx = PluginContext(manifest, self)
|
||
register_fn(ctx)
|
||
loaded.tools_registered = [
|
||
t for t in self._plugin_tool_names
|
||
if t not in {
|
||
n
|
||
for name, p in self._plugins.items()
|
||
for n in p.tools_registered
|
||
}
|
||
]
|
||
loaded.hooks_registered = list(
|
||
{
|
||
h
|
||
for h, cbs in self._hooks.items()
|
||
if cbs # non-empty
|
||
}
|
||
- {
|
||
h
|
||
for name, p in self._plugins.items()
|
||
for h in p.hooks_registered
|
||
}
|
||
)
|
||
loaded.commands_registered = [
|
||
c for c in self._plugin_commands
|
||
if self._plugin_commands[c].get("plugin") == manifest.name
|
||
]
|
||
loaded.enabled = True
|
||
|
||
except Exception as exc:
|
||
loaded.error = str(exc)
|
||
logger.warning("Failed to load plugin '%s': %s", manifest.name, exc)
|
||
|
||
self._plugins[manifest.name] = loaded
|
||
|
||
def _load_directory_module(self, manifest: PluginManifest) -> types.ModuleType:
|
||
"""Import a directory-based plugin as ``hermes_plugins.<name>``."""
|
||
plugin_dir = Path(manifest.path) # type: ignore[arg-type]
|
||
init_file = plugin_dir / "__init__.py"
|
||
if not init_file.exists():
|
||
raise FileNotFoundError(f"No __init__.py in {plugin_dir}")
|
||
|
||
# Ensure the namespace parent package exists
|
||
if _NS_PARENT not in sys.modules:
|
||
ns_pkg = types.ModuleType(_NS_PARENT)
|
||
ns_pkg.__path__ = [] # type: ignore[attr-defined]
|
||
ns_pkg.__package__ = _NS_PARENT
|
||
sys.modules[_NS_PARENT] = ns_pkg
|
||
|
||
module_name = f"{_NS_PARENT}.{manifest.name.replace('-', '_')}"
|
||
spec = importlib.util.spec_from_file_location(
|
||
module_name,
|
||
init_file,
|
||
submodule_search_locations=[str(plugin_dir)],
|
||
)
|
||
if spec is None or spec.loader is None:
|
||
raise ImportError(f"Cannot create module spec for {init_file}")
|
||
|
||
module = importlib.util.module_from_spec(spec)
|
||
module.__package__ = module_name
|
||
module.__path__ = [str(plugin_dir)] # type: ignore[attr-defined]
|
||
sys.modules[module_name] = module
|
||
spec.loader.exec_module(module)
|
||
return module
|
||
|
||
def _load_entrypoint_module(self, manifest: PluginManifest) -> types.ModuleType:
|
||
"""Load a pip-installed plugin via its entry-point reference."""
|
||
eps = importlib.metadata.entry_points()
|
||
if hasattr(eps, "select"):
|
||
group_eps = eps.select(group=ENTRY_POINTS_GROUP)
|
||
elif isinstance(eps, dict):
|
||
group_eps = eps.get(ENTRY_POINTS_GROUP, [])
|
||
else:
|
||
group_eps = [ep for ep in eps if ep.group == ENTRY_POINTS_GROUP]
|
||
|
||
for ep in group_eps:
|
||
if ep.name == manifest.name:
|
||
return ep.load()
|
||
|
||
raise ImportError(
|
||
f"Entry point '{manifest.name}' not found in group '{ENTRY_POINTS_GROUP}'"
|
||
)
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Hook invocation
|
||
# -----------------------------------------------------------------------
|
||
|
||
def invoke_hook(self, hook_name: str, **kwargs: Any) -> List[Any]:
|
||
"""Call all registered callbacks for *hook_name*.
|
||
|
||
Each callback is wrapped in its own try/except so a misbehaving
|
||
plugin cannot break the core agent loop.
|
||
|
||
Returns a list of non-``None`` return values from callbacks.
|
||
|
||
For ``pre_llm_call``, callbacks may return a dict describing
|
||
context to inject into the current turn's user message::
|
||
|
||
{"context": "recalled text..."}
|
||
"recalled text..." # plain string, equivalent
|
||
|
||
Context is ALWAYS injected into the user message, never the
|
||
system prompt. This preserves the prompt cache prefix — the
|
||
system prompt stays identical across turns so cached tokens
|
||
are reused. All injected context is ephemeral — never
|
||
persisted to session DB.
|
||
"""
|
||
callbacks = self._hooks.get(hook_name, [])
|
||
results: List[Any] = []
|
||
for cb in callbacks:
|
||
try:
|
||
ret = cb(**kwargs)
|
||
if ret is not None:
|
||
results.append(ret)
|
||
except Exception as exc:
|
||
logger.warning(
|
||
"Hook '%s' callback %s raised: %s",
|
||
hook_name,
|
||
getattr(cb, "__name__", repr(cb)),
|
||
exc,
|
||
)
|
||
return results
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Introspection
|
||
# -----------------------------------------------------------------------
|
||
|
||
def list_plugins(self) -> List[Dict[str, Any]]:
|
||
"""Return a list of info dicts for all discovered plugins."""
|
||
result: List[Dict[str, Any]] = []
|
||
for name, loaded in sorted(self._plugins.items()):
|
||
result.append(
|
||
{
|
||
"name": name,
|
||
"version": loaded.manifest.version,
|
||
"description": loaded.manifest.description,
|
||
"source": loaded.manifest.source,
|
||
"enabled": loaded.enabled,
|
||
"tools": len(loaded.tools_registered),
|
||
"hooks": len(loaded.hooks_registered),
|
||
"commands": len(loaded.commands_registered),
|
||
"error": loaded.error,
|
||
}
|
||
)
|
||
return result
|
||
|
||
# -----------------------------------------------------------------------
|
||
# Plugin skill lookups
|
||
# -----------------------------------------------------------------------
|
||
|
||
def find_plugin_skill(self, qualified_name: str) -> Optional[Path]:
|
||
"""Return the ``Path`` to a plugin skill's SKILL.md, or ``None``."""
|
||
entry = self._plugin_skills.get(qualified_name)
|
||
return entry["path"] if entry else None
|
||
|
||
def list_plugin_skills(self, plugin_name: str) -> List[str]:
|
||
"""Return sorted bare names of all skills registered by *plugin_name*."""
|
||
prefix = f"{plugin_name}:"
|
||
return sorted(
|
||
e["bare_name"]
|
||
for qn, e in self._plugin_skills.items()
|
||
if qn.startswith(prefix)
|
||
)
|
||
|
||
def remove_plugin_skill(self, qualified_name: str) -> None:
|
||
"""Remove a stale registry entry (silently ignores missing keys)."""
|
||
self._plugin_skills.pop(qualified_name, None)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Module-level singleton & convenience functions
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_plugin_manager: Optional[PluginManager] = None
|
||
|
||
|
||
def get_plugin_manager() -> PluginManager:
|
||
"""Return (and lazily create) the global PluginManager singleton."""
|
||
global _plugin_manager
|
||
if _plugin_manager is None:
|
||
_plugin_manager = PluginManager()
|
||
return _plugin_manager
|
||
|
||
|
||
def discover_plugins() -> None:
|
||
"""Discover and load all plugins (idempotent)."""
|
||
get_plugin_manager().discover_and_load()
|
||
|
||
|
||
def invoke_hook(hook_name: str, **kwargs: Any) -> List[Any]:
|
||
"""Invoke a lifecycle hook on all loaded plugins.
|
||
|
||
Returns a list of non-``None`` return values from plugin callbacks.
|
||
"""
|
||
return get_plugin_manager().invoke_hook(hook_name, **kwargs)
|
||
|
||
|
||
|
||
def get_pre_tool_call_block_message(
|
||
tool_name: str,
|
||
args: Optional[Dict[str, Any]],
|
||
task_id: str = "",
|
||
session_id: str = "",
|
||
tool_call_id: str = "",
|
||
) -> Optional[str]:
|
||
"""Check ``pre_tool_call`` hooks for a blocking directive.
|
||
|
||
Plugins that need to enforce policy (rate limiting, security
|
||
restrictions, approval workflows) can return::
|
||
|
||
{"action": "block", "message": "Reason the tool was blocked"}
|
||
|
||
from their ``pre_tool_call`` callback. The first valid block
|
||
directive wins. Invalid or irrelevant hook return values are
|
||
silently ignored so existing observer-only hooks are unaffected.
|
||
"""
|
||
hook_results = invoke_hook(
|
||
"pre_tool_call",
|
||
tool_name=tool_name,
|
||
args=args if isinstance(args, dict) else {},
|
||
task_id=task_id,
|
||
session_id=session_id,
|
||
tool_call_id=tool_call_id,
|
||
)
|
||
|
||
for result in hook_results:
|
||
if not isinstance(result, dict):
|
||
continue
|
||
if result.get("action") != "block":
|
||
continue
|
||
message = result.get("message")
|
||
if isinstance(message, str) and message:
|
||
return message
|
||
|
||
return None
|
||
|
||
|
||
def get_plugin_context_engine():
|
||
"""Return the plugin-registered context engine, or None."""
|
||
return get_plugin_manager()._context_engine
|
||
|
||
|
||
def get_plugin_command_handler(name: str) -> Optional[Callable]:
|
||
"""Return the handler for a plugin-registered slash command, or ``None``."""
|
||
entry = get_plugin_manager()._plugin_commands.get(name)
|
||
return entry["handler"] if entry else None
|
||
|
||
|
||
def get_plugin_commands() -> Dict[str, dict]:
|
||
"""Return the full plugin commands dict (name → {handler, description, plugin}).
|
||
|
||
Safe to call before discovery — returns an empty dict if no plugins loaded.
|
||
"""
|
||
return get_plugin_manager()._plugin_commands
|
||
|
||
|
||
def get_plugin_toolsets() -> List[tuple]:
|
||
"""Return plugin toolsets as ``(key, label, description)`` tuples.
|
||
|
||
Used by the ``hermes tools`` TUI so plugin-provided toolsets appear
|
||
alongside the built-in ones and can be toggled on/off per platform.
|
||
"""
|
||
manager = get_plugin_manager()
|
||
if not manager._plugin_tool_names:
|
||
return []
|
||
|
||
try:
|
||
from tools.registry import registry
|
||
except Exception:
|
||
return []
|
||
|
||
# Group plugin tool names by their toolset
|
||
toolset_tools: Dict[str, List[str]] = {}
|
||
toolset_plugin: Dict[str, LoadedPlugin] = {}
|
||
for tool_name in manager._plugin_tool_names:
|
||
entry = registry.get_entry(tool_name)
|
||
if not entry:
|
||
continue
|
||
ts = entry.toolset
|
||
toolset_tools.setdefault(ts, []).append(entry.name)
|
||
|
||
# Map toolsets back to the plugin that registered them
|
||
for _name, loaded in manager._plugins.items():
|
||
for tool_name in loaded.tools_registered:
|
||
entry = registry.get_entry(tool_name)
|
||
if entry and entry.toolset in toolset_tools:
|
||
toolset_plugin.setdefault(entry.toolset, loaded)
|
||
|
||
result = []
|
||
for ts_key in sorted(toolset_tools):
|
||
plugin = toolset_plugin.get(ts_key)
|
||
label = f"🔌 {ts_key.replace('_', ' ').title()}"
|
||
if plugin and plugin.manifest.description:
|
||
desc = plugin.manifest.description
|
||
else:
|
||
desc = ", ".join(sorted(toolset_tools[ts_key]))
|
||
result.append((ts_key, label, desc))
|
||
|
||
return result
|