diff --git a/agent/agent_init.py b/agent/agent_init.py index bcad584e87c..79b5522a292 100644 --- a/agent/agent_init.py +++ b/agent/agent_init.py @@ -1522,6 +1522,7 @@ def init_agent( platform=agent.platform or "cli", model=agent.model, context_length=getattr(agent.context_compressor, "context_length", 0), + conversation_id=getattr(agent, "_gateway_session_key", None), ) except Exception as _ce_err: _ra().logger.debug("Context engine on_session_start: %s", _ce_err) diff --git a/agent/context_engine.py b/agent/context_engine.py index c30a7a84752..bb426fc189d 100644 --- a/agent/context_engine.py +++ b/agent/context_engine.py @@ -71,7 +71,12 @@ class ContextEngine(ABC): def update_from_response(self, usage: Dict[str, Any]) -> None: """Update tracked token usage from an API response. - Called after every LLM call with the usage dict from the response. + Called after every LLM call with a normalized usage dict. The legacy + keys ``prompt_tokens``, ``completion_tokens``, and ``total_tokens`` + are always present. Newer hosts also include canonical buckets: + ``input_tokens``, ``output_tokens``, ``cache_read_tokens``, + ``cache_write_tokens``, and ``reasoning_tokens``. Engines should + treat those fields as optional for compatibility with older hosts. 
""" @abstractmethod diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index a620f343e99..e11dc7c171d 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -421,6 +421,7 @@ def compress_context( agent.session_id or "", boundary_reason="compression", old_session_id=_old_sid, + conversation_id=getattr(agent, "_gateway_session_key", None), ) except Exception as _ce_err: logger.debug("context engine on_session_start (compression): %s", _ce_err) diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index f7422b0f98b..56da202de83 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -1769,10 +1769,19 @@ def run_conversation( prompt_tokens = canonical_usage.prompt_tokens completion_tokens = canonical_usage.output_tokens total_tokens = canonical_usage.total_tokens + # Forward canonical token + cache buckets so context engines + # can make decisions on cache hit ratios / reasoning costs, + # not just legacy aggregate tokens. Legacy keys stay for + # back-compat with engines that only read prompt/completion/total. usage_dict = { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens, + "input_tokens": canonical_usage.input_tokens, + "output_tokens": canonical_usage.output_tokens, + "cache_read_tokens": canonical_usage.cache_read_tokens, + "cache_write_tokens": canonical_usage.cache_write_tokens, + "reasoning_tokens": canonical_usage.reasoning_tokens, } agent.context_compressor.update_from_response(usage_dict) diff --git a/hermes_cli/plugins_cmd.py b/hermes_cli/plugins_cmd.py index 937fc7f7f64..d3f7b0803cb 100644 --- a/hermes_cli/plugins_cmd.py +++ b/hermes_cli/plugins_cmd.py @@ -864,12 +864,35 @@ def _discover_memory_providers() -> list[tuple[str, str]]: def _discover_context_engines() -> list[tuple[str, str]]: - """Return [(name, description), ...] for available context engines.""" + """Return [(name, description), ...] 
def _discover_context_engines() -> list[tuple[str, str]]:
    """Return ``[(name, description), ...]`` for available context engines.

    Two sources are consulted, in this order:

    1. Repo-shipped engines reported by
       ``plugins.context_engine.discover_context_engines()``.
    2. The engine returned by ``hermes_cli.plugins.get_plugin_context_engine()``
       after ``discover_plugins()`` has run. It is listed with the fixed
       description ``"installed plugin"``.

    Names are de-duplicated and the first occurrence wins, so a repo-shipped
    description is kept when a plugin-registered engine uses the same name.

    Discovery is best-effort: a failure in either source is logged at debug
    level and that source is skipped, so the caller still receives whatever
    the other source produced (entries collected before the failure are kept).

    Returns:
        A list of ``(name, description)`` tuples; empty when neither source
        yields an engine.
    """
    import logging

    log = logging.getLogger(__name__)
    engines: list[tuple[str, str]] = []
    seen: set[str] = set()

    def _add(name: str, desc: str) -> None:
        # Single place that enforces "first registration wins" for both sources.
        if name not in seen:
            seen.add(name)
            engines.append((name, desc))

    try:
        from plugins.context_engine import discover_context_engines

        for name, desc, _avail in discover_context_engines():
            _add(name, desc)
    except Exception as exc:
        log.debug("Repo-shipped context engine discovery failed: %s", exc)

    try:
        from hermes_cli.plugins import discover_plugins, get_plugin_context_engine

        discover_plugins()
        plugin_engine = get_plugin_context_engine()
        plugin_name = getattr(plugin_engine, "name", None) if plugin_engine else None
        if plugin_name:
            _add(plugin_name, "installed plugin")
    except Exception as exc:
        log.debug("Plugin context engine discovery failed: %s", exc)

    return engines
def register_command(
    self,
    name: str,
    handler,
    description: str = "",
    args_hint: str = "",
) -> None:
    """Forward a slash command to the global plugin command registry.

    The command name is normalized (lower-cased, trimmed, leading ``/``
    removed, spaces replaced by ``-``) and then stored in the
    ``_plugin_commands`` mapping of the manager returned by
    ``hermes_cli.plugins.get_plugin_manager()``.

    Registration is skipped, with a warning, when:

    * the normalized name is empty;
    * ``hermes_cli.commands.resolve_command`` resolves the name, i.e. it
      collides with a built-in command;
    * the name is already present in the plugin command registry.

    Args:
        name: Command name, with or without a leading ``/``.
        handler: Object stored under the ``"handler"`` key of the entry.
        description: Help text; falls back to ``"Context engine command"``.
        args_hint: Argument hint; surrounding whitespace is stripped.

    Nothing is raised to the caller: if the built-in check cannot run the
    command is still registered, and any failure while touching the plugin
    registry is logged at debug level.
    """
    clean = (name or "").lower().strip().lstrip("/").replace(" ", "-")
    if not clean:
        logger.warning(
            "Context engine '%s' tried to register a command with an empty name.",
            self._engine_name,
        )
        return

    # Reject conflicts with built-in commands.
    try:
        from hermes_cli.commands import resolve_command

        builtin = resolve_command(clean)
    except Exception as exc:
        # Best-effort check: keep registering, but leave a trace instead of
        # silently dropping the reason the built-in lookup was skipped.
        logger.debug(
            "Context engine '%s' could not check /%s against built-in commands: %s",
            self._engine_name, clean, exc,
        )
        builtin = None
    if builtin is not None:
        logger.warning(
            "Context engine '%s' tried to register command '/%s' which conflicts "
            "with a built-in command. Skipping.",
            self._engine_name, clean,
        )
        return

    try:
        from hermes_cli.plugins import get_plugin_manager

        manager = get_plugin_manager()
        if clean in manager._plugin_commands:
            # Don't clobber a regular plugin's command — same conflict
            # policy the plugin system uses for plugin-vs-plugin collisions.
            logger.warning(
                "Context engine '%s' tried to register command '/%s' which "
                "is already registered by a plugin. Skipping.",
                self._engine_name, clean,
            )
            return
        manager._plugin_commands[clean] = {
            "handler": handler,
            "description": description or "Context engine command",
            "plugin": f"context-engine:{self._engine_name}",
            "args_hint": (args_hint or "").strip(),
        }
        self._registered_commands.append(clean)
        logger.debug(
            "Context engine '%s' registered command: /%s",
            self._engine_name, clean,
        )
    except Exception as exc:
        logger.debug(
            "Context engine '%s' could not register /%s: %s",
            self._engine_name, clean, exc,
        )
def _transition_context_engine_session(
    self,
    *,
    old_session_id: Optional[str] = None,
    new_session_id: Optional[str] = None,
    previous_messages: Optional[list] = None,
    carry_over_context: bool = False,
    reset_engine: bool = True,
    **extra_context,
) -> None:
    """Notify the active context engine about a host session transition.

    Hooks are looked up on ``self.context_compressor`` and invoked in this
    order; a hook the engine does not define is skipped, and an exception
    raised by a hook is logged at debug level and does not stop the rest:

    1. ``on_session_end(old_session_id, previous_messages)`` — only when
       ``old_session_id`` is truthy and ``previous_messages`` is not None.
    2. ``on_session_reset()`` — only when ``reset_engine`` is true.
    3. ``on_session_start(target_session_id, **context)`` — only when at
       least one of ``old_session_id``, ``previous_messages``,
       ``carry_over_context`` or ``extra_context`` was supplied and a
       target session id is known.
    4. ``carry_over_new_session_context(old_session_id, target_session_id)``
       — only when ``carry_over_context`` is true and both ids are known.

    Args:
        old_session_id: Id of the session being left, if any.
        new_session_id: Id of the session being entered; falls back to
            ``self.session_id``.
        previous_messages: Messages of the old session handed to
            ``on_session_end``.
        carry_over_context: Whether the engine should carry retained
            context into the new session.
        reset_engine: Whether to call ``on_session_reset``.
        **extra_context: Extra keyword arguments merged into (and taking
            precedence over) the context passed to ``on_session_start``.
    """
    engine = getattr(self, "context_compressor", None)
    if not engine:
        return

    def _notify(hook_name, args=(), kwargs=None):
        # Optional-hook dispatch: missing hooks are skipped, failing hooks
        # are logged so one misbehaving engine cannot abort the transition.
        hook = getattr(engine, hook_name, None)
        if hook is None:
            return
        try:
            hook(*args, **(kwargs or {}))
        except Exception as exc:
            logger.debug("context engine %s during transition: %s", hook_name, exc)

    def _is_blank(value) -> bool:
        # Identity/type test on purpose: comparing arbitrary caller-supplied
        # values with ``==`` can raise (objects with a strict ``__eq__``).
        return value is None or (isinstance(value, str) and not value)

    if old_session_id and previous_messages is not None:
        _notify("on_session_end", (old_session_id, previous_messages))

    if reset_engine:
        _notify("on_session_reset")

    should_start = bool(
        old_session_id
        or previous_messages is not None
        or carry_over_context
        or extra_context
    )
    target_session_id = new_session_id or getattr(self, "session_id", "") or ""
    if should_start and target_session_id and hasattr(engine, "on_session_start"):
        start_context = {
            "old_session_id": old_session_id,
            "carry_over_context": carry_over_context,
            "platform": getattr(self, "platform", None) or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
            "model": getattr(self, "model", ""),
            "context_length": getattr(engine, "context_length", None),
            "conversation_id": getattr(self, "_gateway_session_key", None),
        }
        start_context.update(extra_context)
        # Drop unset entries so engines only see keys that carry a value.
        start_context = {
            key: value for key, value in start_context.items() if not _is_blank(value)
        }
        _notify("on_session_start", (target_session_id,), start_context)

    if carry_over_context and old_session_id and target_session_id:
        _notify("carry_over_new_session_context", (old_session_id, target_session_id))
all session-scoped token counters to 0 for a fresh session. This method encapsulates the reset logic for all session-level metrics @@ -541,9 +615,12 @@ class AIAgent: The method safely handles optional attributes (e.g., context compressor) using ``hasattr`` checks. - - This keeps the counter reset logic DRY and maintainable in one place - rather than scattering it across multiple methods. + + When ``previous_messages`` / ``old_session_id`` / ``carry_over_context`` + are provided, the active context engine is notified through the + full transition lifecycle (``_transition_context_engine_session``) + instead of a bare reset. Default callers pass nothing and keep the + existing reset-only behavior. """ # Token usage counters self.session_total_tokens = 0 @@ -562,9 +639,14 @@ class AIAgent: # Turn counter (added after reset_session_state was first written — #2635) self._user_turn_count = 0 - # Context engine reset (works for both built-in compressor and plugins) - if hasattr(self, "context_compressor") and self.context_compressor: - self.context_compressor.on_session_reset() + # Context engine reset/transition (works for built-in compressor and plugins) + self._transition_context_engine_session( + old_session_id=old_session_id, + new_session_id=getattr(self, "session_id", None), + previous_messages=previous_messages, + carry_over_context=carry_over_context, + reset_engine=True, + ) def _ensure_lmstudio_runtime_loaded(self, config_context_length: Optional[int] = None) -> None: """ diff --git a/tests/agent/test_context_engine_host_contract.py b/tests/agent/test_context_engine_host_contract.py new file mode 100644 index 00000000000..6ab1a22261c --- /dev/null +++ b/tests/agent/test_context_engine_host_contract.py @@ -0,0 +1,290 @@ +"""Regressions for the context-engine host contract. + +These tests pin the five generic host-side guarantees that external context +engine plugins (e.g. hermes-lcm) rely on: + +1. 
def _bare_agent() -> AIAgent:
    """Return an ``AIAgent`` allocated without running ``__init__``.

    Only the attributes assigned below are set on the instance.
    """
    agent = object.__new__(AIAgent)
    preset = (
        ("session_id", "test-session"),
        ("model", "fake-model"),
        ("platform", "telegram"),
        ("_gateway_session_key", "agent:main:telegram:dm:42"),
    )
    for attribute, value in preset:
        setattr(agent, attribute, value)
    return agent
def test_transition_passes_conversation_id_from_gateway_session_key():
    """``_gateway_session_key`` reaches ``on_session_start`` as ``conversation_id``."""
    start_kwargs: dict = {}

    def _record_start(sid, **kwargs):
        # Keep only the keyword context; the session id is not asserted here.
        start_kwargs.update(kwargs)

    engine = MagicMock()
    engine.context_length = 200_000
    engine.on_session_start.side_effect = _record_start

    agent = _bare_agent()
    agent.context_compressor = engine

    agent._transition_context_engine_session(
        old_session_id="old-sid",
        new_session_id="new-sid",
        previous_messages=[{"role": "user", "content": "hi"}],
    )

    expected = {
        "conversation_id": "agent:main:telegram:dm:42",
        "old_session_id": "old-sid",
        "platform": "telegram",
    }
    for key, value in expected.items():
        assert start_kwargs.get(key) == value
def test_reset_session_state_delegates_to_transition_when_args_provided():
    """Session metadata passed to ``reset_session_state`` fires end, reset and start."""
    engine = MagicMock()
    engine.context_length = 100_000

    agent = _bare_agent()
    agent.context_compressor = engine

    history = [{"role": "user", "content": "hi"}]
    agent.reset_session_state(previous_messages=history, old_session_id="old-sid")

    lifecycle_hooks = (
        engine.on_session_end,
        engine.on_session_reset,
        engine.on_session_start,
    )
    for hook in lifecycle_hooks:
        assert hook.called
    # carry_over_context was left at its default, so the carry-over hook stays idle.
    assert not engine.carry_over_new_session_context.called
def test_discover_context_engines_includes_plugin_registered_engines(monkeypatch):
    """A plugin-registered engine is listed next to the repo-shipped ones."""
    from hermes_cli import plugins_cmd

    class FakePluginEngine:
        name = "lcm"

    def _repo_engines():
        return [("compressor", "built-in", True)]

    # Patched in the same order as listed: repo discovery, plugin discovery,
    # then the plugin-engine accessor.
    replacements = {
        "plugins.context_engine.discover_context_engines": _repo_engines,
        "hermes_cli.plugins.discover_plugins": lambda *_a, **_kw: None,
        "hermes_cli.plugins.get_plugin_context_engine": lambda: FakePluginEngine(),
    }
    for target, stub in replacements.items():
        monkeypatch.setattr(target, stub)

    discovered = plugins_cmd._discover_context_engines()
    names = [name for name, _desc in discovered]
    for expected in ("compressor", "lcm"):
        assert expected in names
def test_engine_collector_forwards_register_command_to_plugin_manager():
    """``ctx.register_command`` from a context engine lands in the plugin registry."""
    from plugins.context_engine import _EngineCollector
    from hermes_cli.plugins import get_plugin_manager

    command = "my-lcm-test-cmd"

    def handler(raw_args):
        return f"echo: {raw_args}"

    _EngineCollector(engine_name="my-lcm").register_command(
        command,
        handler,
        description="test command from a context engine",
        args_hint="",
    )

    manager = get_plugin_manager()
    try:
        assert command in manager._plugin_commands
        entry = manager._plugin_commands[command]
        assert entry["handler"] is handler
        assert entry["args_hint"] == ""
        assert entry["plugin"] == "context-engine:my-lcm"
    finally:
        # Remove the entry so the registration does not leak into other tests.
        manager._plugin_commands.pop(command, None)
+ assert "help" not in manager._plugin_commands or \ + manager._plugin_commands["help"].get("plugin") != "context-engine:my-lcm"