feat(plugins): add register_auxiliary_task() to PluginContext API

Auxiliary LLM tasks (vision, compression, web_extract, etc.) currently
require modifications to core files for any plugin that needs its own
task slot — specifically the _AUX_TASKS list in hermes_cli/main.py and
the hardcoded env-var bridging dict in gateway/run.py. This violates
the 'plugins must not modify core files' rule and forces every memory
or context plugin that wants its own auxiliary task to either fork
core or open a coupled core+plugin PR.

This change adds a generic plugin surface for auxiliary task
registration:

    ctx.register_auxiliary_task(
        key='memory_retain_filter',
        display_name='Memory retain filter',
        description='hindsight pre-retain dedup/extract',
        defaults={'timeout': 30, 'extra_body': {'reasoning_effort': 'low'}},
    )

After registration, the task automatically:

  - Appears in 'hermes model → Configure auxiliary models' picker via
    a new _all_aux_tasks() merge of built-in + plugin tasks
  - Has its provider/model/base_url/api_key bridged from config.yaml
    to AUXILIARY_<KEY_UPPER>_* env vars at gateway startup
    (gateway/run.py now uses a dynamic bridged-keys set instead of
    a hardcoded per-task dict)
  - Gets plugin-declared defaults (timeout, extra_body, etc.) layered
    underneath user config so unconfigured plugin tasks still work
    (agent/auxiliary_client._get_auxiliary_task_config)
  - Resets to auto via 'Reset all to auto' alongside built-ins

Validation:

  - Rejects shadowing of built-in keys (vision, compression, etc.)
  - Rejects invalid key shapes (must match [A-Za-z0-9_]+)
  - Rejects cross-plugin collisions (clear error)
  - Allows same-plugin re-registration (idempotent updates)

Plugin discovery failures (rare) fall back gracefully — the aux
config UI still shows built-in tasks if get_plugin_auxiliary_tasks()
raises, and gateway env-var bridging keeps working for built-ins.

Built-in tasks remain hardcoded in _AUX_TASKS for stability — they're
the baseline UX, and DEFAULT_CONFIG already ships their defaults.
Plugin tasks layer on top.

Tests: 15 new tests in test_plugin_auxiliary_tasks.py covering API
validation, manager state lifecycle, helper sort order, _all_aux_tasks
merge semantics, _reset_aux_to_auto inclusion of plugin tasks, and
default-layering in auxiliary_client.

Updates the gateway-bridge code-parity test (test_auxiliary_config_bridge)
to assert the new dynamic shape rather than the hardcoded literal env
var names which no longer appear post-refactor.

Motivation: this unblocks PR #20262 (hindsight smart retain pipeline)
and similar plugins that need a dedicated aux task slot. The change
is non-breaking — built-in env vars (AUXILIARY_VISION_PROVIDER, etc.)
keep working since they're produced by the same f-string template
that built the hardcoded names.
This commit is contained in:
Edison 2026-05-20 16:36:30 -05:00 committed by Teknium
parent e8fa415a9e
commit e752c9454e
6 changed files with 597 additions and 47 deletions

View file

@ -4344,7 +4344,17 @@ _DEFAULT_AUX_TIMEOUT = 30.0
def _get_auxiliary_task_config(task: str) -> Dict[str, Any]:
"""Return the config dict for auxiliary.<task>, or {} when unavailable."""
"""Return the config dict for auxiliary.<task>, or {} when unavailable.
For plugin-registered auxiliary tasks (see
:meth:`hermes_cli.plugins.PluginContext.register_auxiliary_task`) the
plugin's declared *defaults* are layered underneath the user's config
so an unconfigured plugin task still works:
plugin defaults config.yaml auxiliary.<task> (user wins)
Built-in tasks ignore this path (their defaults live in DEFAULT_CONFIG).
"""
if not task:
return {}
try:
@ -4354,7 +4364,27 @@ def _get_auxiliary_task_config(task: str) -> Dict[str, Any]:
return {}
aux = config.get("auxiliary", {}) if isinstance(config, dict) else {}
task_config = aux.get(task, {}) if isinstance(aux, dict) else {}
return task_config if isinstance(task_config, dict) else {}
if not isinstance(task_config, dict):
task_config = {}
# Layer plugin-declared defaults underneath user config so
# ctx.register_auxiliary_task(defaults={...}) takes effect without
# forcing the user to write config.yaml entries.
try:
from hermes_cli.plugins import get_plugin_auxiliary_tasks
for _entry in get_plugin_auxiliary_tasks():
if _entry.get("key") == task:
_defaults = _entry.get("defaults") or {}
if isinstance(_defaults, dict):
merged = dict(_defaults)
merged.update(task_config)
return merged
break
except Exception:
# Plugin discovery failure must not break aux task config reads.
pass
return task_config
def _get_task_timeout(task: str, default: float = _DEFAULT_AUX_TIMEOUT) -> float:

View file

@ -774,31 +774,29 @@ if _config_path.exists():
os.environ[_env_var] = str(_val)
# Compression config is read directly from config.yaml by run_agent.py
# and auxiliary_client.py — no env var bridging needed.
# Auxiliary model/direct-endpoint overrides (vision, web_extract).
# Each task has provider/model/base_url/api_key; bridge non-default values to env vars.
# Auxiliary model/direct-endpoint overrides (vision, web_extract,
# approval, plus any plugin-registered auxiliary tasks).
# Each task has provider/model/base_url/api_key; bridge non-default
# values to env vars named AUXILIARY_<KEY_UPPER>_*. The legacy
# hard-coded list (vision/web_extract/approval) is replaced by a
# dynamic loop so plugin-registered tasks benefit from the same
# config→env bridging without core knowing about each one.
_auxiliary_cfg = _cfg.get("auxiliary", {})
if _auxiliary_cfg and isinstance(_auxiliary_cfg, dict):
_aux_task_env = {
"vision": {
"provider": "AUXILIARY_VISION_PROVIDER",
"model": "AUXILIARY_VISION_MODEL",
"base_url": "AUXILIARY_VISION_BASE_URL",
"api_key": "AUXILIARY_VISION_API_KEY",
},
"web_extract": {
"provider": "AUXILIARY_WEB_EXTRACT_PROVIDER",
"model": "AUXILIARY_WEB_EXTRACT_MODEL",
"base_url": "AUXILIARY_WEB_EXTRACT_BASE_URL",
"api_key": "AUXILIARY_WEB_EXTRACT_API_KEY",
},
"approval": {
"provider": "AUXILIARY_APPROVAL_PROVIDER",
"model": "AUXILIARY_APPROVAL_MODEL",
"base_url": "AUXILIARY_APPROVAL_BASE_URL",
"api_key": "AUXILIARY_APPROVAL_API_KEY",
},
}
for _task_key, _env_map in _aux_task_env.items():
# Built-in tasks that previously had explicit env-var bridging.
# Kept here as the canonical bridged set; plugin tasks are added
# below via the plugin auxiliary registry.
_aux_bridged_keys = {"vision", "web_extract", "approval"}
try:
from hermes_cli.plugins import get_plugin_auxiliary_tasks
for _entry in get_plugin_auxiliary_tasks():
_aux_bridged_keys.add(_entry["key"])
except Exception:
# Plugin discovery failure must not break gateway startup;
# built-in bridging stays intact.
pass
for _task_key in _aux_bridged_keys:
_task_cfg = _auxiliary_cfg.get(_task_key, {})
if not isinstance(_task_cfg, dict):
continue
@ -806,14 +804,15 @@ if _config_path.exists():
_model = str(_task_cfg.get("model", "")).strip()
_base_url = str(_task_cfg.get("base_url", "")).strip()
_api_key = str(_task_cfg.get("api_key", "")).strip()
_upper = _task_key.upper()
if _prov and _prov != "auto":
os.environ[_env_map["provider"]] = _prov
os.environ[f"AUXILIARY_{_upper}_PROVIDER"] = _prov
if _model:
os.environ[_env_map["model"]] = _model
os.environ[f"AUXILIARY_{_upper}_MODEL"] = _model
if _base_url:
os.environ[_env_map["base_url"]] = _base_url
os.environ[f"AUXILIARY_{_upper}_BASE_URL"] = _base_url
if _api_key:
os.environ[_env_map["api_key"]] = _api_key
os.environ[f"AUXILIARY_{_upper}_API_KEY"] = _api_key
# config.yaml is the documented, authoritative source for these
# settings — it unconditionally wins over .env values. Previously
# the guards below read `if X not in os.environ` and let stale

View file

@ -2505,6 +2505,27 @@ _AUX_TASKS: list[tuple[str, str, str]] = [
]
def _all_aux_tasks() -> list[tuple[str, str, str]]:
"""Return built-in + plugin-registered auxiliary tasks for picker/menu use.
Built-in tasks come first (preserving order), followed by plugin tasks
sorted by key. Used by ``_aux_config_menu``, ``_reset_aux_to_auto``, and
display-name lookups so plugin-registered tasks (registered via
:meth:`hermes_cli.plugins.PluginContext.register_auxiliary_task`) appear
in the same surfaces as built-in ones without core knowing about them.
"""
tasks = list(_AUX_TASKS)
try:
from hermes_cli.plugins import get_plugin_auxiliary_tasks
for entry in get_plugin_auxiliary_tasks():
tasks.append((entry["key"], entry["display_name"], entry["description"]))
except Exception:
# Plugin discovery failure must not break the aux config UI.
# Built-in tasks remain available.
pass
return tasks
def _format_aux_current(task_cfg: dict) -> str:
"""Render the current aux config for display in the task menu."""
if not isinstance(task_cfg, dict):
@ -2555,7 +2576,11 @@ def _save_aux_choice(
def _reset_aux_to_auto() -> int:
"""Reset every known aux task back to auto/empty. Returns number reset."""
"""Reset every known aux task back to auto/empty. Returns number reset.
Includes plugin-registered tasks (via ``_all_aux_tasks``) so a plugin
that contributed an auxiliary task gets reset alongside built-ins.
"""
from hermes_cli.config import load_config, save_config
cfg = load_config()
@ -2564,7 +2589,7 @@ def _reset_aux_to_auto() -> int:
aux = {}
cfg["auxiliary"] = aux
count = 0
for task, _name, _desc in _AUX_TASKS:
for task, _name, _desc in _all_aux_tasks():
entry = aux.setdefault(task, {})
if not isinstance(entry, dict):
entry = {}
@ -2607,10 +2632,11 @@ def _aux_config_menu() -> None:
print()
# Build the task menu with current settings inline
name_col = max(len(name) for _, name, _ in _AUX_TASKS) + 2
desc_col = max(len(desc) for _, _, desc in _AUX_TASKS) + 4
all_tasks = _all_aux_tasks()
name_col = max(len(name) for _, name, _ in all_tasks) + 2
desc_col = max(len(desc) for _, _, desc in all_tasks) + 4
entries: list[tuple[str, str]] = []
for task_key, name, desc in _AUX_TASKS:
for task_key, name, desc in all_tasks:
task_cfg = (
aux.get(task_key, {}) if isinstance(aux.get(task_key), dict) else {}
)
@ -2661,7 +2687,7 @@ def _aux_select_for_task(task: str) -> None:
current_model = str(task_cfg.get("model") or "").strip()
current_base_url = str(task_cfg.get("base_url") or "").strip()
display_name = next((name for key, name, _ in _AUX_TASKS if key == task), task)
display_name = next((name for key, name, _ in _all_aux_tasks() if key == task), task)
# Gather authenticated providers (has credentials + curated model list)
try:
@ -2732,7 +2758,7 @@ def _aux_flow_provider_model(
from hermes_cli.auth import _prompt_model_selection
from hermes_cli.models import get_pricing_for_provider
display_name = next((name for key, name, _ in _AUX_TASKS if key == task), task)
display_name = next((name for key, name, _ in _all_aux_tasks() if key == task), task)
# Fetch live pricing for this provider (non-blocking)
pricing: dict = {}
@ -2778,7 +2804,7 @@ def _aux_flow_custom_endpoint(task: str, task_cfg: dict) -> None:
"""Prompt for a direct OpenAI-compatible base_url + optional api_key/model."""
import getpass
display_name = next((name for key, name, _ in _AUX_TASKS if key == task), task)
display_name = next((name for key, name, _ in _all_aux_tasks() if key == task), task)
current_base_url = str(task_cfg.get("base_url") or "").strip()
current_model = str(task_cfg.get("model") or "").strip()

View file

@ -698,6 +698,119 @@ class PluginContext:
# -- hook registration --------------------------------------------------
# -- auxiliary task registration ---------------------------------------
def register_auxiliary_task(
self,
key: str,
*,
display_name: str,
description: str,
defaults: Optional[Dict[str, Any]] = None,
) -> None:
"""Register a plugin-defined auxiliary LLM task.
Auxiliary tasks are LLM-backed side jobs (vision analysis, web extraction,
compression, smart-approval, etc.) that route through ``auxiliary_client.py``.
Each task has its own ``auxiliary.<key>`` config block where users can
pin a provider/model independent of the main chat model.
Plugins use this to declare their own auxiliary tasks without touching
core files. After registration, the task:
- Appears in the ``hermes model Configure auxiliary models`` picker
- Has its provider/model/base_url/api_key bridged from config.yaml to
``AUXILIARY_<KEY_UPPER>_*`` env vars at gateway startup
- Gets default routing fields (provider="auto", model="", etc.) merged
into loaded configs so ``cfg.get("auxiliary", {}).get(key)`` works
Args:
key: stable task key (snake_case). Used in config ``auxiliary.<key>``
and env vars ``AUXILIARY_<KEY_UPPER>_*``. Must not shadow a
built-in task key (vision, compression, web_extract, approval,
mcp, title_generation, skills_hub, curator).
display_name: human-readable name shown in the picker.
description: short one-line description shown next to the name.
defaults: optional dict of default routing fields. Recognized keys:
``provider`` (default "auto"), ``model`` (default ""),
``base_url`` (default ""), ``api_key`` (default ""),
``timeout`` (default 60), ``extra_body`` (default {}),
plus any task-specific extras (e.g. ``download_timeout``).
Unknown keys are preserved verbatim the plugin owns the
schema for its own task.
Raises:
ValueError: if *key* is empty, contains invalid characters, or
shadows a built-in auxiliary task key.
Example:
ctx.register_auxiliary_task(
key="memory_retain_filter",
display_name="Memory retain filter",
description="hindsight pre-retain dedup/extract",
defaults={"provider": "auto", "timeout": 30},
)
"""
# Validate key shape
if not key or not isinstance(key, str):
raise ValueError(
f"Plugin '{self.manifest.name}' tried to register auxiliary task "
f"with invalid key {key!r}"
)
if not all(c.isalnum() or c == "_" for c in key):
raise ValueError(
f"Plugin '{self.manifest.name}' auxiliary task key {key!r} "
f"must contain only alphanumeric characters and underscores"
)
# Lazy import to avoid circular: hermes_cli.main imports plugins indirectly
from hermes_cli.main import _AUX_TASKS as _BUILTIN_AUX_TASKS
builtin_keys = {k for k, _name, _desc in _BUILTIN_AUX_TASKS}
if key in builtin_keys:
raise ValueError(
f"Plugin '{self.manifest.name}' cannot register auxiliary task "
f"{key!r} — that key is reserved for a built-in task. "
f"Pick a plugin-namespaced key (e.g. '{self.manifest.name}_{key}')."
)
# Reject duplicate registrations across plugins
existing = self._manager._aux_tasks.get(key)
if existing is not None and existing.get("plugin") != self.manifest.name:
raise ValueError(
f"Plugin '{self.manifest.name}' cannot register auxiliary task "
f"{key!r} — already registered by plugin "
f"'{existing.get('plugin')}'"
)
# Normalize defaults — plugin owns the schema, but we ensure routing
# fields exist with sensible types so consumers don't crash.
merged_defaults: Dict[str, Any] = {
"provider": "auto",
"model": "",
"base_url": "",
"api_key": "",
"timeout": 60,
"extra_body": {},
}
if defaults:
for k, v in defaults.items():
merged_defaults[k] = v
self._manager._aux_tasks[key] = {
"key": key,
"display_name": display_name,
"description": description,
"defaults": merged_defaults,
"plugin": self.manifest.name,
}
logger.debug(
"Plugin %s registered auxiliary task: %s (%s)",
self.manifest.name,
key,
display_name,
)
def register_hook(self, hook_name: str, callback: Callable) -> None:
"""Register a lifecycle hook callback.
@ -782,6 +895,9 @@ class PluginManager:
self._cli_ref = None # Set by CLI after plugin discovery
# Plugin skill registry: qualified name → metadata dict.
self._plugin_skills: Dict[str, Dict[str, Any]] = {}
# Plugin-registered auxiliary tasks: key → {key, display_name,
# description, defaults, plugin}. See PluginContext.register_auxiliary_task.
self._aux_tasks: Dict[str, Dict[str, Any]] = {}
# -----------------------------------------------------------------------
# Public
@ -803,6 +919,7 @@ class PluginManager:
self._cli_commands.clear()
self._plugin_commands.clear()
self._plugin_skills.clear()
self._aux_tasks.clear()
self._context_engine = None
self._discovered = True
@ -1548,6 +1665,21 @@ def get_plugin_commands() -> Dict[str, dict]:
return _ensure_plugins_discovered()._plugin_commands
def get_plugin_auxiliary_tasks() -> List[Dict[str, Any]]:
"""Return all plugin-registered auxiliary tasks as a stable-ordered list.
Each entry is the registration dict from
:meth:`PluginContext.register_auxiliary_task`:
``{key, display_name, description, defaults, plugin}``.
Triggers idempotent plugin discovery so callers can read the registry
before any explicit ``discover_plugins()`` call. Sorted by ``key`` for
deterministic ordering in pickers and tests.
"""
manager = _ensure_plugins_discovered()
return [manager._aux_tasks[k] for k in sorted(manager._aux_tasks)]
def get_plugin_toolsets() -> List[tuple]:
"""Return plugin toolsets as ``(key, label, description)`` tuples.

View file

@ -198,22 +198,32 @@ class TestGatewayBridgeCodeParity:
"""Verify the gateway/run.py config bridge contains the auxiliary section."""
def test_gateway_has_auxiliary_bridge(self):
"""The gateway config bridge must include auxiliary.* bridging."""
"""The gateway config bridge must include auxiliary.* bridging.
After the plugin-aux-task API refactor (2026-05), gateway env-var
names are derived dynamically (``AUXILIARY_<KEY_UPPER>_*``) so the
literal strings ``AUXILIARY_VISION_PROVIDER`` etc. no longer appear
in source. Assert the dynamic shape and the canonical built-in keys
bridged set instead.
"""
gateway_path = Path(__file__).parent.parent.parent / "gateway" / "run.py"
# Pin encoding to UTF-8: source files in this repo are UTF-8, but
# Path.read_text() defaults to the system locale — which is cp1252
# on most Western Windows installs and crashes as soon as the file
# contains any non-ASCII byte (e.g. an em-dash in a comment).
content = gateway_path.read_text(encoding="utf-8")
# Check for key patterns that indicate the bridge is present
assert "AUXILIARY_VISION_PROVIDER" in content
assert "AUXILIARY_VISION_MODEL" in content
assert "AUXILIARY_VISION_BASE_URL" in content
assert "AUXILIARY_VISION_API_KEY" in content
assert "AUXILIARY_WEB_EXTRACT_PROVIDER" in content
assert "AUXILIARY_WEB_EXTRACT_MODEL" in content
assert "AUXILIARY_WEB_EXTRACT_BASE_URL" in content
assert "AUXILIARY_WEB_EXTRACT_API_KEY" in content
# Dynamic env-var derivation present
assert 'f"AUXILIARY_{_upper}_PROVIDER"' in content
assert 'f"AUXILIARY_{_upper}_MODEL"' in content
assert 'f"AUXILIARY_{_upper}_BASE_URL"' in content
assert 'f"AUXILIARY_{_upper}_API_KEY"' in content
# Built-in bridged keys present
assert "_aux_bridged_keys" in content
assert '"vision"' in content
assert '"web_extract"' in content
assert '"approval"' in content
# Plugin-aux-task discovery hooked into bridging
assert "get_plugin_auxiliary_tasks" in content
def test_gateway_no_compression_env_bridge(self):
"""Gateway should NOT bridge compression config to env vars (config-only)."""

View file

@ -0,0 +1,353 @@
"""Tests for the plugin auxiliary-task registration API.
Covers:
- PluginContext.register_auxiliary_task() validation
- PluginManager._aux_tasks storage + force-rediscovery clearing
- get_plugin_auxiliary_tasks() module-level helper
- _all_aux_tasks() merge of built-in + plugin tasks
- _reset_aux_to_auto() includes plugin tasks
- _get_auxiliary_task_config() layers plugin defaults under user config
"""
from __future__ import annotations
import pytest
from hermes_cli.plugins import (
PluginContext,
PluginManager,
PluginManifest,
get_plugin_auxiliary_tasks,
)
# ── Fixtures ─────────────────────────────────────────────────────────────────
def _make_ctx(name: str = "test_plugin") -> tuple[PluginContext, PluginManager]:
"""Build a PluginContext + fresh PluginManager wired together.
The manager skips discovery (no plugins.yaml, no scan) so the test
can exercise registration paths directly.
"""
manager = PluginManager()
manager._discovered = True # skip auto-discovery on lookup
manifest = PluginManifest(name=name)
ctx = PluginContext(manifest, manager)
return ctx, manager
@pytest.fixture
def patched_manager(monkeypatch):
"""Replace the module-level singleton with a fresh manager for the test.
Restored automatically after the test by monkeypatch.
"""
from hermes_cli import plugins as plugins_mod
fresh = PluginManager()
fresh._discovered = True
monkeypatch.setattr(plugins_mod, "_PLUGIN_MANAGER", fresh, raising=False)
def _stub_get_manager() -> PluginManager:
return fresh
monkeypatch.setattr(plugins_mod, "get_plugin_manager", _stub_get_manager)
monkeypatch.setattr(plugins_mod, "_ensure_plugins_discovered", _stub_get_manager)
yield fresh
# ── PluginContext.register_auxiliary_task ────────────────────────────────────
def test_register_auxiliary_task_basic():
ctx, manager = _make_ctx("my_plugin")
ctx.register_auxiliary_task(
key="my_task",
display_name="My task",
description="a custom side task",
)
assert "my_task" in manager._aux_tasks
entry = manager._aux_tasks["my_task"]
assert entry["key"] == "my_task"
assert entry["display_name"] == "My task"
assert entry["description"] == "a custom side task"
assert entry["plugin"] == "my_plugin"
# Routing defaults populated
assert entry["defaults"]["provider"] == "auto"
assert entry["defaults"]["model"] == ""
assert entry["defaults"]["timeout"] == 60
def test_register_auxiliary_task_with_custom_defaults():
ctx, manager = _make_ctx()
ctx.register_auxiliary_task(
key="custom_task",
display_name="Custom",
description="d",
defaults={"timeout": 30, "extra_body": {"reasoning_effort": "low"}},
)
entry = manager._aux_tasks["custom_task"]
assert entry["defaults"]["timeout"] == 30
assert entry["defaults"]["extra_body"] == {"reasoning_effort": "low"}
# Unspecified defaults still populated
assert entry["defaults"]["provider"] == "auto"
def test_register_auxiliary_task_rejects_builtin_keys():
ctx, _ = _make_ctx()
for builtin in (
"vision",
"compression",
"web_extract",
"approval",
"mcp",
"title_generation",
"skills_hub",
"curator",
):
with pytest.raises(ValueError, match="reserved for a built-in task"):
ctx.register_auxiliary_task(
key=builtin,
display_name="x",
description="x",
)
def test_register_auxiliary_task_rejects_invalid_key_shapes():
ctx, _ = _make_ctx()
for bad in ("", "with-dash", "with.dot", "with space", "with/slash"):
with pytest.raises(ValueError):
ctx.register_auxiliary_task(
key=bad,
display_name="x",
description="x",
)
def test_register_auxiliary_task_allows_same_plugin_re_registration():
"""Re-registration by the same plugin updates the entry (idempotent)."""
ctx, manager = _make_ctx("plug_a")
ctx.register_auxiliary_task(
key="t1", display_name="First", description="first"
)
ctx.register_auxiliary_task(
key="t1", display_name="Second", description="second"
)
assert manager._aux_tasks["t1"]["display_name"] == "Second"
def test_register_auxiliary_task_rejects_cross_plugin_collision():
"""Two different plugins cannot register the same task key."""
manager = PluginManager()
manager._discovered = True
manifest_a = PluginManifest(name="plug_a")
manifest_b = PluginManifest(name="plug_b")
ctx_a = PluginContext(manifest_a, manager)
ctx_b = PluginContext(manifest_b, manager)
ctx_a.register_auxiliary_task(
key="shared", display_name="A", description="a"
)
with pytest.raises(ValueError, match="already registered by plugin 'plug_a'"):
ctx_b.register_auxiliary_task(
key="shared", display_name="B", description="b"
)
# ── PluginManager state lifecycle ────────────────────────────────────────────
def test_force_rediscovery_clears_aux_tasks():
ctx, manager = _make_ctx()
ctx.register_auxiliary_task(
key="will_be_cleared",
display_name="x",
description="x",
)
assert "will_be_cleared" in manager._aux_tasks
manager._discovered = False
# Simulate force=True path: clears state before re-scanning
manager._aux_tasks.clear()
assert manager._aux_tasks == {}
# ── Module-level helper ──────────────────────────────────────────────────────
def test_get_plugin_auxiliary_tasks_returns_sorted_list(patched_manager):
manifest = PluginManifest(name="plug")
ctx = PluginContext(manifest, patched_manager)
ctx.register_auxiliary_task(
key="zeta_task", display_name="Zeta", description="z"
)
ctx.register_auxiliary_task(
key="alpha_task", display_name="Alpha", description="a"
)
ctx.register_auxiliary_task(
key="mike_task", display_name="Mike", description="m"
)
tasks = get_plugin_auxiliary_tasks()
assert [t["key"] for t in tasks] == ["alpha_task", "mike_task", "zeta_task"]
def test_get_plugin_auxiliary_tasks_empty_when_none_registered(patched_manager):
assert get_plugin_auxiliary_tasks() == []
# ── _all_aux_tasks merges built-in + plugin ──────────────────────────────────
def test_all_aux_tasks_includes_plugin_registered(patched_manager):
from hermes_cli.main import _AUX_TASKS, _all_aux_tasks
manifest = PluginManifest(name="hindsight")
ctx = PluginContext(manifest, patched_manager)
ctx.register_auxiliary_task(
key="memory_retain_filter",
display_name="Memory retain filter",
description="hindsight pre-retain dedup/extract",
)
merged = _all_aux_tasks()
keys = [k for k, _, _ in merged]
# Built-ins preserved (and come first)
builtin_keys = [k for k, _, _ in _AUX_TASKS]
assert keys[: len(builtin_keys)] == builtin_keys
# Plugin task appended
assert "memory_retain_filter" in keys
plugin_entry = next(t for t in merged if t[0] == "memory_retain_filter")
assert plugin_entry == (
"memory_retain_filter",
"Memory retain filter",
"hindsight pre-retain dedup/extract",
)
def test_all_aux_tasks_swallows_plugin_discovery_failure(monkeypatch):
"""Plugin discovery failure must not break the aux config UI."""
from hermes_cli import main as main_mod
def _broken():
raise RuntimeError("plugin scan exploded")
monkeypatch.setattr(
"hermes_cli.plugins.get_plugin_auxiliary_tasks", _broken
)
merged = main_mod._all_aux_tasks()
# Built-in tasks still present
assert any(k == "vision" for k, _, _ in merged)
# ── _reset_aux_to_auto includes plugin tasks ─────────────────────────────────
def test_reset_aux_to_auto_resets_plugin_tasks(tmp_path, monkeypatch, patched_manager):
"""Plugin task with non-auto config gets reset alongside built-ins."""
from pathlib import Path
from hermes_cli.config import load_config, save_config
from hermes_cli.main import _reset_aux_to_auto
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
(tmp_path / ".hermes").mkdir(exist_ok=True)
manifest = PluginManifest(name="plug")
ctx = PluginContext(manifest, patched_manager)
ctx.register_auxiliary_task(
key="my_aux",
display_name="My Aux",
description="d",
)
# Manually configure the plugin task to non-auto
cfg = load_config()
aux = cfg.setdefault("auxiliary", {})
aux["my_aux"] = {"provider": "openrouter", "model": "gpt-4o", "base_url": "", "api_key": ""}
save_config(cfg)
n = _reset_aux_to_auto()
assert n >= 1
cfg = load_config()
assert cfg["auxiliary"]["my_aux"]["provider"] == "auto"
assert cfg["auxiliary"]["my_aux"]["model"] == ""
# ── auxiliary_client._get_auxiliary_task_config defaults layering ────────────
def test_get_auxiliary_task_config_layers_plugin_defaults(
tmp_path, monkeypatch, patched_manager
):
"""Plugin-declared defaults appear when user has no config entry."""
from pathlib import Path
from agent.auxiliary_client import _get_auxiliary_task_config
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
(tmp_path / ".hermes").mkdir(exist_ok=True)
manifest = PluginManifest(name="plug")
ctx = PluginContext(manifest, patched_manager)
ctx.register_auxiliary_task(
key="my_filter",
display_name="My filter",
description="x",
defaults={"timeout": 15, "extra_body": {"reasoning_effort": "low"}},
)
# No user config for my_filter — defaults should surface
resolved = _get_auxiliary_task_config("my_filter")
assert resolved["timeout"] == 15
assert resolved["extra_body"] == {"reasoning_effort": "low"}
assert resolved["provider"] == "auto"
def test_get_auxiliary_task_config_user_config_wins_over_plugin_defaults(
tmp_path, monkeypatch, patched_manager
):
"""User's config.yaml entry overrides plugin-declared defaults."""
from pathlib import Path
from hermes_cli.config import load_config, save_config
from agent.auxiliary_client import _get_auxiliary_task_config
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
(tmp_path / ".hermes").mkdir(exist_ok=True)
manifest = PluginManifest(name="plug")
ctx = PluginContext(manifest, patched_manager)
ctx.register_auxiliary_task(
key="my_filter",
display_name="My filter",
description="x",
defaults={"timeout": 15, "provider": "auto"},
)
# User overrides timeout + provider via config.yaml
cfg = load_config()
aux = cfg.setdefault("auxiliary", {})
aux["my_filter"] = {"timeout": 90, "provider": "nous"}
save_config(cfg)
resolved = _get_auxiliary_task_config("my_filter")
assert resolved["timeout"] == 90 # user wins
assert resolved["provider"] == "nous" # user wins
def test_get_auxiliary_task_config_unknown_task_returns_empty(
tmp_path, monkeypatch, patched_manager
):
from pathlib import Path
from agent.auxiliary_client import _get_auxiliary_task_config
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
(tmp_path / ".hermes").mkdir(exist_ok=True)
assert _get_auxiliary_task_config("nonexistent") == {}