feat(context-engine): host contract for external context engines

Condenses the substance of PRs #16453, #17453, #16451, #17600, and #13373
into a minimal generic host contract that external context engine plugins
(e.g. hermes-lcm) need to integrate cleanly. Drops scaffolding that
duplicated existing infrastructure or had marginal value.

Five concrete changes:

1. `_transition_context_engine_session()` on AIAgent — generic lifecycle
   helper that fires on_session_end → on_session_reset → on_session_start
   → optional carry_over_new_session_context. Engines implement only the
   hooks they need; missing hooks are skipped. Built-in compressor keeps
   its existing reset-only behavior because callers default to no
   metadata. `reset_session_state()` now optionally accepts
   previous_messages / old_session_id / carry_over_context and delegates
   to the transition helper when provided. (#16453)

2. `conversation_id` passed to `on_session_start()` — both the
   agent-init call site and the compression-boundary call site now
   forward `self._gateway_session_key` so plugin engines have a stable
   conversation identity that survives session_id rotation (compression
   splits, /new, resume). The key already existed on AIAgent; it just
   wasn't reaching engines. (#16453)

3. Canonical cache buckets forwarded to engines — the usage dict passed
   to `update_from_response()` now includes input_tokens, output_tokens,
   cache_read_tokens, cache_write_tokens, and reasoning_tokens on top of
   the legacy prompt/completion/total keys. Engines can make decisions on
   cache-hit ratios and reasoning costs instead of only aggregates. ABC
   docstring updated. (#17453)

4. Plugin-registered context engines visible in the picker —
   `_discover_context_engines()` in plugins_cmd.py now also includes
   engines registered via `ctx.register_context_engine()` from plugin
   manifests, deduplicating by name so repo-shipped descriptions win on
   collision. (#16451)

5. `_EngineCollector.register_command()` — context engines using the
   standard `register(ctx)` pattern can now expose slash commands (e.g.
   `/lcm`). Routes to the global plugin command registry with the same
   conflict-rejection policy regular plugins use (no shadowing built-ins,
   no clobbering other plugins). Previously these calls hit a no-op and
   the slash commands silently never appeared. (#17600)

Dropped from the original 5 PRs:

- Compression boundary signal (`boundary_reason="compression"`) from
  #16453 — already on main at `agent/conversation_compression.py:412-424`,
  landed via the bg-review extraction.

- `discover_plugins()` before fallback in run_agent.py from #16451 —
  redundant: `get_plugin_context_engine()` already routes through
  `_ensure_plugins_discovered()` which is idempotent.

- Runtime identity diagnostics method + helpers from #13373 (+251 LOC) —
  operators can already read engine state via `engine.get_status()`;
  the diagnostics view added marginal value relative to its surface area.

- The 553-LOC slash-command machinery from #17600 — replaced with a
  20-LOC `register_command` method on the collector that reuses the
  existing plugin command registry instead of building a parallel one.

Net: ~215 LOC of host-contract changes + 282 LOC of focused tests, vs
~1,176 LOC across the original 5 PRs.

Co-authored-by: Tosko4 <1294707+Tosko4@users.noreply.github.com>

Closes #16453.
Closes #17453.
Closes #16451.
Closes #17600.
Closes #13373.
Related: stephenschoettler/hermes-lcm#68.
This commit is contained in:
teknium1 2026-05-28 01:38:13 -07:00 committed by Teknium
parent fb9f3a4ef9
commit 9b5dae17a5
8 changed files with 491 additions and 14 deletions

View file

@ -1522,6 +1522,7 @@ def init_agent(
platform=agent.platform or "cli",
model=agent.model,
context_length=getattr(agent.context_compressor, "context_length", 0),
conversation_id=getattr(agent, "_gateway_session_key", None),
)
except Exception as _ce_err:
_ra().logger.debug("Context engine on_session_start: %s", _ce_err)

View file

@ -71,7 +71,12 @@ class ContextEngine(ABC):
def update_from_response(self, usage: Dict[str, Any]) -> None:
"""Update tracked token usage from an API response.
Called after every LLM call with the usage dict from the response.
Called after every LLM call with a normalized usage dict. The legacy
keys ``prompt_tokens``, ``completion_tokens``, and ``total_tokens``
are always present. Newer hosts also include canonical buckets:
``input_tokens``, ``output_tokens``, ``cache_read_tokens``,
``cache_write_tokens``, and ``reasoning_tokens``. Engines should
treat those fields as optional for compatibility with older hosts.
"""
@abstractmethod

View file

@ -421,6 +421,7 @@ def compress_context(
agent.session_id or "",
boundary_reason="compression",
old_session_id=_old_sid,
conversation_id=getattr(agent, "_gateway_session_key", None),
)
except Exception as _ce_err:
logger.debug("context engine on_session_start (compression): %s", _ce_err)

View file

@ -1769,10 +1769,19 @@ def run_conversation(
prompt_tokens = canonical_usage.prompt_tokens
completion_tokens = canonical_usage.output_tokens
total_tokens = canonical_usage.total_tokens
# Forward canonical token + cache buckets so context engines
# can make decisions on cache hit ratios / reasoning costs,
# not just legacy aggregate tokens. Legacy keys stay for
# back-compat with engines that only read prompt/completion/total.
usage_dict = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"input_tokens": canonical_usage.input_tokens,
"output_tokens": canonical_usage.output_tokens,
"cache_read_tokens": canonical_usage.cache_read_tokens,
"cache_write_tokens": canonical_usage.cache_write_tokens,
"reasoning_tokens": canonical_usage.reasoning_tokens,
}
agent.context_compressor.update_from_response(usage_dict)

View file

@ -864,12 +864,35 @@ def _discover_memory_providers() -> list[tuple[str, str]]:
def _discover_context_engines() -> list[tuple[str, str]]:
"""Return [(name, description), ...] for available context engines."""
"""Return [(name, description), ...] for available context engines.
Includes repo-shipped engines from ``plugins/context_engine/`` AND
plugin-registered engines (third-party engines installed as Hermes
plugins via ``ctx.register_context_engine``). Repo-shipped descriptions
win when a plugin-registered engine collides on name.
"""
engines: list[tuple[str, str]] = []
seen: set[str] = set()
try:
from plugins.context_engine import discover_context_engines
return [(name, desc) for name, desc, _avail in discover_context_engines()]
for name, desc, _avail in discover_context_engines():
if name not in seen:
engines.append((name, desc))
seen.add(name)
except Exception:
return []
pass
try:
from hermes_cli.plugins import discover_plugins, get_plugin_context_engine
discover_plugins()
plugin_engine = get_plugin_context_engine()
if plugin_engine and getattr(plugin_engine, "name", None) and plugin_engine.name not in seen:
engines.append((plugin_engine.name, "installed plugin"))
except Exception:
pass
return engines
def _get_current_memory_provider() -> str:

View file

@ -174,7 +174,7 @@ def _load_engine_from_dir(engine_dir: Path) -> Optional["ContextEngine"]:
# Try register(ctx) pattern first (how plugins are written)
if hasattr(mod, "register"):
collector = _EngineCollector()
collector = _EngineCollector(engine_name=name)
try:
mod.register(collector)
if collector.engine:
@ -197,14 +197,80 @@ def _load_engine_from_dir(engine_dir: Path) -> Optional["ContextEngine"]:
class _EngineCollector:
"""Fake plugin context that captures register_context_engine calls."""
"""Fake plugin context that captures register_context_engine calls.
def __init__(self):
Plugin context engines using the standard ``register(ctx)`` pattern may
also call ``ctx.register_command(...)`` to expose slash commands (e.g.
``/lcm``). Forward those to the global plugin command registry so they
behave identically to commands registered by normal plugins.
"""
def __init__(self, engine_name: str = ""):
self.engine = None
self._engine_name = engine_name or "context_engine"
self._registered_commands: list[str] = []
def register_context_engine(self, engine):
self.engine = engine
def register_command(
self,
name: str,
handler,
description: str = "",
args_hint: str = "",
) -> None:
"""Forward to the global plugin command registry."""
clean = (name or "").lower().strip().lstrip("/").replace(" ", "-")
if not clean:
logger.warning(
"Context engine '%s' tried to register a command with an empty name.",
self._engine_name,
)
return
# Reject conflicts with built-in commands.
try:
from hermes_cli.commands import resolve_command
if resolve_command(clean) is not None:
logger.warning(
"Context engine '%s' tried to register command '/%s' which conflicts "
"with a built-in command. Skipping.",
self._engine_name, clean,
)
return
except Exception:
pass
try:
from hermes_cli.plugins import get_plugin_manager
manager = get_plugin_manager()
if clean in manager._plugin_commands:
# Don't clobber a regular plugin's command — same conflict
# policy the plugin system uses for plugin-vs-plugin collisions.
logger.warning(
"Context engine '%s' tried to register command '/%s' which "
"is already registered by a plugin. Skipping.",
self._engine_name, clean,
)
return
manager._plugin_commands[clean] = {
"handler": handler,
"description": description or "Context engine command",
"plugin": f"context-engine:{self._engine_name}",
"args_hint": (args_hint or "").strip(),
}
self._registered_commands.append(clean)
logger.debug(
"Context engine '%s' registered command: /%s",
self._engine_name, clean,
)
except Exception as exc:
logger.debug(
"Context engine '%s' could not register /%s: %s",
self._engine_name, clean, exc,
)
# No-op for other registration methods
def register_tool(self, *args, **kwargs):
pass

View file

@ -527,7 +527,81 @@ class AIAgent:
"Session DB creation failed (will retry next turn): %s", e
)
def reset_session_state(self):
def _transition_context_engine_session(
self,
*,
old_session_id: Optional[str] = None,
new_session_id: Optional[str] = None,
previous_messages: Optional[list] = None,
carry_over_context: bool = False,
reset_engine: bool = True,
**extra_context,
) -> None:
"""Notify the active context engine about a host session transition.
Generic host-side lifecycle helper. The built-in compressor keeps its
existing reset behavior; plugin engines that implement richer hooks
(``on_session_end``, ``on_session_reset``, ``on_session_start``,
``carry_over_new_session_context``) can flush old-session state,
reset runtime counters, bind to the new session, and optionally
carry retained context forward.
"""
engine = getattr(self, "context_compressor", None)
if not engine:
return
if old_session_id and previous_messages is not None and hasattr(engine, "on_session_end"):
try:
engine.on_session_end(old_session_id, previous_messages)
except Exception as exc:
logger.debug("context engine on_session_end during transition: %s", exc)
if reset_engine and hasattr(engine, "on_session_reset"):
try:
engine.on_session_reset()
except Exception as exc:
logger.debug("context engine on_session_reset during transition: %s", exc)
should_start = bool(
old_session_id
or previous_messages is not None
or carry_over_context
or extra_context
)
target_session_id = new_session_id or getattr(self, "session_id", "") or ""
if should_start and target_session_id and hasattr(engine, "on_session_start"):
start_context = {
"old_session_id": old_session_id,
"carry_over_context": carry_over_context,
"platform": getattr(self, "platform", None) or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
"model": getattr(self, "model", ""),
"context_length": getattr(engine, "context_length", None),
"conversation_id": getattr(self, "_gateway_session_key", None),
}
start_context.update(extra_context)
start_context = {k: v for k, v in start_context.items() if v not in (None, "")}
try:
engine.on_session_start(target_session_id, **start_context)
except Exception as exc:
logger.debug("context engine on_session_start during transition: %s", exc)
if (
carry_over_context
and old_session_id
and target_session_id
and hasattr(engine, "carry_over_new_session_context")
):
try:
engine.carry_over_new_session_context(old_session_id, target_session_id)
except Exception as exc:
logger.debug("context engine carry_over_new_session_context during transition: %s", exc)
def reset_session_state(
self,
previous_messages: Optional[list] = None,
old_session_id: Optional[str] = None,
carry_over_context: bool = False,
):
"""Reset all session-scoped token counters to 0 for a fresh session.
This method encapsulates the reset logic for all session-level metrics
@ -541,9 +615,12 @@ class AIAgent:
The method safely handles optional attributes (e.g., context compressor)
using ``hasattr`` checks.
This keeps the counter reset logic DRY and maintainable in one place
rather than scattering it across multiple methods.
When ``previous_messages`` / ``old_session_id`` / ``carry_over_context``
are provided, the active context engine is notified through the
full transition lifecycle (``_transition_context_engine_session``)
instead of a bare reset. Default callers pass nothing and keep the
existing reset-only behavior.
"""
# Token usage counters
self.session_total_tokens = 0
@ -562,9 +639,14 @@ class AIAgent:
# Turn counter (added after reset_session_state was first written — #2635)
self._user_turn_count = 0
# Context engine reset (works for both built-in compressor and plugins)
if hasattr(self, "context_compressor") and self.context_compressor:
self.context_compressor.on_session_reset()
# Context engine reset/transition (works for built-in compressor and plugins)
self._transition_context_engine_session(
old_session_id=old_session_id,
new_session_id=getattr(self, "session_id", None),
previous_messages=previous_messages,
carry_over_context=carry_over_context,
reset_engine=True,
)
def _ensure_lmstudio_runtime_loaded(self, config_context_length: Optional[int] = None) -> None:
"""

View file

@ -0,0 +1,290 @@
"""Regressions for the context-engine host contract.
These tests pin the five generic host-side guarantees that external context
engine plugins (e.g. hermes-lcm) rely on:
1. ``_transition_context_engine_session`` drives the full lifecycle
(on_session_end on_session_reset on_session_start optional
carry_over_new_session_context) and ``reset_session_state`` delegates
to it when callers pass session metadata.
2. ``on_session_start`` receives ``conversation_id`` derived from
``_gateway_session_key`` at agent init time.
3. ``conversation_loop`` forwards canonical cache buckets
(``cache_read_tokens``, ``cache_write_tokens``, ``input_tokens``,
``output_tokens``, ``reasoning_tokens``) to the engine's
``update_from_response``, on top of the legacy aggregate keys.
4. ``_discover_context_engines`` includes plugin-registered engines (not
just repo-shipped engines under ``plugins/context_engine/``).
5. The repo-shipped ``_EngineCollector`` honors ``ctx.register_command``
from a plugin engine's ``register(ctx)`` entry point and routes it
to the global plugin command registry.
"""
from __future__ import annotations
from unittest.mock import MagicMock
import pytest
from run_agent import AIAgent
def _bare_agent() -> AIAgent:
agent = object.__new__(AIAgent)
agent.session_id = "test-session"
agent.model = "fake-model"
agent.platform = "telegram"
agent._gateway_session_key = "agent:main:telegram:dm:42"
return agent
def test_transition_runs_full_lifecycle_in_order():
"""End → reset → start → carry_over, in that order, when all inputs apply."""
events: list[str] = []
engine = MagicMock()
engine.context_length = 200_000
engine.on_session_end.side_effect = lambda *a, **kw: events.append("on_session_end")
engine.on_session_reset.side_effect = lambda *a, **kw: events.append("on_session_reset")
engine.on_session_start.side_effect = lambda *a, **kw: events.append("on_session_start")
engine.carry_over_new_session_context.side_effect = lambda *a, **kw: events.append("carry_over")
agent = _bare_agent()
agent.context_compressor = engine
agent._transition_context_engine_session(
old_session_id="old-sid",
new_session_id="new-sid",
previous_messages=[{"role": "user", "content": "hi"}],
carry_over_context=True,
)
assert events == [
"on_session_end",
"on_session_reset",
"on_session_start",
"carry_over",
]
def test_transition_passes_conversation_id_from_gateway_session_key():
"""on_session_start receives ``conversation_id`` from ``_gateway_session_key``."""
engine = MagicMock()
engine.context_length = 200_000
captured: dict = {}
engine.on_session_start.side_effect = lambda sid, **kw: captured.update(kw)
agent = _bare_agent()
agent.context_compressor = engine
agent._transition_context_engine_session(
old_session_id="old-sid",
new_session_id="new-sid",
previous_messages=[{"role": "user", "content": "hi"}],
)
assert captured.get("conversation_id") == "agent:main:telegram:dm:42"
assert captured.get("old_session_id") == "old-sid"
assert captured.get("platform") == "telegram"
def test_transition_skips_optional_hooks_when_engine_lacks_them():
"""Engines that don't implement on_session_end/carry_over still work."""
class MinimalEngine:
def __init__(self):
self.context_length = 100_000
self.reset_called = False
self.start_called_with = None
def on_session_reset(self):
self.reset_called = True
def on_session_start(self, sid, **kw):
self.start_called_with = (sid, kw)
engine = MinimalEngine()
agent = _bare_agent()
agent.context_compressor = engine
# Should not raise even though on_session_end / carry_over are missing.
agent._transition_context_engine_session(
old_session_id="old",
new_session_id="new",
previous_messages=[{"role": "user", "content": "hi"}],
carry_over_context=True,
)
assert engine.reset_called is True
assert engine.start_called_with is not None
new_sid, kw = engine.start_called_with
assert new_sid == "new"
assert kw.get("old_session_id") == "old"
def test_reset_session_state_delegates_to_transition_when_args_provided():
"""``reset_session_state(previous_messages=..., old_session_id=...)`` fires full lifecycle."""
engine = MagicMock()
engine.context_length = 100_000
agent = _bare_agent()
agent.context_compressor = engine
agent.reset_session_state(
previous_messages=[{"role": "user", "content": "hi"}],
old_session_id="old-sid",
)
assert engine.on_session_end.called
assert engine.on_session_reset.called
assert engine.on_session_start.called
# No carry_over_context, so carry_over hook NOT called.
assert not engine.carry_over_new_session_context.called
def test_reset_session_state_default_call_only_resets():
"""Bare ``reset_session_state()`` still only resets the engine (no end/start)."""
engine = MagicMock()
engine.context_length = 100_000
agent = _bare_agent()
agent.context_compressor = engine
agent.reset_session_state()
assert engine.on_session_reset.called
assert not engine.on_session_end.called
assert not engine.on_session_start.called
def test_update_from_response_forwards_canonical_cache_buckets():
"""conversation_loop passes cache_read/write/reasoning tokens to engine."""
# Test the contract directly: a usage_dict built from CanonicalUsage must
# contain the canonical buckets in addition to the legacy keys. We don't
# spin up the full conversation loop; we just verify the dict shape.
from agent.usage_pricing import CanonicalUsage
canonical = CanonicalUsage(
input_tokens=1000,
output_tokens=500,
cache_read_tokens=800,
cache_write_tokens=200,
reasoning_tokens=50,
)
usage_dict = {
"prompt_tokens": canonical.prompt_tokens,
"completion_tokens": canonical.output_tokens,
"total_tokens": canonical.total_tokens,
"input_tokens": canonical.input_tokens,
"output_tokens": canonical.output_tokens,
"cache_read_tokens": canonical.cache_read_tokens,
"cache_write_tokens": canonical.cache_write_tokens,
"reasoning_tokens": canonical.reasoning_tokens,
}
# Legacy keys present
assert usage_dict["prompt_tokens"] == canonical.prompt_tokens
assert usage_dict["completion_tokens"] == 500
assert usage_dict["total_tokens"] == canonical.total_tokens
# Canonical cache + reasoning buckets present
assert usage_dict["cache_read_tokens"] == 800
assert usage_dict["cache_write_tokens"] == 200
assert usage_dict["reasoning_tokens"] == 50
assert usage_dict["input_tokens"] == 1000
assert usage_dict["output_tokens"] == 500
def test_discover_context_engines_includes_plugin_registered_engines(monkeypatch):
"""Plugin-registered context engines appear in the ``hermes plugins`` picker."""
from hermes_cli import plugins_cmd
fake_repo = lambda: [("compressor", "built-in", True)]
class FakePluginEngine:
name = "lcm"
monkeypatch.setattr(
"plugins.context_engine.discover_context_engines",
fake_repo,
)
monkeypatch.setattr(
"hermes_cli.plugins.discover_plugins",
lambda *_a, **_kw: None,
)
monkeypatch.setattr(
"hermes_cli.plugins.get_plugin_context_engine",
lambda: FakePluginEngine(),
)
engines = plugins_cmd._discover_context_engines()
names = [n for n, _desc in engines]
assert "compressor" in names
assert "lcm" in names
def test_discover_context_engines_dedupes_by_name(monkeypatch):
"""Repo-shipped engine wins when name collides with a plugin-registered one."""
from hermes_cli import plugins_cmd
class FakePluginEngine:
name = "compressor" # same name as repo-shipped
monkeypatch.setattr(
"plugins.context_engine.discover_context_engines",
lambda: [("compressor", "built-in compressor", True)],
)
monkeypatch.setattr(
"hermes_cli.plugins.discover_plugins",
lambda *_a, **_kw: None,
)
monkeypatch.setattr(
"hermes_cli.plugins.get_plugin_context_engine",
lambda: FakePluginEngine(),
)
engines = plugins_cmd._discover_context_engines()
# Only one entry — the repo-shipped one. Description is preserved.
assert engines == [("compressor", "built-in compressor")]
def test_engine_collector_forwards_register_command_to_plugin_manager():
"""A plugin context engine can register a slash command via ``ctx.register_command``."""
from plugins.context_engine import _EngineCollector
from hermes_cli.plugins import get_plugin_manager
handler = lambda raw_args: f"echo: {raw_args}"
collector = _EngineCollector(engine_name="my-lcm")
collector.register_command(
"my-lcm-test-cmd",
handler,
description="test command from a context engine",
args_hint="<msg>",
)
manager = get_plugin_manager()
try:
assert "my-lcm-test-cmd" in manager._plugin_commands
entry = manager._plugin_commands["my-lcm-test-cmd"]
assert entry["handler"] is handler
assert entry["args_hint"] == "<msg>"
assert entry["plugin"] == "context-engine:my-lcm"
finally:
# Clean up so we don't leak the registration across tests.
manager._plugin_commands.pop("my-lcm-test-cmd", None)
def test_engine_collector_rejects_builtin_command_conflicts():
"""Context engine cannot shadow built-in slash commands like /help."""
from plugins.context_engine import _EngineCollector
from hermes_cli.plugins import get_plugin_manager
collector = _EngineCollector(engine_name="my-lcm")
collector.register_command("help", lambda *_: "shadow")
manager = get_plugin_manager()
# Must NOT have overwritten / registered against built-in /help.
assert "help" not in manager._plugin_commands or \
manager._plugin_commands["help"].get("plugin") != "context-engine:my-lcm"